Package arvados :: Module events
[hide private]
[frames | no frames]

Source Code for Module arvados.events

  1  # Copyright (C) The Arvados Authors. All rights reserved. 
  2  # 
  3  # SPDX-License-Identifier: Apache-2.0 
  4   
  5  from __future__ import absolute_import 
  6  from future import standard_library 
  7  standard_library.install_aliases() 
  8  from builtins import str 
  9  from builtins import object 
 10  import arvados 
 11  from . import config 
 12  from . import errors 
 13  from .retry import RetryLoop 
 14   
 15  import logging 
 16  import json 
 17  import _thread 
 18  import threading 
 19  import time 
 20  import os 
 21  import re 
 22  import ssl 
 23  from ws4py.client.threadedclient import WebSocketClient 
 24   
 25  _logger = logging.getLogger('arvados.events') 
 26   
 27   
class _EventClient(WebSocketClient):
    """Websocket client that subscribes to filters and relays messages.

    On connect it sends one "subscribe" request per filter; every
    incoming message is JSON-decoded and passed to the ``on_event``
    callback until close() has been called.
    """

    def __init__(self, url, filters, on_event, last_log_id, on_closed):
        """Set up TLS options and callback state for the connection.

        :url: websocket URL handed to the ws4py base class.
        :filters: iterable of filters to subscribe to once opened.
        :on_event: callable invoked with each decoded message.
        :last_log_id: passed along with every initial subscription.
        :on_closed: callable invoked (without arguments) when ws4py
          reports the connection closed.
        """
        ca_bundle = arvados.util.ca_certs_path()
        if config.flag_is_true('ARVADOS_API_HOST_INSECURE'):
            verify_mode = ssl.CERT_NONE
        else:
            verify_mode = ssl.CERT_REQUIRED
        ssl_options = {'ca_certs': ca_bundle, 'cert_reqs': verify_mode}

        # Warning: If the host part of url resolves to both IPv6 and
        # IPv4 addresses (common with "localhost"), only one of them
        # will be attempted -- and it might not be the right one. See
        # ws4py's WebSocketBaseClient.__init__.
        super(_EventClient, self).__init__(url, ssl_options=ssl_options)

        self.filters = filters
        self.on_event = on_event
        self.on_closed = on_closed
        self.last_log_id = last_log_id
        # _closing is flipped under _closing_lock by close() so that
        # received_message() stops dispatching events.
        self._closing_lock = threading.RLock()
        self._closing = False
        # Set by closed(); close() waits on it.
        self._closed = threading.Event()

    def opened(self):
        """Send a subscribe request for every configured filter."""
        for event_filter in self.filters:
            self.subscribe(event_filter, self.last_log_id)

    def closed(self, code, reason=None):
        """Record that the connection is closed, then notify the owner."""
        self._closed.set()
        self.on_closed()

    def received_message(self, m):
        """Decode message ``m`` as JSON and dispatch it unless closing."""
        with self._closing_lock:
            if self._closing:
                return
            self.on_event(json.loads(str(m)))

    def close(self, code=1000, reason='', timeout=0):
        """Close event client and optionally wait for it to finish.

        :timeout: is the number of seconds to wait for ws4py to
          indicate that the connection has closed.
        """
        super(_EventClient, self).close(code, reason)
        with self._closing_lock:
            # From here on, received_message() drops incoming messages.
            self._closing = True
        # Block until closed() fires or the timeout elapses.
        self._closed.wait(timeout=timeout)

    def subscribe(self, f, last_log_id=None):
        """Send a "subscribe" request for filter ``f``.

        ``last_log_id`` is included in the request only when it is
        not None.
        """
        request = {"method": "subscribe", "filters": f}
        if last_log_id is not None:
            request["last_log_id"] = last_log_id
        self.send(json.dumps(request))

    def unsubscribe(self, f):
        """Send an "unsubscribe" request for filter ``f``."""
        request = {"method": "unsubscribe", "filters": f}
        self.send(json.dumps(request))
