1
2
3
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 from future.utils import native_str
9 standard_library.install_aliases()
10 from builtins import next
11 from builtins import str
12 from builtins import range
13 from builtins import object
14 import collections
15 import datetime
16 import hashlib
17 import io
18 import logging
19 import math
20 import os
21 import pycurl
22 import queue
23 import re
24 import socket
25 import ssl
26 import sys
27 import threading
28 from . import timer
29 import urllib.parse
30
31 if sys.version_info >= (3, 0):
32 from io import BytesIO
33 else:
34 from cStringIO import StringIO as BytesIO
35
36 import arvados
37 import arvados.config as config
38 import arvados.errors
39 import arvados.retry as retry
40 import arvados.util
41
42 _logger = logging.getLogger('arvados.keep')
43 global_client_object = None
44
45
46
47
48 if sys.platform == 'darwin':
49 if not hasattr(socket, 'TCP_KEEPALIVE'):
50 socket.TCP_KEEPALIVE = 0x010
51 if not hasattr(socket, 'TCP_KEEPINTVL'):
52 socket.TCP_KEEPINTVL = 0x101
53 if not hasattr(socket, 'TCP_KEEPCNT'):
54 socket.TCP_KEEPCNT = 0x102
58 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
59 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
60
62 self.hints = []
63 self._perm_sig = None
64 self._perm_expiry = None
65 pieces = iter(locator_str.split('+'))
66 self.md5sum = next(pieces)
67 try:
68 self.size = int(next(pieces))
69 except StopIteration:
70 self.size = None
71 for hint in pieces:
72 if self.HINT_RE.match(hint) is None:
73 raise ValueError("invalid hint format: {}".format(hint))
74 elif hint.startswith('A'):
75 self.parse_permission_hint(hint)
76 else:
77 self.hints.append(hint)
78
85
87 if self.size is not None:
88 return "%s+%i" % (self.md5sum, self.size)
89 else:
90 return self.md5sum
91
93
94
95 data_name = '_{}'.format(name)
96 def getter(self):
97 return getattr(self, data_name)
98 def setter(self, hex_str):
99 if not arvados.util.is_hex(hex_str, length):
100 raise ValueError("{} is not a {}-digit hex string: {!r}".
101 format(name, length, hex_str))
102 setattr(self, data_name, hex_str)
103 return property(getter, setter)
104
105 md5sum = _make_hex_prop('md5sum', 32)
106 perm_sig = _make_hex_prop('perm_sig', 40)
107
108 @property
110 return self._perm_expiry
111
112 @perm_expiry.setter
114 if not arvados.util.is_hex(value, 1, 8):
115 raise ValueError(
116 "permission timestamp must be a hex Unix timestamp: {}".
117 format(value))
118 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
119
126
132
134 if self.perm_expiry is None:
135 return False
136 elif as_of_dt is None:
137 as_of_dt = datetime.datetime.now()
138 return self.perm_expiry <= as_of_dt
139
140
141 -class Keep(object):
142 """Simple interface to a global KeepClient object.
143
144 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
145 own API client. The global KeepClient will build an API client from the
146 current Arvados configuration, which may not match the one you built.
147 """
148 _last_key = None
149
150 @classmethod
167
168 @staticmethod
169 - def get(locator, **kwargs):
171
172 @staticmethod
173 - def put(data, **kwargs):
175
177
def __init__(self, cache_max=(256 * 1024 * 1024)):
    """Create an in-memory cache for Keep blocks.

    :cache_max:
        Approximate cache size limit in bytes (default 256 MiB).
        Enforced lazily by cap_cache(), not on insert.
    """
    # Lock guarding every mutation of the slot list.
    self._cache_lock = threading.Lock()
    # Slot list kept in most-recently-used-first order.
    self._cache = []
    self.cache_max = cache_max
184 __slots__ = ("locator", "ready", "content")
185
187 self.locator = locator
188 self.ready = threading.Event()
189 self.content = None
190
192 self.ready.wait()
193 return self.content
194
def set(self, value):
    # Publish the block content and wake any threads blocked in get().
    # Order matters: content must be assigned *before* the event is
    # set, so a waiter released by ready.set() never observes a slot
    # that is "ready" but still holds None.
    self.content = value
    self.ready.set()
