arvados.events
Follow events on an Arvados cluster
This module provides different ways to get notified about events that happen on an Arvados cluster. You indicate which events you want updates about, and provide a function that is called any time one of those events is received from the server.
`subscribe` is the main entry point. It helps you construct one of the two API-compatible client classes: `EventClient` (which uses WebSockets) or `PollClient` (which periodically queries the logs list methods).
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
"""Follow events on an Arvados cluster

This module provides different ways to get notified about events that happen
on an Arvados cluster. You indicate which events you want updates about, and
provide a function that is called any time one of those events is received
from the server.

`subscribe` is the main entry point. It helps you construct one of the two
API-compatible client classes: `EventClient` (which uses WebSockets) or
`PollClient` (which periodically queries the logs list methods).
"""

import enum
import json
import logging
import os
import re
import ssl
import sys
import _thread
import threading
import time

import websockets.exceptions as ws_exc
import websockets.sync.client as ws_client

from . import config
from . import errors
from . import util
from .retry import RetryLoop
from ._version import __version__

from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Optional,
    Union,
)

EventCallback = Callable[[Dict[str, Any]], object]
"""Type signature for an event handler callback"""
FilterCondition = List[Union[None, str, 'Filter']]
"""Type signature for a single filter condition"""
Filter = List[FilterCondition]
"""Type signature for an entire filter"""

_logger = logging.getLogger('arvados.events')

class WSMethod(enum.Enum):
    """Arvados WebSocket methods

    This enum represents valid values for the `method` field in messages
    sent to an Arvados WebSocket server.
    """
    SUBSCRIBE = 'subscribe'
    SUB = SUBSCRIBE
    UNSUBSCRIBE = 'unsubscribe'
    UNSUB = UNSUBSCRIBE


class EventClient(threading.Thread):
    """Follow Arvados events via WebSocket

    EventClient follows events on Arvados cluster published by the WebSocket
    server. Users can select the events they want to follow and run their own
    callback function on each.

    The constructor connects to the server and starts the client thread
    itself, so callers do not call `start`.
    """
    _USER_AGENT = 'Python/{}.{}.{} arvados.events/{}'.format(
        *sys.version_info[:3],
        __version__,
    )

    def __init__(
            self,
            url: str,
            filters: Optional[Filter],
            on_event_cb: EventCallback,
            last_log_id: Optional[int]=None,
            *,
            insecure: Optional[bool]=None,
    ) -> None:
        """Initialize a WebSocket client

        Constructor arguments:

        * url: str --- The `wss` URL for an Arvados WebSocket server.

        * filters: arvados.events.Filter | None --- One event filter to
          subscribe to after connecting to the WebSocket server. If not
          specified, the client will subscribe to all events.

        * on_event_cb: arvados.events.EventCallback --- When the client
          receives an event from the WebSocket server, it calls this
          function with the event object.

        * last_log_id: int | None --- If specified, this will be used as the
          value for the `last_log_id` field in subscribe messages sent by
          the client.

        Constructor keyword arguments:

        * insecure: bool | None --- If `True`, the client will not check the
          validity of the server's TLS certificate. If not specified, uses
          the value from the user's `ARVADOS_API_HOST_INSECURE` setting.
        """
        self.url = url
        self.filters = [filters or []]
        self.on_event_cb = on_event_cb
        self.last_log_id = last_log_id
        self.is_closed = threading.Event()
        self._ssl_ctx = ssl.create_default_context(
            purpose=ssl.Purpose.SERVER_AUTH,
            cafile=util.ca_certs_path(),
        )
        if insecure is None:
            insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        if insecure:
            self._ssl_ctx.check_hostname = False
            self._ssl_ctx.verify_mode = ssl.CERT_NONE
        self._subscribe_lock = threading.Lock()
        # Connect before initializing the thread so a connection failure
        # propagates to the caller without a thread having been started.
        self._connect()
        super().__init__(daemon=True)
        self.start()

    def _connect(self) -> None:
        # There are no locks protecting this method. After the thread starts,
        # it should only be called from inside.
        self._client = ws_client.connect(
            self.url,
            logger=_logger,
            ssl_context=self._ssl_ctx,
            user_agent_header=self._USER_AGENT,
        )
        self._client_ok = True

    def _subscribe(self, f: Filter, last_log_id: Optional[int]) -> None:
        # Only include `last_log_id` in the message when we have a value.
        extra = {}
        if last_log_id is not None:
            extra['last_log_id'] = last_log_id
        return self._update_sub(WSMethod.SUBSCRIBE, f, **extra)

    def _update_sub(self, method: WSMethod, f: Filter, **extra: Any) -> None:
        # Serialize and send one subscription-management message.
        msg = json.dumps({
            'method': method.value,
            'filters': f,
            **extra,
        })
        self._client.send(msg)

    def close(self, code: int=1000, reason: str='', timeout: float=0) -> None:
        """Close the WebSocket connection and stop processing events

        Arguments:

        * code: int --- The WebSocket close code sent to the server when
          disconnecting. Default 1000.

        * reason: str --- The WebSocket close reason sent to the server when
          disconnecting. Default is an empty string.

        * timeout: float --- How long to wait for the WebSocket server to
          acknowledge the disconnection, in seconds. Default 0, which means
          no timeout.
        """
        self.is_closed.set()
        self._client.close_timeout = timeout or None
        self._client.close(code, reason)

    def run_forever(self) -> None:
        """Run the WebSocket client indefinitely

        This method blocks until the `close` method is called (e.g., from
        another thread) or the client permanently loses its connection.
        """
        # Have to poll here to let KeyboardInterrupt get raised.
        while not self.is_closed.wait(1):
            pass

    def subscribe(self, f: Filter, last_log_id: Optional[int]=None) -> None:
        """Subscribe to another set of events from the server

        Arguments:

        * f: arvados.events.Filter --- One filter to subscribe to events
          for.

        * last_log_id: int | None --- If specified, request events starting
          from this id. If not specified, the server will only send events
          that occur after processing the subscription.
        """
        with self._subscribe_lock:
            self._subscribe(f, last_log_id)
            self.filters.append(f)

    def unsubscribe(self, f: Filter) -> None:
        """Unsubscribe from an event stream

        Arguments:

        * f: arvados.events.Filter --- One event filter to stop receiving
          events for.

        Raises `ValueError` if `f` is not currently subscribed.
        """
        with self._subscribe_lock:
            try:
                index = self.filters.index(f)
            except ValueError:
                raise ValueError(f"filter not subscribed: {f!r}") from None
            self._update_sub(WSMethod.UNSUBSCRIBE, f)
            del self.filters[index]

    def on_closed(self) -> None:
        """Handle disconnection from the WebSocket server

        This method is called when the client loses its connection from
        receiving events. This implementation tries to establish a new
        connection if it was not closed client-side. If every attempt
        fails, it marks the client closed and interrupts the main thread.
        """
        if self.is_closed.is_set():
            return
        _logger.warning("Unexpected close. Reconnecting.")
        for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
            if self.is_closed.is_set():
                # `close` was called while we were retrying. Stop here so
                # we do not open a new connection nobody will close.
                return
            try:
                self._connect()
            except Exception as e:
                _logger.warning("Error '%s' during websocket reconnect.", e)
            else:
                _logger.warning("Reconnect successful.")
                break
        else:
            _logger.error("EventClient thread could not contact websocket server.")
            self.is_closed.set()
            _thread.interrupt_main()

    def on_event(self, m: Dict[str, Any]) -> None:
        """Handle an event from the WebSocket server

        This method is called whenever the client receives an event from the
        server. This implementation records the `id` field internally, then
        calls the callback function provided at initialization time.

        Arguments:

        * m: Dict[str, Any] --- The event object, deserialized from JSON.
        """
        try:
            self.last_log_id = m['id']
        except KeyError:
            pass
        try:
            self.on_event_cb(m)
        except Exception:
            _logger.exception("Unexpected exception from event callback.")
            _thread.interrupt_main()

    def run(self) -> None:
        """Run the client loop

        This method runs in a separate thread to receive and process events
        from the server.
        """
        self.name = f'ArvadosWebsockets-{self.ident}'
        while self._client_ok and not self.is_closed.is_set():
            try:
                # (Re)send every known subscription on each new connection.
                with self._subscribe_lock:
                    for f in self.filters:
                        self._subscribe(f, self.last_log_id)
                for msg_s in self._client:
                    if not self.is_closed.is_set():
                        msg = json.loads(msg_s)
                        self.on_event(msg)
            except ws_exc.ConnectionClosed:
                self._client_ok = False
                self.on_closed()