84 85
class EventClient(object):
    """Reconnecting wrapper around a single _EventClient connection.

    Keeps the list of subscribed filters and the id of the last event
    seen, so that a fresh _EventClient can be created with the same
    state whenever the connection closes unexpectedly.
    """

    def __init__(self, url, filters, on_event_cb, last_log_id):
        """Store the subscription state and open the first connection.

        :url: websocket URL to connect to.
        :filters: initial filter; a falsy value subscribes with an
          empty filter list.
        :on_event_cb: callable invoked with every received message.
        :last_log_id: id passed along with the initial subscription.

        Any exception raised while connecting propagates to the caller.
        """
        self.url = url
        if filters:
            self.filters = [filters]
        else:
            self.filters = [[]]
        self.on_event_cb = on_event_cb
        self.last_log_id = last_log_id
        # Set once the client is closed on purpose (or gives up), so
        # on_closed() does not try to reconnect.
        self.is_closed = threading.Event()
        self._setup_event_client()

    def _setup_event_client(self):
        """Create a new _EventClient and connect it.

        If connecting fails, the underlying connection is closed and
        the exception is re-raised.
        """
        self.ec = _EventClient(self.url, self.filters, self.on_event,
                               self.last_log_id, self.on_closed)
        self.ec.daemon = True
        try:
            self.ec.connect()
        except Exception:
            self.ec.close_connection()
            raise

    def subscribe(self, f, last_log_id=None):
        """Remember filter ``f`` and subscribe to it on the live connection."""
        self.filters.append(f)
        self.ec.subscribe(f, last_log_id)

    def unsubscribe(self, f):
        """Forget filter ``f`` and unsubscribe from it.

        Raises ValueError if ``f`` is not a current filter.
        """
        self.filters.remove(f)
        self.ec.unsubscribe(f)

    def close(self, code=1000, reason='', timeout=0):
        """Mark the client closed and close the underlying connection.

        :timeout: seconds to wait for the connection to report closed.
        """
        # Set the flag first so on_closed() sees an intentional close.
        self.is_closed.set()
        self.ec.close(code, reason, timeout)

    def on_event(self, m):
        """Track the last seen event id and forward ``m`` to the callback.

        An exception from the callback is logged and the main thread
        is interrupted.
        """
        event_id = m.get('id')
        if event_id is not None:
            self.last_log_id = event_id
        try:
            self.on_event_cb(m)
        except Exception:
            _logger.exception("Unexpected exception from event callback.")
            _thread.interrupt_main()

    def on_closed(self):
        """Reconnect after an unexpected close.

        Does nothing if close() was called. Otherwise retries
        _setup_event_client() under RetryLoop; if every attempt fails,
        marks the client closed and interrupts the main thread.
        """
        if self.is_closed.is_set():
            return
        _logger.warning("Unexpected close. Reconnecting.")
        for _tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
            try:
                self._setup_event_client()
                _logger.warning("Reconnect successful.")
                break
            except Exception as e:
                _logger.warning("Error '%s' during websocket reconnect.", e)
        else:
            # Reached only when the loop ran out of attempts without a
            # successful reconnect. Using for/else (rather than testing
            # the remaining-tries counter after the loop) keeps a
            # success on the very last attempt from being reported as
            # a failure.
            _logger.exception("EventClient thread could not contact websocket server.")
            self.is_closed.set()
            _thread.interrupt_main()
            return

    def run_forever(self):
        """Block until the client is closed."""
        # Have to poll here to let KeyboardInterrupt get raised.
        while not self.is_closed.wait(1):
            pass