198
200 if self.content is None:
201 return 0
202 else:
203 return len(self.content)
204
206 '''Cap the cache size to self.cache_max'''
207 with self._cache_lock:
208
209
210 self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
211 sm = sum([slot.size() for slot in self._cache])
212 while len(self._cache) > 0 and sm > self.cache_max:
213 for i in range(len(self._cache)-1, -1, -1):
214 if self._cache[i].ready.is_set():
215 del self._cache[i]
216 break
217 sm = sum([slot.size() for slot in self._cache])
218
def _get(self, locator):
    """Return the cache slot for *locator*, or None if not cached.

    On a hit the slot is promoted to the front of the list, keeping
    the cache in most-recently-used order. Not thread-safe on its
    own: callers must hold self._cache_lock (see get()).
    """
    for index, slot in enumerate(self._cache):
        if slot.locator != locator:
            continue
        if index:
            # Promote the hit to the most-recently-used position.
            del self._cache[index]
            self._cache.insert(0, slot)
        return slot
    return None
230
def get(self, locator):
    # Thread-safe lookup: delegates to _get() (which also promotes
    # the slot to most-recently-used) while holding the cache lock.
    # Returns the CacheSlot or None.
    with self._cache_lock:
        return self._get(locator)
234
236 '''Reserve a cache slot for the specified locator,
237 or return the existing slot.'''
238 with self._cache_lock:
239 n = self._get(locator)
240 if n:
241 return n, False
242 else:
243
244 n = KeepBlockCache.CacheSlot(locator)
245 self._cache.insert(0, n)
246 return n, True
247
250 self._lk = threading.Lock()
251 self._val = v
252
254 with self._lk:
255 self._val += v
256
258 with self._lk:
259 return self._val
260
263
264
265
266
267
268
269
270 DEFAULT_TIMEOUT = (2, 256, 32768)
271 DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
272
273
275 """Make requests to a single Keep service, and track results.
276
277 A KeepService is intended to last long enough to perform one
278 transaction (GET or PUT) against one Keep service. This can
279 involve calling either get() or put() multiple times in order
280 to retry after transient failures. However, calling both get()
281 and put() on a single instance -- or using the same instance
282 to access two different Keep services -- will not produce
283 sensible behavior.
284 """
285
286 HTTP_ERRORS = (
287 socket.error,
288 ssl.SSLError,
289 arvados.errors.HttpError,
290 )
291
def __init__(self, root, user_agent_pool=None,
             upload_counter=None,
             download_counter=None,
             headers=None,
             insecure=False):
    """Set up a client for a single Keep service endpoint.

    :root:
        Base URL of the Keep service; request URLs are formed by
        appending a locator to it.
    :user_agent_pool:
        Optional queue.LifoQueue of reusable pycurl.Curl handles.
        A fresh private pool is created when omitted.
    :upload_counter:, :download_counter:
        Optional Counter objects that accumulate bytes transferred.
    :headers:
        Extra HTTP headers to send with every request.
    :insecure:
        When true, skip TLS certificate verification.

    Fix: the previous signature used mutable default arguments
    (``user_agent_pool=queue.LifoQueue()``, ``headers={}``) which are
    evaluated once at definition time, so every instance constructed
    without those arguments shared one queue and one headers dict --
    and ``self.put_headers = headers`` aliased that shared dict, so a
    mutation through one instance leaked into all others. Defaults
    are now None sentinels and the headers dict is copied.
    """
    self.root = root
    self._user_agent_pool = (
        user_agent_pool if user_agent_pool is not None else queue.LifoQueue())
    self._result = {'error': None}
    self._usable = True
    self._session = None
    self._socket = None
    # Copy so later mutations of get/put header dicts can never
    # affect the caller's dict.
    headers = dict(headers) if headers else {}
    self.get_headers = {'Accept': 'application/octet-stream'}
    self.get_headers.update(headers)
    self.put_headers = headers
    self.upload_counter = upload_counter
    self.download_counter = download_counter
    self.insecure = insecure