class PollClient(threading.Thread):
    """Follow Arvados events via polling logs

    PollClient follows events on Arvados cluster by periodically running
    logs list API calls. Users can select the events they want to follow and
    run their own callback function on each.
    """
    def __init__(
            self,
            api: 'arvados.api_resources.ArvadosAPIClient',
            filters: Optional[Filter],
            on_event: EventCallback,
            poll_time: float=15,
            last_log_id: Optional[int]=None,
    ) -> None:
        """Initialize a polling client

        Constructor arguments:

        * api: arvados.api_resources.ArvadosAPIClient --- The Arvados API
          client used to query logs. It will be used in a separate thread,
          so if it is not an instance of `arvados.api.ThreadSafeAPIClient`
          it should not be reused after the thread is started.

        * filters: arvados.events.Filter | None --- One event filter to
          apply to log queries. If not specified, the client will follow
          all events.

        * on_event: arvados.events.EventCallback --- When the client
          finds a new log entry, it calls this function with the event
          object.

        * poll_time: float --- The number of seconds to wait between querying
          logs. Default 15.

        * last_log_id: int | None --- If specified, queries will include a
          filter for logs with an `id` greater than this value.
        """
        super().__init__(daemon=True)
        self.api = api
        self.filters = [filters or []]
        self.on_event = on_event
        self.poll_time = poll_time
        self.last_log_id = last_log_id
        self._closing = threading.Event()
        self._closing_lock = threading.RLock()

        if self.last_log_id is not None:
            # Caller supplied the last-seen event ID from a previous
            # connection.
            self._skip_old_events = [["id", ">", str(self.last_log_id)]]
        else:
            # We need to do a reverse-order query to find the most
            # recent event ID (see _list_new_logs()).
            self._skip_old_events = False

    def _list_new_logs(self, f: Filter) -> Dict[str, Any]:
        # Run one logs list query for filter `f` and return the response.
        if self._skip_old_events:
            # In this case, either we know the most recent matching ID, or
            # we know there were no matching events in the 2-hour window
            # before subscribing. Either way we can safely ask for events
            # in ascending order.
            return self.api.logs().list(
                order="id asc",
                filters=f+self._skip_old_events).execute()

        # If the caller didn't provide a known recent ID, our first request
        # will ask for the single most recent event from the last 2 hours
        # (the time restriction avoids doing an expensive database query,
        # and leaves a big enough margin to account for clock skew). If we
        # do find a recent event, we remember its ID but then discard it
        # (we are supposed to be returning new/current events, not old
        # ones).
        #
        # Subsequent requests will get multiple events in chronological
        # order, and filter on that same cutoff time, or (once we see our
        # first matching event) the ID of the last-seen event.
        #
        # Note: self._skip_old_events must not be set until the threshold
        # is decided. Otherwise, tests will be unreliable.
        filter_by_time = [[
            "created_at", ">=",
            time.strftime(
                "%Y-%m-%dT%H:%M:%SZ",
                time.gmtime(time.time()-7200))]]
        items = self.api.logs().list(
            order="id desc",
            limit=1,
            filters=f+filter_by_time).execute()
        if items["items"]:
            self._skip_old_events = [
                ["id", ">", str(items["items"][0]["id"])]]
            return {
                "items": [],
                "items_available": 0,
            }
        # No recent events. We can keep using the same timestamp threshold
        # until we receive our first new event.
        self._skip_old_events = filter_by_time
        return items

    def run(self):
        """Run the client loop

        This method runs in a separate thread to poll and process events
        from the server.
        """
        self.on_event({'status': 200})

        while not self._closing.is_set():
            moreitems = False
            for f in self.filters:
                # Track success explicitly instead of inferring it from the
                # value the retry loop last yielded.
                items = None
                for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
                    try:
                        items = self._list_new_logs(f)
                    except errors.ApiError:
                        continue
                    else:
                        break
                if items is None:
                    # Not inside an exception handler here, so there is no
                    # traceback for `_logger.exception` to report.
                    _logger.error("PollClient thread could not contact API server.")
                    with self._closing_lock:
                        self._closing.set()
                    _thread.interrupt_main()
                    return
                for i in items["items"]:
                    self._skip_old_events = [["id", ">", str(i["id"])]]
                    with self._closing_lock:
                        if self._closing.is_set():
                            return
                        try:
                            self.on_event(i)
                        except Exception:
                            _logger.exception("Unexpected exception from event callback.")
                            _thread.interrupt_main()
                if items["items_available"] > len(items["items"]):
                    moreitems = True
            if not moreitems:
                self._closing.wait(self.poll_time)

    def run_forever(self):
        """Run the polling client indefinitely

        This method blocks until the `close` method is called (e.g., from
        another thread) or the client permanently loses its connection.
        """
        # Have to poll here, otherwise KeyboardInterrupt will never get processed.
        while not self._closing.is_set():
            self._closing.wait(1)

    def close(self, code: Optional[int]=None, reason: Optional[str]=None, timeout: float=0) -> None:
        """Stop polling and processing events

        Arguments:

        * code: Optional[int] --- Ignored; this argument exists for API
          compatibility with `EventClient.close`.

        * reason: Optional[str] --- Ignored; this argument exists for API
          compatibility with `EventClient.close`.

        * timeout: float --- How long to wait for the client thread to finish
          processing events. Default 0, which means no timeout.
        """
        with self._closing_lock:
            self._closing.set()
        try:
            self.join(timeout=timeout)
        except RuntimeError:
            # "join() raises a RuntimeError if an attempt is made to join the
            # current thread as that would cause a deadlock. It is also an
            # error to join() a thread before it has been started and attempts
            # to do so raises the same exception."
            pass

    def subscribe(self, f: Filter, last_log_id: Optional[int]=None) -> None:
        """Subscribe to another set of events from the server

        Arguments:

        * f: arvados.events.Filter --- One filter to subscribe to.

        * last_log_id: Optional[int] --- Ignored; this argument exists for
          API compatibility with `EventClient.subscribe`.
        """
        self.on_event({'status': 200})
        self.filters.append(f)

    def unsubscribe(self, f):
        """Unsubscribe from an event stream

        Arguments:

        * f: arvados.events.Filter --- One event filter to stop receiving
          events for.

        Raises `ValueError` if `f` is not currently subscribed.
        """
        try:
            index = self.filters.index(f)
        except ValueError:
            raise ValueError(f"filter not subscribed: {f!r}") from None
        del self.filters[index]


