Package arvados :: Module events
[hide private]
[frames] | no frames]

Source Code for Module arvados.events

  1  # Copyright (C) The Arvados Authors. All rights reserved. 
  2  # 
  3  # SPDX-License-Identifier: Apache-2.0 
  4   
  5  from __future__ import absolute_import 
  6  from future import standard_library 
  7  standard_library.install_aliases() 
  8  from builtins import str 
  9  from builtins import object 
 10  import arvados 
 11  from . import config 
 12  from . import errors 
 13  from .retry import RetryLoop 
 14   
 15  import logging 
 16  import json 
 17  import _thread 
 18  import threading 
 19  import time 
 20  import os 
 21  import re 
 22  import ssl 
 23  from ws4py.client.threadedclient import WebSocketClient 
 24   
 25  _logger = logging.getLogger('arvados.events') 
 26   
 27   
class _EventClient(WebSocketClient):
    """ws4py websocket connection that relays Arvados event messages.

    On open it sends one "subscribe" request per filter; every text message
    received afterwards is JSON-decoded and handed to ``on_event`` until
    close() is called.
    """

    def __init__(self, url, filters, on_event, last_log_id, on_closed):
        ssl_options = {'ca_certs': arvados.util.ca_certs_path()}
        # Skip certificate verification only when explicitly requested.
        skip_verify = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        ssl_options['cert_reqs'] = ssl.CERT_NONE if skip_verify else ssl.CERT_REQUIRED

        # Warning: If the host part of url resolves to both IPv6 and
        # IPv4 addresses (common with "localhost"), only one of them
        # will be attempted -- and it might not be the right one. See
        # ws4py's WebSocketBaseClient.__init__.
        super(_EventClient, self).__init__(url, ssl_options=ssl_options)

        self.filters = filters
        self.on_event = on_event
        self.last_log_id = last_log_id
        # Guards _closing so a message is never dispatched mid-close.
        self._closing_lock = threading.RLock()
        self._closing = False
        # Set by closed() once ws4py reports the connection is gone.
        self._closed = threading.Event()
        self.on_closed = on_closed

    def opened(self):
        """(Re)subscribe to every filter once the socket is open."""
        for flt in self.filters:
            self.subscribe(flt, self.last_log_id)

    def closed(self, code, reason=None):
        """Record that the connection closed, then notify the owner."""
        self._closed.set()
        self.on_closed()

    def received_message(self, m):
        """Decode a message and pass it on, unless close() has begun."""
        with self._closing_lock:
            if self._closing:
                return
            self.on_event(json.loads(str(m)))

    def close(self, code=1000, reason='', timeout=0):
        """Close the connection and optionally wait for it to finish closing.

        :timeout: seconds to wait for ws4py to report (via closed())
          that the connection is closed.
        """
        super(_EventClient, self).close(code, reason)
        # Stop dispatching any further messages.
        with self._closing_lock:
            self._closing = True
        # Wait outside the lock so received_message() cannot block on it.
        self._closed.wait(timeout=timeout)

    def subscribe(self, f, last_log_id=None):
        """Send a subscribe request for filter ``f``.

        ``last_log_id`` is included in the request only when not None.
        """
        message = {"method": "subscribe", "filters": f}
        if last_log_id is not None:
            message["last_log_id"] = last_log_id
        self.send(json.dumps(message))

    def unsubscribe(self, f):
        """Send an unsubscribe request for filter ``f``."""
        message = {"method": "unsubscribe", "filters": f}
        self.send(json.dumps(message))