309
311 """Is it worth attempting a request?"""
312 return self._usable
313
315 """Did the request succeed or encounter permanent failure?"""
316 return self._result['error'] == False or not self._usable
317
320
322 try:
323 return self._user_agent_pool.get(block=False)
324 except queue.Empty:
325 return pycurl.Curl()
326
328 try:
329 ua.reset()
330 self._user_agent_pool.put(ua, block=False)
331 except:
332 ua.close()
333
339
341 return self._socket_open_pycurl_7_21_5(
342 purpose=None,
343 address=collections.namedtuple(
344 'Address', ['family', 'socktype', 'protocol', 'addr'],
345 )(family, socktype, protocol, address))
346
348 """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
349 s = socket.socket(address.family, address.socktype, address.protocol)
350 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
351
352 if hasattr(socket, 'TCP_KEEPIDLE'):
353 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
354 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
355 self._socket = s
356 return s
357
def get(self, locator, method="GET", timeout=None):
    """Perform one GET or HEAD request against this Keep service.

    :locator: KeepLocator of the block to fetch (it is str()-ified
        into the URL, and its md5sum is used to verify GET bodies).
    :method: "GET" (default) or "HEAD".
    :timeout: timeout spec passed to _setcurltimeouts().

    Returns the block content (bytes) for a successful GET, True for
    a successful HEAD, or None on failure; details of the attempt are
    recorded in self._result for last_result().
    """
    url = self.root + str(locator)
    _logger.debug("Request: %s %s", method, url)
    curl = self._get_user_agent()
    ok = None
    try:
        with timer.Timer() as t:
            self._headers = {}
            response_body = BytesIO()
            curl.setopt(pycurl.NOSIGNAL, 1)
            curl.setopt(pycurl.OPENSOCKETFUNCTION,
                        lambda *args, **kwargs: self._socket_open(*args, **kwargs))
            curl.setopt(pycurl.URL, url.encode('utf-8'))
            curl.setopt(pycurl.HTTPHEADER, [
                '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
            curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
            curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
            if self.insecure:
                curl.setopt(pycurl.SSL_VERIFYPEER, 0)
            if method == "HEAD":
                curl.setopt(pycurl.NOBODY, True)
            self._setcurltimeouts(curl, timeout)

            try:
                curl.perform()
            except Exception as e:
                # Normalize any pycurl failure into our HttpError type.
                raise arvados.errors.HttpError(0, str(e))
            finally:
                # Always release the socket opened by _socket_open,
                # even when perform() raised.
                if self._socket:
                    self._socket.close()
                    self._socket = None
            self._result = {
                'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                'body': response_body.getvalue(),
                'headers': self._headers,
                'error': False,
            }

        ok = retry.check_http_response_success(self._result['status_code'])
        if not ok:
            self._result['error'] = arvados.errors.HttpError(
                self._result['status_code'],
                self._headers.get('x-status-line', 'Error'))
    except self.HTTP_ERRORS as e:
        self._result = {
            'error': e,
        }
    # Still usable unless we saw a definite (permanent) failure:
    # ok may be True, False, or None (no HTTP response at all).
    self._usable = ok != False
    if self._result.get('status_code', None):
        # We got an HTTP response code, so the curl handle itself
        # works; return it to the pool for reuse.
        self._put_user_agent(curl)
    else:
        # No response at all -- the handle may be broken; discard it
        # rather than returning it to the pool.
        curl.close()
    if not ok:
        _logger.debug("Request fail: GET %s => %s: %s",
                      url, type(self._result['error']), str(self._result['error']))
        return None
    if method == "HEAD":
        # NOTE(review): self._result has no top-level 'content-length'
        # key (lengths live in self._result['headers']), so this log
        # line appears to always print None -- confirm intent.
        _logger.info("HEAD %s: %s bytes",
                     self._result['status_code'],
                     self._result.get('content-length'))
        return True

    _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                 self._result['status_code'],
                 len(self._result['body']),
                 t.msecs,
                 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

    if self.download_counter:
        self.download_counter.add(len(self._result['body']))
    # Verify the body actually hashes to the requested locator.
    resp_md5 = hashlib.md5(self._result['body']).hexdigest()
    if resp_md5 != locator.md5sum:
        _logger.warning("Checksum fail: md5(%s) = %s",
                        url, resp_md5)
        self._result['error'] = arvados.errors.HttpError(
            0, 'Checksum fail')
        return None
    return self._result['body']
442
def put(self, hash_s, body, timeout=None):
    """Upload one block to this Keep service.

    :hash_s: locator string (hash+size) the block is stored under.
    :body: block content as bytes.
    :timeout: timeout spec passed to _setcurltimeouts().

    Returns True on success, False on failure; details are recorded
    in self._result for last_result().
    """
    url = self.root + hash_s
    _logger.debug("Request: PUT %s", url)
    curl = self._get_user_agent()
    ok = None
    try:
        with timer.Timer() as t:
            self._headers = {}
            body_reader = BytesIO(body)
            response_body = BytesIO()
            curl.setopt(pycurl.NOSIGNAL, 1)
            curl.setopt(pycurl.OPENSOCKETFUNCTION,
                        lambda *args, **kwargs: self._socket_open(*args, **kwargs))
            curl.setopt(pycurl.URL, url.encode('utf-8'))
            # Upload via libcurl's streaming upload machinery
            # (UPLOAD + INFILESIZE + READFUNCTION) rather than
            # POSTFIELDS, so large bodies are streamed from the
            # BytesIO instead of being copied into curl.
            curl.setopt(pycurl.UPLOAD, True)
            curl.setopt(pycurl.INFILESIZE, len(body))
            curl.setopt(pycurl.READFUNCTION, body_reader.read)
            curl.setopt(pycurl.HTTPHEADER, [
                '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
            curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
            curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
            if self.insecure:
                curl.setopt(pycurl.SSL_VERIFYPEER, 0)
            self._setcurltimeouts(curl, timeout)
            try:
                curl.perform()
            except Exception as e:
                # Normalize any pycurl failure into our HttpError type.
                raise arvados.errors.HttpError(0, str(e))
            finally:
                # Always release the socket opened by _socket_open.
                if self._socket:
                    self._socket.close()
                    self._socket = None
            self._result = {
                'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                # PUT responses are small text (the stored locator),
                # so decode to str here, unlike get().
                'body': response_body.getvalue().decode('utf-8'),
                'headers': self._headers,
                'error': False,
            }
        ok = retry.check_http_response_success(self._result['status_code'])
        if not ok:
            self._result['error'] = arvados.errors.HttpError(
                self._result['status_code'],
                self._headers.get('x-status-line', 'Error'))
    except self.HTTP_ERRORS as e:
        self._result = {
            'error': e,
        }
    # Still usable unless failure was definite (ok may be None).
    self._usable = ok != False
    if self._result.get('status_code', None):
        # Got an HTTP response: the curl handle works, recycle it.
        self._put_user_agent(curl)
    else:
        # No response at all: discard possibly-broken handle.
        curl.close()
    if not ok:
        _logger.debug("Request fail: PUT %s => %s: %s",
                      url, type(self._result['error']), str(self._result['error']))
        return False
    _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                 self._result['status_code'],
                 len(body),
                 t.msecs,
                 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
    if self.upload_counter:
        self.upload_counter.add(len(body))
    return True
514
516 if not timeouts:
517 return
518 elif isinstance(timeouts, tuple):
519 if len(timeouts) == 2:
520 conn_t, xfer_t = timeouts
521 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
522 else:
523 conn_t, xfer_t, bandwidth_bps = timeouts
524 else:
525 conn_t, xfer_t = (timeouts, timeouts)
526 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
527 curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
528 curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
529 curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
530
532 if isinstance(header_line, bytes):
533 header_line = header_line.decode('iso-8859-1')
534 if ':' in header_line:
535 name, value = header_line.split(':', 1)
536 name = name.strip().lower()
537 value = value.strip()
538 elif self._headers:
539 name = self._lastheadername
540 value = self._headers[name] + ' ' + header_line.strip()
541 elif header_line.startswith('HTTP/'):
542 name = 'x-status-line'
543 value = header_line
544 else:
545 _logger.error("Unexpected header line: %s", header_line)
546 return
547 self._lastheadername = name
548 self._headers[name] = value
549
550
551
554 queue.Queue.__init__(self)
555 self.wanted_copies = copies
556 self.successful_copies = 0
557 self.response = None
558 self.successful_copies_lock = threading.Lock()
559 self.pending_tries = copies
560 self.pending_tries_notification = threading.Condition()
561
563 with self.successful_copies_lock:
564 self.successful_copies += replicas_nr
565 self.response = response
566 with self.pending_tries_notification:
567 self.pending_tries_notification.notify_all()
568
570 with self.pending_tries_notification:
571 self.pending_tries += 1
572 self.pending_tries_notification.notify()
573
575 with self.successful_copies_lock:
576 return self.wanted_copies - self.successful_copies
577
579 with self.pending_tries_notification:
580 while True:
581 if self.pending_copies() < 1:
582
583
584
585
586
587
588 self.pending_tries_notification.notify_all()
589
590 while True:
591 self.get_nowait()
592 self.task_done()
593 elif self.pending_tries > 0:
594 service, service_root = self.get_nowait()
595 if service.finished():
596 self.task_done()
597 continue
598 self.pending_tries -= 1
599 return service, service_root
600 elif self.empty():
601 self.pending_tries_notification.notify_all()
602 raise queue.Empty
603 else:
604 self.pending_tries_notification.wait()
605
606
def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
    """Set up a pool of writer threads for one block.

    :data:/:data_hash: the block content and its md5 hex digest.
    :copies: number of replicas the caller wants stored.
    :max_service_replicas: most replicas any single service will
        report storing; falsy means unlimited.
    :timeout: per-request timeout handed to each writer thread.
    """
    self.total_task_nr = 0
    self.wanted_copies = copies
    if max_service_replicas and max_service_replicas < copies:
        # Enough threads that the wanted copies can be written
        # concurrently even when each service stores only
        # max_service_replicas of them.
        num_threads = int(math.ceil(1.0 * copies / max_service_replicas))
    else:
        # One service can satisfy all copies (or the limit is
        # unknown): a single worker suffices.
        num_threads = 1
    _logger.debug("Pool max threads is %d", num_threads)
    self.queue = KeepClient.KeepWriterQueue(copies)
    self.workers = [
        KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
        for _ in range(num_threads)]
622
624 self.queue.put((ks, service_root))
625 self.total_task_nr += 1
626
628 return self.queue.successful_copies
629
631
632 for worker in self.workers:
633 worker.start()
634
635 self.queue.join()
636
639
640
642 TaskFailed = RuntimeError()
643
def __init__(self, queue, data, data_hash, timeout=None):
    # queue: KeepWriterQueue supplying (KeepService, service_root)
    # tasks. data/data_hash: block content and its md5 hex digest.
    # timeout: per-request timeout passed through to service.put().
    super(KeepClient.KeepWriterThread, self).__init__()
    self.timeout = timeout
    self.queue = queue
    self.data = data
    self.data_hash = data_hash
    # Daemon thread: don't block interpreter exit on a stuck upload.
    self.daemon = True
651
653 while True:
654 try:
655 service, service_root = self.queue.get_next_task()
656 except queue.Empty:
657 return
658 try:
659 locator, copies = self.do_task(service, service_root)
660 except Exception as e:
661 if e is not self.TaskFailed:
662 _logger.exception("Exception in KeepWriterThread")
663 self.queue.write_fail(service)
664 else:
665 self.queue.write_success(locator, copies)
666 finally:
667 self.queue.task_done()
668
def do_task(self, service, service_root):
    """Attempt one PUT of self.data to *service*.

    Returns (locator, replicas_stored) on success. On failure it
    raises the shared self.TaskFailed *instance*, which run() then
    recognizes by identity to distinguish an expected upload failure
    from an unexpected programming error.
    """
    success = bool(service.put(self.data_hash,
                               self.data,
                               timeout=self.timeout))
    result = service.last_result()

    if not success:
        if result.get('status_code', None):
            _logger.debug("Request fail: PUT %s => %s %s",
                          self.data_hash,
                          result['status_code'],
                          result['body'])
        raise self.TaskFailed

    _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                  str(threading.current_thread()),
                  self.data_hash,
                  len(self.data),
                  service_root)
    try:
        replicas_stored = int(result['headers']['x-keep-replicas-stored'])
    except (KeyError, ValueError):
        # Server didn't report a replica count; assume one copy.
        replicas_stored = 1

    return result['body'].strip(), replicas_stored
694
695
700 """Initialize a new KeepClient.
701
702 Arguments:
703 :api_client:
704 The API client to use to find Keep services. If not
705 provided, KeepClient will build one from available Arvados
706 configuration.
707
708 :proxy:
709 If specified, this KeepClient will send requests to this Keep
710 proxy. Otherwise, KeepClient will fall back to the setting of the
711 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
712 If you want to KeepClient does not use a proxy, pass in an empty
713 string.
714
715 :timeout:
716 The initial timeout (in seconds) for HTTP requests to Keep
717 non-proxy servers. A tuple of three floats is interpreted as
718 (connection_timeout, read_timeout, minimum_bandwidth). A connection
719 will be aborted if the average traffic rate falls below
720 minimum_bandwidth bytes per second over an interval of read_timeout
721 seconds. Because timeouts are often a result of transient server
722 load, the actual connection timeout will be increased by a factor
723 of two on each retry.
724 Default: (2, 256, 32768).
725
726 :proxy_timeout:
727 The initial timeout (in seconds) for HTTP requests to
728 Keep proxies. A tuple of three floats is interpreted as
729 (connection_timeout, read_timeout, minimum_bandwidth). The behavior
730 described above for adjusting connection timeouts on retry also
731 applies.
732 Default: (20, 256, 32768).
733
734 :api_token:
735 If you're not using an API client, but only talking
736 directly to a Keep proxy, this parameter specifies an API token
737 to authenticate Keep requests. It is an error to specify both
738 api_client and api_token. If you specify neither, KeepClient
739 will use one available from the Arvados configuration.
740
741 :local_store:
742 If specified, this KeepClient will bypass Keep
743 services, and save data to the named directory. If unspecified,
744 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
745 environment variable. If you want to ensure KeepClient does not
746 use local storage, pass in an empty string. This is primarily
747 intended to mock a server for testing.
748
749 :num_retries:
750 The default number of times to retry failed requests.
751 This will be used as the default num_retries value when get() and
752 put() are called. Default 0.
753 """
754 self.lock = threading.Lock()
755 if proxy is None:
756 if config.get('ARVADOS_KEEP_SERVICES'):
757 proxy = config.get('ARVADOS_KEEP_SERVICES')
758 else:
759 proxy = config.get('ARVADOS_KEEP_PROXY')
760 if api_token is None:
761 if api_client is None:
762 api_token = config.get('ARVADOS_API_TOKEN')
763 else:
764 api_token = api_client.api_token
765 elif api_client is not None:
766 raise ValueError(
767 "can't build KeepClient with both API client and token")
768 if local_store is None:
769 local_store = os.environ.get('KEEP_LOCAL_STORE')
770
771 if api_client is None:
772 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
773 else:
774 self.insecure = api_client.insecure
775
776 self.block_cache = block_cache if block_cache else KeepBlockCache()
777 self.timeout = timeout
778 self.proxy_timeout = proxy_timeout
779 self._user_agent_pool = queue.LifoQueue()
780 self.upload_counter = Counter()
781 self.download_counter = Counter()
782 self.put_counter = Counter()
783 self.get_counter = Counter()
784 self.hits_counter = Counter()
785 self.misses_counter = Counter()
786
787 if local_store:
788 self.local_store = local_store
789 self.get = self.local_store_get
790 self.put = self.local_store_put
791 else:
792 self.num_retries = num_retries
793 self.max_replicas_per_service = None
794 if proxy:
795 proxy_uris = proxy.split()
796 for i in range(len(proxy_uris)):
797 if not proxy_uris[i].endswith('/'):
798 proxy_uris[i] += '/'
799
800 url = urllib.parse.urlparse(proxy_uris[i])
801 if not (url.scheme and url.netloc):
802 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
803 self.api_token = api_token
804 self._gateway_services = {}
805 self._keep_services = [{
806 'uuid': "00000-bi6l4-%015d" % idx,
807 'service_type': 'proxy',
808 '_service_root': uri,
809 } for idx, uri in enumerate(proxy_uris)]
810 self._writable_services = self._keep_services
811 self.using_proxy = True
812 self._static_services_list = True
813 else:
814
815
816 if api_client is None:
817 api_client = arvados.api('v1')
818 self.api_client = api_client
819 self.api_token = api_client.api_token
820 self._gateway_services = {}
821 self._keep_services = None
822 self._writable_services = None
823 self.using_proxy = None
824 self._static_services_list = False
825
827 """Return the appropriate timeout to use for this client.
828
829 The proxy timeout setting if the backend service is currently a proxy,
830 the regular timeout setting otherwise. The `attempt_number` indicates
831 how many times the operation has been tried already (starting from 0
832 for the first try), and scales the connection timeout portion of the
833 return value accordingly.
834
835 """
836
837
838 t = self.proxy_timeout if self.using_proxy else self.timeout
839 if len(t) == 2:
840 return (t[0] * (1 << attempt_number), t[1])
841 else:
842 return (t[0] * (1 << attempt_number), t[1], t[2])
844 return any(ks.get('service_type', 'disk') != 'disk'
845 for ks in service_list)
846
848 if (self._static_services_list or
849 (self._keep_services and not force_rebuild)):
850 return
851 with self.lock:
852 try:
853 keep_services = self.api_client.keep_services().accessible()
854 except Exception:
855 keep_services = self.api_client.keep_disks().list()
856
857
858
859
860 self._gateway_services = {ks['uuid']: ks for ks in
861 keep_services.execute()['items']}
862 if not self._gateway_services:
863 raise arvados.errors.NoKeepServersError()
864
865
866 for r in self._gateway_services.values():
867 host = r['service_host']
868 if not host.startswith('[') and host.find(':') >= 0:
869
870 host = '[' + host + ']'
871 r['_service_root'] = "{}://{}:{:d}/".format(
872 'https' if r['service_ssl_flag'] else 'http',
873 host,
874 r['service_port'])
875
876 _logger.debug(str(self._gateway_services))
877 self._keep_services = [
878 ks for ks in self._gateway_services.values()
879 if not ks.get('service_type', '').startswith('gateway:')]
880 self._writable_services = [ks for ks in self._keep_services
881 if not ks.get('read_only')]
882
883
884
885 if self._any_nondisk_services(self._writable_services):
886 self.max_replicas_per_service = None
887 else:
888 self.max_replicas_per_service = 1
889
891 """Compute the weight of a Keep service endpoint for a data
892 block with a known hash.
893
894 The weight is md5(h + u) where u is the last 15 characters of
895 the service endpoint's UUID.
896 """
897 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
898
900 """Return an array of Keep service endpoints, in the order in
901 which they should be probed when reading or writing data with
902 the given hash+hints.
903 """
904 self.build_services_list(force_rebuild)
905
906 sorted_roots = []
907
908
909
910 for hint in locator.hints:
911 if hint.startswith('K@'):
912 if len(hint) == 7:
913 sorted_roots.append(
914 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
915 elif len(hint) == 29:
916 svc = self._gateway_services.get(hint[2:])
917 if svc:
918 sorted_roots.append(svc['_service_root'])
919
920
921
922
923 use_services = self._keep_services
924 if need_writable:
925 use_services = self._writable_services
926 self.using_proxy = self._any_nondisk_services(use_services)
927 sorted_roots.extend([
928 svc['_service_root'] for svc in sorted(
929 use_services,
930 reverse=True,
931 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
932 _logger.debug("{}: {}".format(locator, sorted_roots))
933 return sorted_roots
934
def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
    # roots_map maps Keep service root URL strings to KeepService
    # objects. Refresh the service list (honoring force_rebuild),
    # add a KeepService for any root not yet in roots_map (mutating
    # the caller's dict in place), and return the current list of
    # root strings in probe order for this locator.
    #
    # Note: also mutates the caller's headers dict, injecting an
    # Authorization header if one isn't already present.
    headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
    local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
    for root in local_roots:
        if root not in roots_map:
            roots_map[root] = self.KeepService(
                root, self._user_agent_pool,
                upload_counter=self.upload_counter,
                download_counter=self.download_counter,
                headers=headers,
                insecure=self.insecure)
    return local_roots
951
952 @staticmethod
954
955
956
957
958
959 if isinstance(result, Exception):
960 return None
961 result, tried_server_count = result
962 if (result is not None) and (result is not False):
963 return True
964 elif tried_server_count < 1:
965 _logger.info("No more Keep services to try; giving up")
966 return False
967 else:
968 return None
969
971 """Fetch a block only if is in the cache, otherwise return None."""
972 slot = self.block_cache.get(loc)
973 if slot is not None and slot.ready.is_set():
974 return slot.get()
975 else:
976 return None
977
@retry.retry_method
def head(self, loc_s, **kwargs):
    # Check block existence without fetching the body: returns True
    # if the block exists, raises on failure. Accepts the same
    # keyword arguments as _get_or_head (num_retries, request_id).
    return self._get_or_head(loc_s, method="HEAD", **kwargs)
981
982 @retry.retry_method
983 - def get(self, loc_s, **kwargs):
985
def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None):
    """Get data from Keep.

    This method fetches one or more blocks of data from Keep. It
    sends a request each Keep service registered with the API
    server (or the proxy provided when this client was
    instantiated), then each service named in location hints, in
    sequence. As soon as one service provides the data, it's
    returned.

    Arguments:
    * loc_s: A string of one or more comma-separated locators to fetch.
      This method returns the concatenation of these blocks.
    * num_retries: The number of times to retry GET requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff. Note that, in each loop, the method may try
      to fetch data from every available Keep service, along with any
      that are named in location hints in the locator. The default value
      is set when the KeepClient is initialized.
    * method: "GET" (returns the block body) or "HEAD" (returns True).
    * request_id: value for the X-Request-Id header; generated if absent.
    """
    if ',' in loc_s:
        # NOTE(review): get() returns bytes on Python 3, so
        # ''.join(...) over multiple locators would raise TypeError;
        # confirm whether this multi-locator path is still exercised.
        return ''.join(self.get(x) for x in loc_s.split(','))

    self.get_counter.add(1)

    slot = None
    blob = None
    try:
        locator = KeepLocator(loc_s)
        if method == "GET":
            # Reserve (or find) the cache slot first so concurrent
            # readers of the same block wait on one fetch.
            slot, first = self.block_cache.reserve_cache(locator.md5sum)
            if not first:
                # Another thread is (or was) fetching this block.
                self.hits_counter.add(1)
                blob = slot.get()
                if blob is None:
                    raise arvados.errors.KeepReadError(
                        "failed to read {}".format(loc_s))
                return blob

        self.misses_counter.add(1)

        headers = {
            'X-Request-Id': (request_id or
                             (hasattr(self, 'api_client') and self.api_client.request_id) or
                             arvados.util.new_request_id()),
        }

        # Locator hints: 5-char K@ hints name a remote cluster's
        # keepproxy; 27-char K@ hints name a local gateway service by
        # UUID. Derive candidate service roots from them.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                               hint.startswith('K@') and
                               len(hint) == 29 and
                               self._gateway_services.get(hint[2:])
                           )])

        roots_map = {
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter,
                                   headers=headers,
                                   insecure=self.insecure)
            for root in hint_roots
        }

        # Retry loop: refresh the service list after each failure
        # (it may be changing), and stop when we succeed, run out of
        # retries, or every service has failed permanently.
        #
        # NOTE(review): the reassignment below discards the
        # hint-derived roots_map just built above (map_new_services
        # re-creates services for hint roots anyway, so behavior is
        # likely unchanged, but the dict comprehension above appears
        # to be dead code) -- confirm.
        sorted_roots = []
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # Query services that haven't already returned permanent
            # failure, in weighted probe order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        if loop.success():
            if method == "HEAD":
                return True
            else:
                return blob
    finally:
        # Always publish the outcome (even None on failure) so other
        # threads waiting on this cache slot are released, then trim
        # the cache back under its size budget.
        if slot is not None:
            slot.set(blob)
            self.block_cache.cap_cache()

    # The loop ended without success: raise the most specific error
    # we can. 403/404/410 from every service is reported as NotFound.
    not_founds = sum(1 for key in sorted_roots
                     if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
    service_errors = ((key, roots_map[key].last_result()['error'])
                      for key in sorted_roots)
    if not roots_map:
        raise arvados.errors.KeepReadError(
            "failed to read {}: no Keep services available ({})".format(
                loc_s, loop.last_result()))
    elif not_founds == len(sorted_roots):
        raise arvados.errors.NotFoundError(
            "{} not found".format(loc_s), service_errors)
    else:
        raise arvados.errors.KeepReadError(
            "failed to read {}".format(loc_s), service_errors, label="service")
1113
@retry.retry_method
def put(self, data, copies=2, num_retries=None, request_id=None):
    """Save data in Keep.

    This method will get a list of Keep services from the API server, and
    send the data to each one simultaneously in a new thread. Once the
    uploads are finished, if enough copies are saved, this method returns
    the most recent HTTP response body. If requests fail to upload
    enough copies, this method raises KeepWriteError.

    Arguments:
    * data: The string of data to upload.
    * copies: The number of copies that the user requires be saved.
      Default 2.
    * num_retries: The number of times to retry PUT requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff. The default value is set when the
      KeepClient is initialized.
    * request_id: value for the X-Request-Id header; generated if absent.
    """

    if not isinstance(data, bytes):
        data = data.encode()

    self.put_counter.add(1)

    data_hash = hashlib.md5(data).hexdigest()
    loc_s = data_hash + '+' + str(len(data))
    if copies < 1:
        # Zero copies requested: nothing to store, but the locator
        # is still well-defined.
        return loc_s
    locator = KeepLocator(loc_s)

    headers = {
        'X-Request-Id': (request_id or
                         (hasattr(self, 'api_client') and self.api_client.request_id) or
                         arvados.util.new_request_id()),
        'X-Keep-Desired-Replicas': str(copies),
    }
    roots_map = {}
    loop = retry.RetryLoop(num_retries, self._check_loop_result,
                           backoff_start=2)
    # Replicas confirmed stored so far, accumulated across retries.
    done = 0
    for tries_left in loop:
        try:
            sorted_roots = self.map_new_services(
                roots_map, locator,
                force_rebuild=(tries_left < num_retries),
                need_writable=True,
                headers=headers)
        except Exception as error:
            loop.save_result(error)
            continue

        # Only the still-missing copies are requested on each pass.
        writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                      data_hash=data_hash,
                                                      copies=copies - done,
                                                      max_service_replicas=self.max_replicas_per_service,
                                                      timeout=self.current_timeout(num_retries - tries_left))
        for service_root, ks in [(root, roots_map[root])
                                 for root in sorted_roots]:
            if ks.finished():
                # Skip services that already succeeded or failed
                # permanently in a previous pass.
                continue
            writer_pool.add_task(ks, service_root)
        writer_pool.join()
        done += writer_pool.done()
        loop.save_result((done >= copies, writer_pool.total_task_nr))

    if loop.success():
        return writer_pool.response()
    if not roots_map:
        raise arvados.errors.KeepWriteError(
            "failed to write {}: no Keep services available ({})".format(
                data_hash, loop.last_result()))
    else:
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots
                          if roots_map[key].last_result()['error'])
        raise arvados.errors.KeepWriteError(
            "failed to write {} (wanted {} copies but wrote {})".format(
                data_hash, copies, writer_pool.done()), service_errors, label="service")
1193
1195 """A stub for put().
1196
1197 This method is used in place of the real put() method when
1198 using local storage (see constructor's local_store argument).
1199
1200 copies and num_retries arguments are ignored: they are here
1201 only for the sake of offering the same call signature as
1202 put().
1203
1204 Data stored this way can be retrieved via local_store_get().
1205 """
1206 md5 = hashlib.md5(data).hexdigest()
1207 locator = '%s+%d' % (md5, len(data))
1208 with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1209 f.write(data)
1210 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1211 os.path.join(self.local_store, md5))
1212 return locator
1213
1225
1228