def _subscribe_websocket(api, filters, on_event, last_log_id=None):
    # Build an EventClient from the WebSocket endpoint the API server
    # advertises, or raise FeatureNotEnabledError if there is none.
    endpoint = api._rootDesc.get('websocketUrl', None)
    if not endpoint:
        raise errors.FeatureNotEnabledError(
            "Server does not advertise a websocket endpoint")
    uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
    try:
        client = EventClient(uri_with_token, filters, on_event, last_log_id)
    except Exception:
        # Log the bare endpoint, not the URI that carries the API token.
        _logger.warning("Failed to connect to websockets on %s", endpoint)
        raise
    else:
        return client

def subscribe(
        api: 'arvados.api_resources.ArvadosAPIClient',
        filters: Optional[Filter],
        on_event: EventCallback,
        poll_fallback: float=15,
        last_log_id: Optional[int]=None,
) -> Union[EventClient, PollClient]:
    """Start a thread to monitor events

    This method tries to construct an `EventClient` to process Arvados
    events via WebSockets. If that fails, or the
    `ARVADOS_DISABLE_WEBSOCKETS` flag is set in user configuration, it falls
    back to constructing a `PollClient` to process the events via API
    polling.

    Arguments:

    * api: arvados.api_resources.ArvadosAPIClient --- The Arvados API
      client used to query logs. It may be used in a separate thread,
      so if it is not an instance of `arvados.api.ThreadSafeAPIClient`
      it should not be reused after this method returns.

    * filters: arvados.events.Filter | None --- One event filter to
      subscribe to after initializing the client. If not specified, the
      client will subscribe to all events.

    * on_event: arvados.events.EventCallback --- When the client receives an
      event, it calls this function with the event object.

    * poll_fallback: float --- The number of seconds to wait between
      querying logs when falling back to a `PollClient`. If 0, this
      function will refuse to construct a `PollClient`: it only tries
      WebSockets and lets any error propagate. Default 15.

    * last_log_id: int | None --- If specified, this value is passed to the
      client as the id of the last event already seen.
    """
    if not poll_fallback:
        return _subscribe_websocket(api, filters, on_event, last_log_id)

    try:
        if not config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS'):
            return _subscribe_websocket(api, filters, on_event, last_log_id)
        else:
            _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
    except Exception as e:
        _logger.warning("Falling back to polling after websocket error: %s", e)
    p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
    p.start()
    return p
Type signature for an event handler callback
Type signature for a single filter condition
Type signature for an entire filter
class WSMethod(enum.Enum):
    """Names of the methods an Arvados WebSocket server accepts

    Each member's value is the string to put in the `method` field of a
    message sent to the server.
    """
    SUBSCRIBE = 'subscribe'
    # Same value as SUBSCRIBE, so enum treats this as an alias for it.
    SUB = 'subscribe'
    UNSUBSCRIBE = 'unsubscribe'
    # Same value as UNSUBSCRIBE, so enum treats this as an alias for it.
    UNSUB = 'unsubscribe'