class EventClient(object):
    """Websocket event subscription that reconnects after unexpected closes.

    Keeps the list of subscribed filters and the id of the most recent event
    delivered. If the connection closes without close() having been called,
    a replacement _EventClient is created from that state.
    """

    def __init__(self, url, filters, on_event_cb, last_log_id):
        """Connect to ``url`` and subscribe to ``filters``.

        :url: websocket URL to connect to.
        :filters: initial filter list; a falsy value subscribes with an
          empty filter list.
        :on_event_cb: called with each decoded event message.
        :last_log_id: if not None, sent to the server as ``last_log_id``
          in each subscribe request made when the connection opens.

        Any exception from the initial connection attempt propagates.
        """
        self.url = url
        self.filters = [filters] if filters else [[]]
        self.on_event_cb = on_event_cb
        self.last_log_id = last_log_id
        self.is_closed = threading.Event()
        self._setup_event_client()

    def _setup_event_client(self):
        """Create, configure and connect a fresh _EventClient.

        If connect() fails, the partially-opened connection is torn down
        and the original exception is re-raised.
        """
        self.ec = _EventClient(self.url, self.filters, self.on_event,
                               self.last_log_id, self.on_closed)
        self.ec.daemon = True
        try:
            self.ec.connect()
        except Exception:
            self.ec.close_connection()
            raise

    def subscribe(self, f, last_log_id=None):
        """Add filter ``f`` and subscribe to it on the current connection."""
        self.filters.append(f)
        self.ec.subscribe(f, last_log_id)

    def unsubscribe(self, f):
        """Remove filter ``f`` (ValueError if absent) and unsubscribe from it."""
        self.filters.remove(f)
        self.ec.unsubscribe(f)

    def close(self, code=1000, reason='', timeout=0):
        """Close the connection without reconnecting.

        :timeout: seconds to wait for the underlying connection to report
          that it has closed.
        """
        # Set the flag first so on_closed() will not try to reconnect.
        self.is_closed.set()
        self.ec.close(code, reason, timeout)

    def on_event(self, m):
        """Remember the event's id (if it has one) and pass it to the callback.

        If the callback raises, the exception is logged and
        _thread.interrupt_main() is called to alert the main thread.
        """
        if m.get('id') is not None:
            self.last_log_id = m.get('id')
        try:
            self.on_event_cb(m)
        except Exception:
            _logger.exception("Unexpected exception from event callback.")
            _thread.interrupt_main()

    def on_closed(self):
        """Reconnect after an unexpected close, with up to 25 retries.

        Does nothing if close() was called. If every attempt fails, marks
        this client closed and interrupts the main thread.
        """
        if self.is_closed.is_set():
            return
        _logger.warning("Unexpected close. Reconnecting.")
        for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
            try:
                self._setup_event_client()
            except Exception as e:
                _logger.warning("Error '%s' during websocket reconnect.", e)
                # Give up only when the *final* attempt fails; checking here
                # (rather than after the loop) means a success on the last
                # attempt is not mistaken for failure, and the exception
                # being logged is still active.
                if tries_left == 0:
                    _logger.exception("EventClient thread could not contact websocket server.")
                    self.is_closed.set()
                    _thread.interrupt_main()
                    return
            else:
                _logger.warning("Reconnect successful.")
                break

    def run_forever(self):
        """Block until this client is closed."""
        # Have to poll here to let KeyboardInterrupt get raised.
        while not self.is_closed.wait(1):
            pass
class PollClient(threading.Thread):
    """Deliver Arvados log events by periodically polling the API server.

    Used as a fallback when websockets are unavailable; offers the same
    subscribe/unsubscribe/close/run_forever methods as EventClient.
    """

    def __init__(self, api, filters, on_event, poll_time, last_log_id):
        """
        :api: API client whose ``logs().list()`` is polled.
        :filters: initial filter list; a falsy value polls with an empty
          filter list.
        :on_event: called with each log record, and with
          ``{'status': 200}`` when polling starts or a filter is added.
        :poll_time: seconds to wait between polls; also the maximum retry
          backoff.
        :last_log_id: if not None, only records with a greater id are
          delivered.
        """
        super(PollClient, self).__init__()
        self.api = api
        self.filters = [filters] if filters else [[]]
        self.on_event = on_event
        self.poll_time = poll_time
        self.daemon = True
        self.last_log_id = last_log_id
        self._closing = threading.Event()
        self._closing_lock = threading.RLock()

        if self.last_log_id is not None:
            # Caller supplied the last-seen event ID from a previous
            # connection.
            self._skip_old_events = [["id", ">", str(self.last_log_id)]]
        else:
            # We need to do a reverse-order query to find the most
            # recent event ID (see "if not self._skip_old_events"
            # in _fetch_events()).
            self._skip_old_events = False

    def run(self):
        """Poll every subscribed filter until close() is called."""
        self.on_event({'status': 200})

        while not self._closing.is_set():
            moreitems = False
            for f in self.filters:
                items = self._poll_with_retries(f)
                if items is None:
                    # Every attempt failed: stop and alert the main thread.
                    with self._closing_lock:
                        self._closing.set()
                    _thread.interrupt_main()
                    return
                for i in items["items"]:
                    self._skip_old_events = [["id", ">", str(i["id"])]]
                    # Holding the lock lets close() wait for an in-progress
                    # callback and guarantees none starts after close().
                    with self._closing_lock:
                        if self._closing.is_set():
                            return
                        try:
                            self.on_event(i)
                        except Exception:
                            _logger.exception("Unexpected exception from event callback.")
                            _thread.interrupt_main()
                if items["items_available"] > len(items["items"]):
                    moreitems = True
            if not moreitems:
                self._closing.wait(self.poll_time)

    def _poll_with_retries(self, f):
        """Fetch events for filter ``f``, retrying errors.ApiError with backoff.

        Returns the ``logs().list()`` response dict, or None if every
        attempt raised errors.ApiError (the final failure is logged with
        its traceback). Other exceptions propagate.
        """
        for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
            try:
                return self._fetch_events(f)
            except errors.ApiError:
                # tries_left is 0 on the last attempt; log while the
                # exception is still active so the traceback is kept.
                if tries_left == 0:
                    _logger.exception("PollClient thread could not contact API server.")
        return None

    def _fetch_events(self, f):
        """Make one ``logs().list()`` request for filter ``f`` and return it.

        May update self._skip_old_events while deciding where polling
        starts. Exceptions from the API call propagate.
        """
        if not self._skip_old_events:
            # If the caller didn't provide a known recent ID, our first
            # request will ask for the single most recent event from the
            # last 2 hours (the time restriction avoids doing an expensive
            # database query, and leaves a big enough margin to account
            # for clock skew). If we do find a recent event, we remember
            # its ID but then discard it (we are supposed to be returning
            # new/current events, not old ones).
            #
            # Subsequent requests will get multiple events in
            # chronological order, and filter on that same cutoff time, or
            # (once we see our first matching event) the ID of the
            # last-seen event.
            #
            # Note: self._skip_old_events must not be set until the
            # threshold is decided. Otherwise, tests will be unreliable.
            filter_by_time = [[
                "created_at", ">=",
                time.strftime(
                    "%Y-%m-%dT%H:%M:%SZ",
                    time.gmtime(time.time()-7200))]]
            items = self.api.logs().list(
                order="id desc",
                limit=1,
                filters=f+filter_by_time).execute()
            if items["items"]:
                self._skip_old_events = [
                    ["id", ">", str(items["items"][0]["id"])]]
                items = {
                    "items": [],
                    "items_available": 0,
                }
            else:
                # No recent events. We can keep using the same timestamp
                # threshold until we receive our first new event.
                self._skip_old_events = filter_by_time
            return items
        # In this case, either we know the most recent matching ID, or we
        # know there were no matching events in the 2-hour window before
        # subscribing. Either way we can safely ask for events in
        # ascending order.
        return self.api.logs().list(
            order="id asc",
            filters=f+self._skip_old_events).execute()

    def run_forever(self):
        """Block until the poller is closed."""
        # Have to poll here, otherwise KeyboardInterrupt will never get processed.
        while not self._closing.is_set():
            self._closing.wait(1)

    def close(self, code=None, reason=None, timeout=0):
        """Close poll client and optionally wait for it to finish.

        If an :on_event: handler is running in a different thread,
        first wait (indefinitely) for it to return.

        After closing, wait up to :timeout: seconds for the thread to
        finish the poll request in progress (if any).

        :code: and :reason: are ignored. They are present for
        interface compatibility with EventClient.
        """

        with self._closing_lock:
            self._closing.set()
        try:
            self.join(timeout=timeout)
        except RuntimeError:
            # "join() raises a RuntimeError if an attempt is made to join the
            # current thread as that would cause a deadlock. It is also an
            # error to join() a thread before it has been started and attempts
            # to do so raises the same exception."
            pass

    def subscribe(self, f, last_log_id=None):
        """Add filter ``f`` to the set polled on each cycle.

        Sends a ``{'status': 200}`` event first, like run() does at startup.

        :last_log_id: accepted for interface compatibility with
          EventClient.subscribe() and ignored. New filters are polled from
          the same position as the existing ones.
        """
        self.on_event({'status': 200})
        self.filters.append(f)

    def unsubscribe(self, f):
        """Stop polling filter ``f`` (ValueError if it is not subscribed)."""
        self.filters.remove(f)