149 150
class PollClient(threading.Thread):
    """Daemon thread that delivers events by polling the logs API.

    Offers the same subscribe/unsubscribe/close/run_forever interface
    as EventClient, for use when websockets are not available.
    """

    def __init__(self, api, filters, on_event, poll_time, last_log_id):
        """Store polling state; the thread is not started here.

        :api: API client object; ``api.logs().list(...).execute()`` is
          used to fetch events.
        :filters: initial filter; a falsy value polls with an empty
          filter list.
        :on_event: callable invoked with each event.
        :poll_time: seconds to wait between polls when no more items
          are pending; also used as the retry loop's ``max_wait``.
        :last_log_id: id of the last event already seen, or None.
        """
        super(PollClient, self).__init__()
        self.api = api
        if filters:
            self.filters = [filters]
        else:
            self.filters = [[]]
        self.on_event = on_event
        self.poll_time = poll_time
        self.daemon = True
        self.last_log_id = last_log_id
        self._closing = threading.Event()
        # Held while an event is being delivered, so close() waits for
        # a running on_event handler before setting _closing.
        self._closing_lock = threading.RLock()

    def run(self):
        """Poll for new events until closed.

        Sends a synthetic ``{'status': 200}`` event first, then
        repeatedly queries each filter and passes every returned item
        to ``on_event``. If the API cannot be reached after all
        retries, marks the client closing and interrupts the main
        thread. An exception from ``on_event`` is logged and also
        interrupts the main thread.
        """
        if self.last_log_id is not None:
            # Caller supplied the last-seen event ID from a previous
            # connection
            skip_old_events = [["id", ">", str(self.last_log_id)]]
        else:
            # We need to do a reverse-order query to find the most
            # recent event ID (see "if not skip_old_events" below).
            skip_old_events = False

        self.on_event({'status': 200})

        while not self._closing.is_set():
            moreitems = False
            for f in self.filters:
                for _tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
                    try:
                        if not skip_old_events:
                            # If the caller didn't provide a known
                            # recent ID, our first request will ask
                            # for the single most recent event from
                            # the last 2 hours (the time restriction
                            # avoids doing an expensive database
                            # query, and leaves a big enough margin to
                            # account for clock skew). If we do find a
                            # recent event, we remember its ID but
                            # then discard it (we are supposed to be
                            # returning new/current events, not old
                            # ones).
                            #
                            # Subsequent requests will get multiple
                            # events in chronological order, and
                            # filter on that same cutoff time, or
                            # (once we see our first matching event)
                            # the ID of the last-seen event.
                            skip_old_events = [[
                                "created_at", ">=",
                                time.strftime(
                                    "%Y-%m-%dT%H:%M:%SZ",
                                    time.gmtime(time.time()-7200))]]
                            items = self.api.logs().list(
                                order="id desc",
                                limit=1,
                                filters=f+skip_old_events).execute()
                            if items["items"]:
                                skip_old_events = [
                                    ["id", ">", str(items["items"][0]["id"])]]
                                items = {
                                    "items": [],
                                    "items_available": 0,
                                }
                        else:
                            # In this case, either we know the most
                            # recent matching ID, or we know there
                            # were no matching events in the 2-hour
                            # window before subscribing. Either way we
                            # can safely ask for events in ascending
                            # order.
                            items = self.api.logs().list(
                                order="id asc",
                                filters=f+skip_old_events).execute()
                        break
                    except errors.ApiError:
                        # Let the retry loop try again.
                        pass
                else:
                    # Reached only when every attempt failed. Using
                    # for/else (rather than testing the remaining-tries
                    # counter after the loop) keeps a request that
                    # succeeds on the very last attempt from being
                    # reported as a failure.
                    _logger.exception("PollClient thread could not contact API server.")
                    with self._closing_lock:
                        self._closing.set()
                    _thread.interrupt_main()
                    return
                for i in items["items"]:
                    skip_old_events = [["id", ">", str(i["id"])]]
                    with self._closing_lock:
                        if self._closing.is_set():
                            return
                        try:
                            self.on_event(i)
                        except Exception:
                            _logger.exception("Unexpected exception from event callback.")
                            _thread.interrupt_main()
                if items["items_available"] > len(items["items"]):
                    # More events are pending; poll again immediately.
                    moreitems = True
            if not moreitems:
                self._closing.wait(self.poll_time)

    def run_forever(self):
        """Block until the client is closing."""
        # Have to poll here, otherwise KeyboardInterrupt will never get processed.
        while not self._closing.is_set():
            self._closing.wait(1)

    def close(self, code=None, reason=None, timeout=0):
        """Close poll client and optionally wait for it to finish.

        If an :on_event: handler is running in a different thread,
        first wait (indefinitely) for it to return.

        After closing, wait up to :timeout: seconds for the thread to
        finish the poll request in progress (if any).

        :code: and :reason: are ignored. They are present for
        interface compatibility with EventClient.
        """

        with self._closing_lock:
            self._closing.set()
        try:
            self.join(timeout=timeout)
        except RuntimeError:
            # "join() raises a RuntimeError if an attempt is made to join the
            # current thread as that would cause a deadlock. It is also an
            # error to join() a thread before it has been started and attempts
            # to do so raises the same exception."
            pass

    def subscribe(self, f, last_log_id=None):
        """Start polling for filter ``f`` as well.

        Emits a synthetic ``{'status': 200}`` event, then adds ``f``
        to the polled filters.

        :last_log_id: is ignored. It is present for interface
        compatibility with EventClient.
        """
        self.on_event({'status': 200})
        self.filters.append(f)

    def unsubscribe(self, f):
        """Stop polling for filter ``f``.

        Raises ValueError if ``f`` is not a current filter.
        """
        self.filters.remove(f)