Arvados WebSocket methods
This enum represents valid values for the `method` field in messages sent to an Arvados WebSocket server.
Inherited Members
- enum.Enum
- name
- value
class EventClient(threading.Thread):
    """Follow Arvados events via WebSocket

    EventClient follows events on Arvados cluster published by the WebSocket
    server. Users can select the events they want to follow and run their own
    callback function on each.

    The constructor connects to the server and starts the client thread
    itself, so callers do not call `start`.
    """
    _USER_AGENT = 'Python/{}.{}.{} arvados.events/{}'.format(
        *sys.version_info[:3],
        __version__,
    )

    def __init__(
            self,
            url: str,
            filters: Optional[Filter],
            on_event_cb: EventCallback,
            last_log_id: Optional[int]=None,
            *,
            insecure: Optional[bool]=None,
    ) -> None:
        """Initialize a WebSocket client

        Constructor arguments:

        * url: str --- The `wss` URL for an Arvados WebSocket server.

        * filters: arvados.events.Filter | None --- One event filter to
          subscribe to after connecting to the WebSocket server. If not
          specified, the client will subscribe to all events.

        * on_event_cb: arvados.events.EventCallback --- When the client
          receives an event from the WebSocket server, it calls this
          function with the event object.

        * last_log_id: int | None --- If specified, this will be used as the
          value for the `last_log_id` field in subscribe messages sent by
          the client.

        Constructor keyword arguments:

        * insecure: bool | None --- If `True`, the client will not check the
          validity of the server's TLS certificate. If not specified, uses
          the value from the user's `ARVADOS_API_HOST_INSECURE` setting.
        """
        self.url = url
        self.filters = [filters or []]
        self.on_event_cb = on_event_cb
        self.last_log_id = last_log_id
        self.is_closed = threading.Event()
        self._ssl_ctx = ssl.create_default_context(
            purpose=ssl.Purpose.SERVER_AUTH,
            cafile=util.ca_certs_path(),
        )
        if insecure is None:
            insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        if insecure:
            self._ssl_ctx.check_hostname = False
            self._ssl_ctx.verify_mode = ssl.CERT_NONE
        self._subscribe_lock = threading.Lock()
        # Connect before initializing the thread so a connection failure
        # propagates to the caller without a thread having been started.
        self._connect()
        super().__init__(daemon=True)
        self.start()

    def _connect(self) -> None:
        # There are no locks protecting this method. After the thread starts,
        # it should only be called from inside.
        self._client = ws_client.connect(
            self.url,
            logger=_logger,
            ssl_context=self._ssl_ctx,
            user_agent_header=self._USER_AGENT,
        )
        self._client_ok = True

    def _subscribe(self, f: Filter, last_log_id: Optional[int]) -> None:
        # Only include `last_log_id` in the message when we have a value.
        extra = {}
        if last_log_id is not None:
            extra['last_log_id'] = last_log_id
        return self._update_sub(WSMethod.SUBSCRIBE, f, **extra)

    def _update_sub(self, method: WSMethod, f: Filter, **extra: Any) -> None:
        # Serialize and send one subscription-management message.
        msg = json.dumps({
            'method': method.value,
            'filters': f,
            **extra,
        })
        self._client.send(msg)

    def close(self, code: int=1000, reason: str='', timeout: float=0) -> None:
        """Close the WebSocket connection and stop processing events

        Arguments:

        * code: int --- The WebSocket close code sent to the server when
          disconnecting. Default 1000.

        * reason: str --- The WebSocket close reason sent to the server when
          disconnecting. Default is an empty string.

        * timeout: float --- How long to wait for the WebSocket server to
          acknowledge the disconnection, in seconds. Default 0, which means
          no timeout.
        """
        self.is_closed.set()
        self._client.close_timeout = timeout or None
        self._client.close(code, reason)

    def run_forever(self) -> None:
        """Run the WebSocket client indefinitely

        This method blocks until the `close` method is called (e.g., from
        another thread) or the client permanently loses its connection.
        """
        # Have to poll here to let KeyboardInterrupt get raised.
        while not self.is_closed.wait(1):
            pass

    def subscribe(self, f: Filter, last_log_id: Optional[int]=None) -> None:
        """Subscribe to another set of events from the server

        Arguments:

        * f: arvados.events.Filter --- One filter to subscribe to events
          for.

        * last_log_id: int | None --- If specified, request events starting
          from this id. If not specified, the server will only send events
          that occur after processing the subscription.
        """
        with self._subscribe_lock:
            self._subscribe(f, last_log_id)
            self.filters.append(f)

    def unsubscribe(self, f: Filter) -> None:
        """Unsubscribe from an event stream

        Arguments:

        * f: arvados.events.Filter --- One event filter to stop receiving
          events for.

        Raises `ValueError` if `f` is not currently subscribed.
        """
        with self._subscribe_lock:
            try:
                index = self.filters.index(f)
            except ValueError:
                raise ValueError(f"filter not subscribed: {f!r}") from None
            self._update_sub(WSMethod.UNSUBSCRIBE, f)
            del self.filters[index]

    def on_closed(self) -> None:
        """Handle disconnection from the WebSocket server

        This method is called when the client loses its connection from
        receiving events. This implementation tries to establish a new
        connection if it was not closed client-side. If every attempt
        fails, it marks the client closed and interrupts the main thread.
        """
        if self.is_closed.is_set():
            return
        _logger.warning("Unexpected close. Reconnecting.")
        for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
            if self.is_closed.is_set():
                # `close` was called while we were retrying. Stop here so
                # we do not open a new connection nobody will close.
                return
            try:
                self._connect()
            except Exception as e:
                _logger.warning("Error '%s' during websocket reconnect.", e)
            else:
                _logger.warning("Reconnect successful.")
                break
        else:
            _logger.error("EventClient thread could not contact websocket server.")
            self.is_closed.set()
            _thread.interrupt_main()

    def on_event(self, m: Dict[str, Any]) -> None:
        """Handle an event from the WebSocket server

        This method is called whenever the client receives an event from the
        server. This implementation records the `id` field internally, then
        calls the callback function provided at initialization time.

        Arguments:

        * m: Dict[str, Any] --- The event object, deserialized from JSON.
        """
        try:
            self.last_log_id = m['id']
        except KeyError:
            pass
        try:
            self.on_event_cb(m)
        except Exception:
            _logger.exception("Unexpected exception from event callback.")
            _thread.interrupt_main()

    def run(self) -> None:
        """Run the client loop

        This method runs in a separate thread to receive and process events
        from the server.
        """
        self.name = f'ArvadosWebsockets-{self.ident}'
        while self._client_ok and not self.is_closed.is_set():
            try:
                # (Re)send every known subscription on each new connection.
                with self._subscribe_lock:
                    for f in self.filters:
                        self._subscribe(f, self.last_log_id)
                for msg_s in self._client:
                    if not self.is_closed.is_set():
                        msg = json.loads(msg_s)
                        self.on_event(msg)
            except ws_exc.ConnectionClosed:
                self._client_ok = False
                self.on_closed()