def _subscribe_websocket(api, filters, on_event, last_log_id=None):
    """Open a websocket EventClient on the endpoint advertised by ``api``.

    :api: API client; its discovery document must provide ``websocketUrl``.
    :filters: initial subscription filters.
    :on_event: callback for each received event.
    :last_log_id: passed through to EventClient.

    Raises errors.FeatureNotEnabledError if no websocket endpoint is
    advertised. Any exception from the connection attempt is logged as a
    warning and re-raised.
    """
    endpoint = api._rootDesc.get('websocketUrl', None)
    if not endpoint:
        raise errors.FeatureNotEnabledError(
            "Server does not advertise a websocket endpoint")
    # Append the token as a query parameter; if the endpoint already has a
    # query string, extend it rather than adding a second '?'.
    separator = '&' if '?' in endpoint else '?'
    uri_with_token = "{}{}api_token={}".format(endpoint, separator, api.api_token)
    try:
        return EventClient(uri_with_token, filters, on_event, last_log_id)
    except Exception:
        _logger.warning("Failed to connect to websockets on %s", endpoint)
        raise
def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
    """Subscribe to Arvados log events, by websocket or by polling.

    :api:
      a client object retrieved from arvados.api(). The caller should not use this client object for anything else after calling subscribe().
    :filters:
      Initial subscription filters.
    :on_event:
      The callback when a message is received.
    :poll_fallback:
      If websockets are not available, fall back to polling every N seconds.
      If poll_fallback is False (or otherwise falsy), websockets are required:
      any error setting up the websocket connection (including a server that
      does not advertise a websocket endpoint) is raised to the caller, and
      ARVADOS_DISABLE_WEBSOCKETS is not consulted.
    :last_log_id:
      If not None, resume after this log id: the poller only returns log rows
      with a greater id, and the websocket client sends it to the server as
      ``last_log_id`` when subscribing.

    Returns an EventClient (websocket) or an already-started PollClient.
    """

    if not poll_fallback:
        return _subscribe_websocket(api, filters, on_event, last_log_id)

    try:
        if not config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS'):
            return _subscribe_websocket(api, filters, on_event, last_log_id)
        else:
            _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
    except Exception as e:
        _logger.warning("Falling back to polling after websocket error: %s", e)
    p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
    p.start()
    return p