289 290
def _subscribe_websocket(api, filters, on_event, last_log_id=None):
    """Create an EventClient for the server's advertised websocket URL.

    :api: client object; its ``_rootDesc`` mapping is read for the
      'websocketUrl' entry and its ``api_token`` is appended to the
      URL as a query parameter.
    :filters:, :on_event:, :last_log_id: are passed to EventClient.

    Raises errors.FeatureNotEnabledError if no websocket URL is
    advertised. Any exception raised while creating the EventClient
    is logged as a warning and re-raised.
    """
    ws_endpoint = api._rootDesc.get('websocketUrl', None)
    if not ws_endpoint:
        raise errors.FeatureNotEnabledError(
            "Server does not advertise a websocket endpoint")

    authed_uri = "{}?api_token={}".format(ws_endpoint, api.api_token)
    try:
        return EventClient(authed_uri, filters, on_event, last_log_id)
    except Exception:
        # Log the bare endpoint rather than the URI carrying the token.
        _logger.warning("Failed to connect to websockets on %s" % ws_endpoint)
        raise
304 305
def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
    """Subscribe to events, preferring websockets over polling.

    :api:
      a client object retrieved from arvados.api(). The caller should
      not use this client object for anything else after calling
      subscribe().
    :filters:
      Initial subscription filters.
    :on_event:
      The callback when a message is received.
    :poll_fallback:
      If websockets are not available, fall back to polling every N
      seconds. If poll_fallback is false, no fallback is attempted:
      the websocket client is returned, and any error raised while
      setting it up propagates to the caller.
    :last_log_id:
      Passed to the websocket or polling client as the id of the last
      log row already seen.

    Returns the websocket client, or a started PollClient when falling
    back to polling.
    """
    if not poll_fallback:
        # No fallback requested: websocket errors are the caller's problem.
        return _subscribe_websocket(api, filters, on_event, last_log_id)

    try:
        websockets_disabled = config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS')
        if websockets_disabled:
            _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
        else:
            return _subscribe_websocket(api, filters, on_event, last_log_id)
    except Exception as err:
        _logger.warning("Falling back to polling after websocket error: %s" % err)

    poller = PollClient(api, filters, on_event, poll_fallback, last_log_id)
    poller.start()
    return poller
333