Follow Arvados events via WebSocket
EventClient follows events on Arvados cluster published by the WebSocket server. Users can select the events they want to follow and run their own callback function on each.
def __init__(
        self,
        url: str,
        filters: Optional[Filter],
        on_event_cb: EventCallback,
        last_log_id: Optional[int]=None,
        *,
        insecure: Optional[bool]=None,
) -> None:
    """Set up a WebSocket client, connect it, and start its thread

    Constructor arguments:

    * url: str --- The `wss` URL for an Arvados WebSocket server.

    * filters: arvados.events.Filter | None --- One event filter to
      subscribe to after connecting to the WebSocket server. A false
      value is stored as an empty filter.

    * on_event_cb: arvados.events.EventCallback --- The function stored
      as the client's event callback.

    * last_log_id: int | None --- Stored as the client's initial
      `last_log_id`.

    Constructor keyword arguments:

    * insecure: bool | None --- If `True`, the client will not check the
      validity of the server's TLS certificate. If not specified, uses
      the value from the user's `ARVADOS_API_HOST_INSECURE` setting.
    """
    self.url = url
    self.filters = [filters if filters else []]
    self.on_event_cb = on_event_cb
    self.last_log_id = last_log_id
    self.is_closed = threading.Event()

    tls_context = ssl.create_default_context(
        purpose=ssl.Purpose.SERVER_AUTH,
        cafile=util.ca_certs_path(),
    )
    self._ssl_ctx = tls_context
    skip_verification = insecure
    if skip_verification is None:
        skip_verification = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
    if skip_verification:
        # Hostname checking must be switched off before verify_mode can
        # be set to CERT_NONE.
        tls_context.check_hostname = False
        tls_context.verify_mode = ssl.CERT_NONE

    self._subscribe_lock = threading.Lock()
    # The connection is made before the Thread base class is initialized
    # and started.
    self._connect()
    super().__init__(daemon=True)
    self.start()
Initialize a WebSocket client
Constructor arguments:
- url: str — The `wss` URL for an Arvados WebSocket server.
- filters: arvados.events.Filter | None — One event filter to subscribe to after connecting to the WebSocket server. If not specified, the client will subscribe to all events.
- on_event_cb: arvados.events.EventCallback — When the client receives an event from the WebSocket server, it calls this function with the event object.
- last_log_id: int | None — If specified, this will be used as the value for the `last_log_id` field in subscribe messages sent by the client.
Constructor keyword arguments:
- insecure: bool | None — If `True`, the client will not check the validity of the server’s TLS certificate. If not specified, uses the value from the user’s `ARVADOS_API_HOST_INSECURE` setting.
def close(self, code: int=1000, reason: str='', timeout: float=0) -> None:
    """Mark the client closed and shut down its WebSocket connection

    Arguments:

    * code: int --- The WebSocket close code passed to the connection's
      `close` method. Default 1000.

    * reason: str --- The WebSocket close reason passed to the
      connection's `close` method. Default is an empty string.

    * timeout: float --- How long to wait for the WebSocket server to
      acknowledge the disconnection, in seconds. Default 0, which means
      no timeout.
    """
    # Flag the shutdown first, before the connection itself goes away.
    self.is_closed.set()
    connection = self._client
    # Any false timeout (0 included) is stored as None.
    connection.close_timeout = timeout if timeout else None
    connection.close(code, reason)
Close the WebSocket connection and stop processing events
Arguments:
code: int — The WebSocket close code sent to the server when disconnecting. Default 1000.
reason: str — The WebSocket close reason sent to the server when disconnecting. Default is an empty string.
timeout: float — How long to wait for the WebSocket server to acknowledge the disconnection, in seconds. Default 0, which means no timeout.
def run_forever(self) -> None:
    """Block the calling thread until this client is closed

    Returns once the client's `is_closed` event is set.
    """
    # Wait in one-second slices rather than indefinitely, so that a
    # KeyboardInterrupt can be raised between waits.
    while True:
        if self.is_closed.wait(1):
            break
Run the WebSocket client indefinitely
This method blocks until the `close` method is called (e.g., from another thread) or the client permanently loses its connection.
def subscribe(self, f: Filter, last_log_id: Optional[int]=None) -> None:
    """Subscribe to another set of events from the server

    Arguments:

    * f: arvados.events.Filter --- One filter to subscribe to events
      for. It is added to the client's list of filters after the
      subscribe message has been sent.

    * last_log_id: int | None --- If specified, request events starting
      from this id. If not specified, the server will only send events
      that occur after processing the subscription.
    """
    with self._subscribe_lock:
        # Send first: if sending raises, the filter is not recorded.
        self._subscribe(f, last_log_id)
        self.filters.append(f)
Subscribe to another set of events from the server
Arguments:
f: arvados.events.Filter | None — One filter to subscribe to events for.
last_log_id: int | None — If specified, request events starting from this id. If not specified, the server will only send events that occur after processing the subscription.
def unsubscribe(self, f: Filter) -> None:
    """Stop receiving events for one subscribed filter

    Arguments:

    * f: arvados.events.Filter --- The event filter to stop receiving
      events for. It must compare equal to a filter in the client's
      filter list; otherwise this method raises `ValueError`.
    """
    with self._subscribe_lock:
        try:
            position = self.filters.index(f)
        except ValueError:
            raise ValueError(f"filter not subscribed: {f!r}") from None
        else:
            # Send the message first, then drop the filter from the list.
            self._update_sub(WSMethod.UNSUBSCRIBE, f)
            self.filters.pop(position)
Unsubscribe from an event stream
Arguments:
- f: arvados.events.Filter | None — One event filter to stop receiving events for.
def on_closed(self) -> None:
    """Handle disconnection from the WebSocket server

    This method is called when the client loses its connection from
    receiving events. This implementation tries to establish a new
    connection if it was not closed client-side. It stops retrying as
    soon as the client is marked closed. If every attempt fails, it
    marks the client closed and interrupts the main thread.
    """
    if self.is_closed.is_set():
        return
    _logger.warning("Unexpected close. Reconnecting.")
    for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
        if self.is_closed.is_set():
            # `close` was called while we were retrying. Stop here so we
            # do not open a new connection nobody will close.
            return
        try:
            self._connect()
        except Exception as e:
            _logger.warning("Error '%s' during websocket reconnect.", e)
        else:
            _logger.warning("Reconnect successful.")
            break
    else:
        # Only reached when every attempt failed without a break.
        _logger.error("EventClient thread could not contact websocket server.")
        self.is_closed.set()
        _thread.interrupt_main()
Handle disconnection from the WebSocket server
This method is called when the client loses its connection from receiving events. This implementation tries to establish a new connection if it was not closed client-side.
def on_event(self, m: Dict[str, Any]) -> None:
    """Process one event received from the WebSocket server

    This method is called for every event the client receives. This
    implementation remembers the event's `id` field in `last_log_id`,
    then passes the event to the callback function given at
    initialization time. If the callback raises, the error is logged and
    the main thread is interrupted.

    Arguments:

    * m: Dict[str, Any] --- The event object, deserialized from JSON.
    """
    try:
        event_id = m['id']
    except KeyError:
        # Messages without an 'id' leave last_log_id unchanged.
        pass
    else:
        self.last_log_id = event_id
    try:
        self.on_event_cb(m)
    except Exception:
        _logger.exception("Unexpected exception from event callback.")
        _thread.interrupt_main()
Handle an event from the WebSocket server
This method is called whenever the client receives an event from the server. This implementation records the `id` field internally, then calls the callback function provided at initialization time.
Arguments:
- m: Dict[str, Any] — The event object, deserialized from JSON.
def run(self) -> None:
    """Run the client loop

    This method runs in a separate thread. Each pass (re)subscribes every
    known filter starting from `last_log_id`, then hands each received
    message to `on_event` until the connection closes. Messages that
    arrive after the client was closed are dropped.
    """
    self.name = f'ArvadosWebsockets-{self.ident}'
    while self._client_ok:
        if self.is_closed.is_set():
            break
        try:
            with self._subscribe_lock:
                for flt in self.filters:
                    self._subscribe(flt, self.last_log_id)
            for raw in self._client:
                if self.is_closed.is_set():
                    continue
                self.on_event(json.loads(raw))
        except ws_exc.ConnectionClosed:
            # Mark the connection unusable before on_closed() handles it.
            self._client_ok = False
            self.on_closed()
Run the client loop
This method runs in a separate thread to receive and process events from the server.
Inherited Members
- threading.Thread
- start
- join
- name
- ident
- is_alive
- daemon
- isDaemon
- setDaemon
- getName
- setName
- native_id
class PollClient(threading.Thread):
    """Follow Arvados events via polling logs

    PollClient follows events on Arvados cluster by periodically running
    logs list API calls. Users can select the events they want to follow and
    run their own callback function on each.
    """
    def __init__(
            self,
            api: 'arvados.api_resources.ArvadosAPIClient',
            filters: Optional[Filter],
            on_event: EventCallback,
            poll_time: float=15,
            last_log_id: Optional[int]=None,
    ) -> None:
        """Initialize a polling client

        Constructor arguments:

        * api: arvados.api_resources.ArvadosAPIClient --- The Arvados API
          client used to query logs. It will be used in a separate thread,
          so if it is not an instance of `arvados.api.ThreadSafeAPIClient`
          it should not be reused after the thread is started.

        * filters: arvados.events.Filter | None --- One event filter to
          poll for. If not specified, the client will follow all events.

        * on_event: arvados.events.EventCallback --- The client calls this
          function with each event object it retrieves.

        * poll_time: float --- The number of seconds to wait between querying
          logs. Default 15.

        * last_log_id: int | None --- If specified, queries will only
          return logs with an `id` greater than this value.
        """
        super().__init__()
        self.api = api
        self.filters = [filters] if filters else [[]]
        self.on_event = on_event
        self.poll_time = poll_time
        self.daemon = True
        self.last_log_id = last_log_id
        self._closing = threading.Event()
        # Re-entrant: run() holds this lock while it calls the callback.
        self._closing_lock = threading.RLock()

        if self.last_log_id is not None:
            # Caller supplied the last-seen event ID from a previous
            # connection.
            self._skip_old_events = [["id", ">", str(self.last_log_id)]]
        else:
            # We need to do a reverse-order query to find the most
            # recent event ID (see _fetch_items()).
            self._skip_old_events = False

    def _fetch_items(self, f):
        """Run one logs list query for filter `f` and return the response"""
        if self._skip_old_events:
            # Either we know the most recent matching ID, or we know there
            # were no matching events in the 2-hour window before
            # subscribing. Either way we can safely ask for events in
            # ascending order.
            return self.api.logs().list(
                order="id asc",
                filters=f+self._skip_old_events).execute()

        # The caller didn't provide a known recent ID, so the first
        # request asks for the single most recent event from the last 2
        # hours (the time restriction avoids doing an expensive database
        # query, and leaves a big enough margin to account for clock
        # skew). If we find one, we remember its ID but discard the event
        # itself: we are supposed to return new/current events, not old
        # ones.
        #
        # Note: self._skip_old_events must not be set until the threshold
        # is decided. Otherwise, tests will be unreliable.
        filter_by_time = [[
            "created_at", ">=",
            time.strftime(
                "%Y-%m-%dT%H:%M:%SZ",
                time.gmtime(time.time()-7200))]]
        items = self.api.logs().list(
            order="id desc",
            limit=1,
            filters=f+filter_by_time).execute()
        if items["items"]:
            self._skip_old_events = [
                ["id", ">", str(items["items"][0]["id"])]]
            return {
                "items": [],
                "items_available": 0,
            }
        # No recent events. We can keep using the same timestamp
        # threshold until we receive our first new event.
        self._skip_old_events = filter_by_time
        return items

    def run(self):
        """Run the client loop

        This method runs in a separate thread to poll and process events
        from the server. It first passes `{'status': 200}` to the
        callback, then polls until the client is closed. If the API
        server cannot be reached after all retries, it closes the client
        and interrupts the main thread.
        """
        self.on_event({'status': 200})

        while not self._closing.is_set():
            moreitems = False
            # Iterate over a snapshot so subscribe()/unsubscribe() calls
            # from other threads cannot disturb this pass.
            for f in list(self.filters):
                last_error = None
                for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
                    try:
                        items = self._fetch_items(f)
                    except errors.ApiError as error:
                        last_error = error
                    else:
                        break
                else:
                    # for/else instead of inspecting the loop variable, so
                    # a request that succeeds on the final attempt is not
                    # mistaken for a failure.
                    _logger.error(
                        "PollClient thread could not contact API server.",
                        exc_info=last_error)
                    with self._closing_lock:
                        self._closing.set()
                    _thread.interrupt_main()
                    return
                for i in items["items"]:
                    self._skip_old_events = [["id", ">", str(i["id"])]]
                    with self._closing_lock:
                        if self._closing.is_set():
                            return
                        try:
                            self.on_event(i)
                        except Exception:
                            _logger.exception("Unexpected exception from event callback.")
                            _thread.interrupt_main()
                if items["items_available"] > len(items["items"]):
                    # More results are waiting: poll again without sleeping.
                    moreitems = True
            if not moreitems:
                self._closing.wait(self.poll_time)

    def run_forever(self):
        """Run the polling client indefinitely

        This method blocks until the `close` method is called (e.g., from
        another thread) or the client permanently loses its connection.
        """
        # Have to poll here, otherwise KeyboardInterrupt will never get processed.
        while not self._closing.is_set():
            self._closing.wait(1)

    def close(self, code: Optional[int]=None, reason: Optional[str]=None, timeout: float=0) -> None:
        """Stop polling and processing events

        Arguments:

        * code: Optional[int] --- Ignored; this argument exists for API
          compatibility with `EventClient.close`.

        * reason: Optional[str] --- Ignored; this argument exists for API
          compatibility with `EventClient.close`.

        * timeout: float --- How long to wait, in seconds, for the client
          thread to finish processing events. Default 0, which returns
          without waiting for the thread.
        """
        with self._closing_lock:
            self._closing.set()
        try:
            self.join(timeout=timeout)
        except RuntimeError:
            # "join() raises a RuntimeError if an attempt is made to join the
            # current thread as that would cause a deadlock. It is also an
            # error to join() a thread before it has been started and attempts
            # to do so raises the same exception."
            pass

    def subscribe(self, f: Filter, last_log_id: Optional[int]=None) -> None:
        """Subscribe to another set of events from the server

        Arguments:

        * f: arvados.events.Filter | None --- One filter to subscribe to.

        * last_log_id: Optional[int] --- Ignored; this argument exists for
          API compatibility with `EventClient.subscribe`.
        """
        self.on_event({'status': 200})
        self.filters.append(f)

    def unsubscribe(self, f: Filter) -> None:
        """Unsubscribe from an event stream

        Arguments:

        * f: arvados.events.Filter | None --- One event filter to stop
          receiving events for. `ValueError` is raised if this filter is
          not currently subscribed.
        """
        try:
            self.filters.remove(f)
        except ValueError:
            raise ValueError(f"filter not subscribed: {f!r}") from None
Follow Arvados events via polling logs
PollClient follows events on Arvados cluster by periodically running logs list API calls. Users can select the events they want to follow and run their own callback function on each.
def __init__(
        self,
        api: 'arvados.api_resources.ArvadosAPIClient',
        filters: Optional[Filter],
        on_event: EventCallback,
        poll_time: float=15,
        last_log_id: Optional[int]=None,
) -> None:
    """Initialize a polling client

    Constructor arguments:

    * api: arvados.api_resources.ArvadosAPIClient --- The Arvados API
      client used to query logs. It will be used in a separate thread,
      so if it is not an instance of `arvados.api.ThreadSafeAPIClient`
      it should not be reused after the thread is started.

    * filters: arvados.events.Filter | None --- One event filter to
      poll for. If not specified, the client will follow all events.

    * on_event: arvados.events.EventCallback --- The client calls this
      function with each event object it retrieves.

    * poll_time: float --- The number of seconds to wait between querying
      logs. Default 15.

    * last_log_id: int | None --- If specified, queries will only
      return logs with an `id` greater than this value.
    """
    super().__init__()
    self.api = api
    self.filters = [filters] if filters else [[]]
    self.on_event = on_event
    self.poll_time = poll_time
    self.daemon = True
    self.last_log_id = last_log_id
    self._closing = threading.Event()
    self._closing_lock = threading.RLock()

    if self.last_log_id is not None:
        # Caller supplied the last-seen event ID from a previous
        # connection.
        self._skip_old_events = [["id", ">", str(self.last_log_id)]]
    else:
        # We need to do a reverse-order query to find the most
        # recent event ID (see "if not self._skip_old_events"
        # in run()).
        self._skip_old_events = False
Initialize a polling client
Constructor arguments:
api: arvados.api_resources.ArvadosAPIClient — The Arvados API client used to query logs. It will be used in a separate thread, so if it is not an instance of `arvados.api.ThreadSafeAPIClient` it should not be reused after the thread is started.
filters: arvados.events.Filter | None — One event filter to poll for. If not specified, the client will follow all events.
on_event: arvados.events.EventCallback — The client calls this function with each event object it retrieves.
poll_time: float — The number of seconds to wait between querying logs. Default 15.
last_log_id: int | None — If specified, queries will only return logs with an `id` greater than this value.
@property
def daemon(self):
    """Whether this thread is a daemon thread (boolean).

    Set this before start() is called; setting it afterwards raises
    RuntimeError. The initial value is inherited from the creating
    thread. The main thread is not a daemon thread, so threads created
    from it default to daemon = False.

    The entire Python program exits when only daemon threads are left.

    """
    assert self._initialized, "Thread.__init__() not called"
    return self._daemonic
A boolean value indicating whether this thread is a daemon thread.
This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.
The entire Python program exits when only daemon threads are left.
def _fetch_items(self, f):
    """Run one logs list query for filter `f` and return the response"""
    if self._skip_old_events:
        # Either we know the most recent matching ID, or we know there
        # were no matching events in the 2-hour window before
        # subscribing. Either way we can safely ask for events in
        # ascending order.
        return self.api.logs().list(
            order="id asc",
            filters=f+self._skip_old_events).execute()

    # The caller didn't provide a known recent ID, so the first request
    # asks for the single most recent event from the last 2 hours (the
    # time restriction avoids doing an expensive database query, and
    # leaves a big enough margin to account for clock skew). If we find
    # one, we remember its ID but discard the event itself: we are
    # supposed to return new/current events, not old ones.
    #
    # Note: self._skip_old_events must not be set until the threshold
    # is decided. Otherwise, tests will be unreliable.
    filter_by_time = [[
        "created_at", ">=",
        time.strftime(
            "%Y-%m-%dT%H:%M:%SZ",
            time.gmtime(time.time()-7200))]]
    items = self.api.logs().list(
        order="id desc",
        limit=1,
        filters=f+filter_by_time).execute()
    if items["items"]:
        self._skip_old_events = [
            ["id", ">", str(items["items"][0]["id"])]]
        return {
            "items": [],
            "items_available": 0,
        }
    # No recent events. We can keep using the same timestamp threshold
    # until we receive our first new event.
    self._skip_old_events = filter_by_time
    return items

def run(self):
    """Run the client loop

    This method runs in a separate thread to poll and process events
    from the server. It first passes `{'status': 200}` to the callback,
    then polls until the client is closed. If the API server cannot be
    reached after all retries, it closes the client and interrupts the
    main thread.
    """
    self.on_event({'status': 200})

    while not self._closing.is_set():
        moreitems = False
        # Iterate over a snapshot so subscribe()/unsubscribe() calls
        # from other threads cannot disturb this pass.
        for f in list(self.filters):
            last_error = None
            for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
                try:
                    items = self._fetch_items(f)
                except errors.ApiError as error:
                    last_error = error
                else:
                    break
            else:
                # for/else instead of inspecting the loop variable, so a
                # request that succeeds on the final attempt is not
                # mistaken for a failure.
                _logger.error(
                    "PollClient thread could not contact API server.",
                    exc_info=last_error)
                with self._closing_lock:
                    self._closing.set()
                _thread.interrupt_main()
                return
            for i in items["items"]:
                self._skip_old_events = [["id", ">", str(i["id"])]]
                with self._closing_lock:
                    if self._closing.is_set():
                        return
                    try:
                        self.on_event(i)
                    except Exception:
                        _logger.exception("Unexpected exception from event callback.")
                        _thread.interrupt_main()
            if items["items_available"] > len(items["items"]):
                # More results are waiting: poll again without sleeping.
                moreitems = True
        if not moreitems:
            self._closing.wait(self.poll_time)
Run the client loop
This method runs in a separate thread to poll and process events from the server.
def run_forever(self):
    """Block the caller while the polling client is active

    Returns once the client's closing flag is set, which happens when
    `close` is called (e.g., from another thread) or when the client
    permanently loses its connection.
    """
    closing = self._closing
    # Wait in one-second slices; otherwise KeyboardInterrupt will never
    # get processed.
    while not closing.is_set():
        closing.wait(1)
Run the polling client indefinitely
This method blocks until the `close` method is called (e.g., from another thread) or the client permanently loses its connection.
def close(self, code: Optional[int]=None, reason: Optional[str]=None, timeout: float=0) -> None:
    """Stop polling and processing events

    Arguments:

    * code: Optional[int] --- Ignored; this argument exists for API
      compatibility with `EventClient.close`.

    * reason: Optional[str] --- Ignored; this argument exists for API
      compatibility with `EventClient.close`.

    * timeout: float --- How long to wait, in seconds, for the client
      thread to finish processing events. Default 0, which returns
      without waiting for the thread.
    """
    # Set the flag under the lock that run() holds around each callback.
    with self._closing_lock:
        self._closing.set()
    try:
        self.join(timeout=timeout)
    except RuntimeError:
        # join() raises RuntimeError when asked to join the current
        # thread, or a thread that has not been started yet. Neither
        # case needs any waiting, so it is ignored.
        pass
Stop polling and processing events
Arguments:
code: Optional[int] — Ignored; this argument exists for API compatibility with `EventClient.close`.
reason: Optional[str] — Ignored; this argument exists for API compatibility with `EventClient.close`.
timeout: float — How long to wait, in seconds, for the client thread to finish processing events. Default 0, which returns without waiting for the thread.
def subscribe(self, f: Filter, last_log_id: Optional[int]=None) -> None:
    """Start polling for one more event filter

    Arguments:

    * f: arvados.events.Filter | None --- One filter to subscribe to.

    * last_log_id: Optional[int] --- Ignored; this argument exists for
      API compatibility with `EventClient.subscribe`.
    """
    # Pass {'status': 200} to the callback before recording the filter;
    # the filter is not recorded if the callback raises.
    self.on_event({'status': 200})
    self.filters.append(f)
Subscribe to another set of events from the server
Arguments:
f: arvados.events.Filter | None — One filter to subscribe to.
last_log_id: Optional[int] — Ignored; this argument exists for API compatibility with `EventClient.subscribe`.
def unsubscribe(self, f) -> None:
    """Unsubscribe from an event stream

    Arguments:

    * f: arvados.events.Filter | None --- One event filter to stop
      receiving events for. `ValueError` is raised if this filter is not
      currently subscribed.
    """
    try:
        self.filters.remove(f)
    except ValueError:
        # Same message as EventClient.unsubscribe, so callers see one
        # error format whichever client they hold.
        raise ValueError(f"filter not subscribed: {f!r}") from None
Unsubscribe from an event stream
Arguments:
- f: arvados.events.Filter | None — One event filter to stop receiving events for.
Inherited Members
- threading.Thread
- start
- join
- name
- ident
- is_alive
- isDaemon
- setDaemon
- getName
- setName
- native_id
def subscribe(
        api: 'arvados.api_resources.ArvadosAPIClient',
        filters: Optional[Filter],
        on_event: EventCallback,
        poll_fallback: float=15,
        last_log_id: Optional[int]=None,
) -> Union[EventClient, PollClient]:
    """Start a thread to monitor events

    This method tries to construct an `EventClient` to process Arvados
    events via WebSockets. If that fails, or the
    `ARVADOS_DISABLE_WEBSOCKETS` flag is set in user configuration, it falls
    back to constructing a `PollClient` to process the events via API
    polling.

    Arguments:

    * api: arvados.api_resources.ArvadosAPIClient --- The Arvados API
      client used to query logs. It may be used in a separate thread,
      so if it is not an instance of `arvados.api.ThreadSafeAPIClient`
      it should not be reused after this method returns.

    * filters: arvados.events.Filter | None --- One event filter to
      subscribe to after initializing the client. If not specified, the
      client will subscribe to all events.

    * on_event: arvados.events.EventCallback --- When the client receives an
      event, it calls this function with the event object.

    * poll_fallback: float --- The number of seconds to wait between
      querying logs when polling. If 0, this function will refuse to
      construct a `PollClient`, and any error from setting up the
      WebSocket client is raised to the caller. Default 15.

    * last_log_id: int | None --- If specified, start processing events with
      at least this `id` value.
    """
    if not poll_fallback:
        # Polling is disabled: no fallback, so errors propagate.
        return _subscribe_websocket(api, filters, on_event, last_log_id)

    try:
        if not config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS'):
            return _subscribe_websocket(api, filters, on_event, last_log_id)
        _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
    except Exception as e:
        _logger.warning("Falling back to polling after websocket error: %s", e)
    p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
    p.start()
    return p
Start a thread to monitor events
This method tries to construct an `EventClient` to process Arvados events via WebSockets. If that fails, or the `ARVADOS_DISABLE_WEBSOCKETS` flag is set in user configuration, it falls back to constructing a `PollClient` to process the events via API polling.
Arguments:
api: arvados.api_resources.ArvadosAPIClient — The Arvados API client used to query logs. It may be used in a separate thread, so if it is not an instance of `arvados.api.ThreadSafeAPIClient` it should not be reused after this method returns.
filters: arvados.events.Filter | None — One event filter to subscribe to after initializing the client. If not specified, the client will subscribe to all events.
on_event: arvados.events.EventCallback — When the client receives an event, it calls this function with the event object.
poll_fallback: float — The number of seconds to wait between querying logs when polling. If 0, this function will refuse to construct a `PollClient`. Default 15.
last_log_id: int | None — If specified, start processing events with at least this `id` value.