arvados.events

Follow events on an Arvados cluster

This module provides different ways to get notified about events that happen on an Arvados cluster. You indicate which events you want updates about, and provide a function that is called any time one of those events is received from the server.

subscribe is the main entry point. It helps you construct one of the two API-compatible client classes: EventClient (which uses WebSockets) or PollClient (which periodically queries the logs list methods).

  1# Copyright (C) The Arvados Authors. All rights reserved.
  2#
  3# SPDX-License-Identifier: Apache-2.0
  4"""Follow events on an Arvados cluster
  5
  6This module provides different ways to get notified about events that happen
  7on an Arvados cluster. You indicate which events you want updates about, and
  8provide a function that is called any time one of those events is received
  9from the server.
 10
 11`subscribe` is the main entry point. It helps you construct one of the two
 12API-compatible client classes: `EventClient` (which uses WebSockets) or
 13`PollClient` (which periodically queries the logs list methods).
 14"""
 15
 16import enum
 17import json
 18import logging
 19import os
 20import re
 21import ssl
 22import sys
 23import _thread
 24import threading
 25import time
 26
 27import websockets.exceptions as ws_exc
 28import websockets.sync.client as ws_client
 29
 30from . import config
 31from . import errors
 32from . import util
 33from .retry import RetryLoop
 34from ._version import __version__
 35
 36from typing import (
 37    Any,
 38    Callable,
 39    Dict,
 40    Iterable,
 41    List,
 42    Optional,
 43    Union,
 44)
 45
 46EventCallback = Callable[[Dict[str, Any]], object]
 47"""Type signature for an event handler callback"""
 48FilterCondition = List[Union[None, str, 'Filter']]
 49"""Type signature for a single filter condition"""
 50Filter = List[FilterCondition]
 51"""Type signature for an entire filter"""
 52
 53_logger = logging.getLogger('arvados.events')
 54
 55class WSMethod(enum.Enum):
 56    """Arvados WebSocket methods
 57
 58    This enum represents valid values for the `method` field in messages
 59    sent to an Arvados WebSocket server.
 60    """
 61    SUBSCRIBE = 'subscribe'
 62    SUB = SUBSCRIBE
 63    UNSUBSCRIBE = 'unsubscribe'
 64    UNSUB = UNSUBSCRIBE
 65
 66
 67class EventClient(threading.Thread):
 68    """Follow Arvados events via WebSocket
 69
 70    EventClient follows events on Arvados cluster published by the WebSocket
 71    server. Users can select the events they want to follow and run their own
 72    callback function on each.
 73    """
 74    _USER_AGENT = 'Python/{}.{}.{} arvados.events/{}'.format(
 75        *sys.version_info[:3],
 76        __version__,
 77    )
 78
 79    def __init__(
 80            self,
 81            url: str,
 82            filters: Optional[Filter],
 83            on_event_cb: EventCallback,
 84            last_log_id: Optional[int]=None,
 85            *,
 86            insecure: Optional[bool]=None,
 87    ) -> None:
 88        """Initialize a WebSocket client
 89
 90        Constructor arguments:
 91
 92        * url: str --- The `wss` URL for an Arvados WebSocket server.
 93
 94        * filters: arvados.events.Filter | None --- One event filter to
 95          subscribe to after connecting to the WebSocket server. If not
 96          specified, the client will subscribe to all events.
 97
 98        * on_event_cb: arvados.events.EventCallback --- When the client
 99          receives an event from the WebSocket server, it calls this
100          function with the event object.
101
102        * last_log_id: int | None --- If specified, this will be used as the
103          value for the `last_log_id` field in subscribe messages sent by
104          the client.
105
106        Constructor keyword arguments:
107
108        * insecure: bool | None --- If `True`, the client will not check the
109          validity of the server's TLS certificate. If not specified, uses
110          the value from the user's `ARVADOS_API_HOST_INSECURE` setting.
111        """
112        self.url = url
113        self.filters = [filters or []]
114        self.on_event_cb = on_event_cb
115        self.last_log_id = last_log_id
116        self.is_closed = threading.Event()
117        self._ssl_ctx = ssl.create_default_context(
118            purpose=ssl.Purpose.SERVER_AUTH,
119            cafile=util.ca_certs_path(),
120        )
121        if insecure is None:
122            insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
123        if insecure:
124            self._ssl_ctx.check_hostname = False
125            self._ssl_ctx.verify_mode = ssl.CERT_NONE
126        self._subscribe_lock = threading.Lock()
127        self._connect()
128        super().__init__(daemon=True)
129        self.start()
130
131    def _connect(self) -> None:
132        # There are no locks protecting this method. After the thread starts,
133        # it should only be called from inside.
134        self._client = ws_client.connect(
135            self.url,
136            logger=_logger,
137            ssl_context=self._ssl_ctx,
138            user_agent_header=self._USER_AGENT,
139        )
140        self._client_ok = True
141
142    def _subscribe(self, f: Filter, last_log_id: Optional[int]) -> None:
143        extra = {}
144        if last_log_id is not None:
145            extra['last_log_id'] = last_log_id
146        return self._update_sub(WSMethod.SUBSCRIBE, f, **extra)
147
148    def _update_sub(self, method: WSMethod, f: Filter, **extra: Any) -> None:
149        msg = json.dumps({
150            'method': method.value,
151            'filters': f,
152            **extra,
153        })
154        self._client.send(msg)
155
156    def close(self, code: int=1000, reason: str='', timeout: float=0) -> None:
157        """Close the WebSocket connection and stop processing events
158
159        Arguments:
160
161        * code: int --- The WebSocket close code sent to the server when
162          disconnecting. Default 1000.
163
164        * reason: str --- The WebSocket close reason sent to the server when
165          disconnecting. Default is an empty string.
166
167        * timeout: float --- How long to wait for the WebSocket server to
168          acknowledge the disconnection, in seconds. Default 0, which means
169          no timeout.
170        """
171        self.is_closed.set()
172        self._client.close_timeout = timeout or None
173        self._client.close(code, reason)
174
175    def run_forever(self) -> None:
176        """Run the WebSocket client indefinitely
177
178        This method blocks until the `close` method is called (e.g., from
179        another thread) or the client permanently loses its connection.
180        """
181        # Have to poll here to let KeyboardInterrupt get raised.
182        while not self.is_closed.wait(1):
183            pass
184
185    def subscribe(self, f: Filter, last_log_id: Optional[int]=None) -> None:
186        """Subscribe to another set of events from the server
187
188        Arguments:
189
190        * f: arvados.events.Filter | None --- One filter to subscribe to
191          events for.
192
193        * last_log_id: int | None --- If specified, request events starting
194          from this id. If not specified, the server will only send events
195          that occur after processing the subscription.
196        """
197        with self._subscribe_lock:
198            self._subscribe(f, last_log_id)
199            self.filters.append(f)
200
201    def unsubscribe(self, f: Filter) -> None:
202        """Unsubscribe from an event stream
203
204        Arguments:
205
206        * f: arvados.events.Filter | None --- One event filter to stop
207        receiving events for.
208        """
209        with self._subscribe_lock:
210            try:
211                index = self.filters.index(f)
212            except ValueError:
213                raise ValueError(f"filter not subscribed: {f!r}") from None
214            self._update_sub(WSMethod.UNSUBSCRIBE, f)
215            del self.filters[index]
216
217    def on_closed(self) -> None:
218        """Handle disconnection from the WebSocket server
219
220        This method is called when the client loses its connection from
221        receiving events. This implementation tries to establish a new
222        connection if it was not closed client-side.
223        """
224        if self.is_closed.is_set():
225            return
226        _logger.warning("Unexpected close. Reconnecting.")
227        for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
228            try:
229                self._connect()
230            except Exception as e:
231                _logger.warning("Error '%s' during websocket reconnect.", e)
232            else:
233                _logger.warning("Reconnect successful.")
234                break
235        else:
236            _logger.error("EventClient thread could not contact websocket server.")
237            self.is_closed.set()
238            _thread.interrupt_main()
239
240    def on_event(self, m: Dict[str, Any]) -> None:
241        """Handle an event from the WebSocket server
242
243        This method is called whenever the client receives an event from the
244        server. This implementation records the `id` field internally, then
245        calls the callback function provided at initialization time.
246
247        Arguments:
248
249        * m: Dict[str, Any] --- The event object, deserialized from JSON.
250        """
251        try:
252            self.last_log_id = m['id']
253        except KeyError:
254            pass
255        try:
256            self.on_event_cb(m)
257        except Exception:
258            _logger.exception("Unexpected exception from event callback.")
259            _thread.interrupt_main()
260
261    def run(self) -> None:
262        """Run the client loop
263
264        This method runs in a separate thread to receive and process events
265        from the server.
266        """
267        self.setName(f'ArvadosWebsockets-{self.ident}')
268        while self._client_ok and not self.is_closed.is_set():
269            try:
270                with self._subscribe_lock:
271                    for f in self.filters:
272                        self._subscribe(f, self.last_log_id)
273                for msg_s in self._client:
274                    if not self.is_closed.is_set():
275                        msg = json.loads(msg_s)
276                        self.on_event(msg)
277            except ws_exc.ConnectionClosed:
278                self._client_ok = False
279                self.on_closed()
280
281
282class PollClient(threading.Thread):
283    """Follow Arvados events via polling logs
284
285    PollClient follows events on Arvados cluster by periodically running
286    logs list API calls. Users can select the events they want to follow and
287    run their own callback function on each.
288    """
289    def __init__(
290            self,
291            api: 'arvados.api_resources.ArvadosAPIClient',
292            filters: Optional[Filter],
293            on_event: EventCallback,
294            poll_time: float=15,
295            last_log_id: Optional[int]=None,
296    ) -> None:
297        """Initialize a polling client
298
299        Constructor arguments:
300
301        * api: arvados.api_resources.ArvadosAPIClient --- The Arvados API
302          client used to query logs. It will be used in a separate thread,
303          so if it is not an instance of `arvados.safeapi.ThreadSafeApiCache`
304          it should not be reused after the thread is started.
305
306        * filters: arvados.events.Filter | None --- One event filter to
307          subscribe to after connecting to the WebSocket server. If not
308          specified, the client will subscribe to all events.
309
310        * on_event: arvados.events.EventCallback --- When the client
311          receives an event from the WebSocket server, it calls this
312          function with the event object.
313
314        * poll_time: float --- The number of seconds to wait between querying
315          logs. Default 15.
316
317        * last_log_id: int | None --- If specified, queries will include a
318          filter for logs with an `id` at least this value.
319        """
320        super(PollClient, self).__init__()
321        self.api = api
322        if filters:
323            self.filters = [filters]
324        else:
325            self.filters = [[]]
326        self.on_event = on_event
327        self.poll_time = poll_time
328        self.daemon = True
329        self.last_log_id = last_log_id
330        self._closing = threading.Event()
331        self._closing_lock = threading.RLock()
332
333        if self.last_log_id != None:
334            # Caller supplied the last-seen event ID from a previous
335            # connection.
336            self._skip_old_events = [["id", ">", str(self.last_log_id)]]
337        else:
338            # We need to do a reverse-order query to find the most
339            # recent event ID (see "if not self._skip_old_events"
340            # in run()).
341            self._skip_old_events = False
342
343    def run(self):
344        """Run the client loop
345
346        This method runs in a separate thread to poll and process events
347        from the server.
348        """
349        self.on_event({'status': 200})
350
351        while not self._closing.is_set():
352            moreitems = False
353            for f in self.filters:
354                for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
355                    try:
356                        if not self._skip_old_events:
357                            # If the caller didn't provide a known
358                            # recent ID, our first request will ask
359                            # for the single most recent event from
360                            # the last 2 hours (the time restriction
361                            # avoids doing an expensive database
362                            # query, and leaves a big enough margin to
363                            # account for clock skew). If we do find a
364                            # recent event, we remember its ID but
365                            # then discard it (we are supposed to be
366                            # returning new/current events, not old
367                            # ones).
368                            #
369                            # Subsequent requests will get multiple
370                            # events in chronological order, and
371                            # filter on that same cutoff time, or
372                            # (once we see our first matching event)
373                            # the ID of the last-seen event.
374                            #
375                            # Note: self._skip_old_events must not be
376                            # set until the threshold is decided.
377                            # Otherwise, tests will be unreliable.
378                            filter_by_time = [[
379                                "created_at", ">=",
380                                time.strftime(
381                                    "%Y-%m-%dT%H:%M:%SZ",
382                                    time.gmtime(time.time()-7200))]]
383                            items = self.api.logs().list(
384                                order="id desc",
385                                limit=1,
386                                filters=f+filter_by_time).execute()
387                            if items["items"]:
388                                self._skip_old_events = [
389                                    ["id", ">", str(items["items"][0]["id"])]]
390                                items = {
391                                    "items": [],
392                                    "items_available": 0,
393                                }
394                            else:
395                                # No recent events. We can keep using
396                                # the same timestamp threshold until
397                                # we receive our first new event.
398                                self._skip_old_events = filter_by_time
399                        else:
400                            # In this case, either we know the most
401                            # recent matching ID, or we know there
402                            # were no matching events in the 2-hour
403                            # window before subscribing. Either way we
404                            # can safely ask for events in ascending
405                            # order.
406                            items = self.api.logs().list(
407                                order="id asc",
408                                filters=f+self._skip_old_events).execute()
409                        break
410                    except errors.ApiError as error:
411                        pass
412                    else:
413                        tries_left = 0
414                        break
415                if tries_left == 0:
416                    _logger.exception("PollClient thread could not contact API server.")
417                    with self._closing_lock:
418                        self._closing.set()
419                    _thread.interrupt_main()
420                    return
421                for i in items["items"]:
422                    self._skip_old_events = [["id", ">", str(i["id"])]]
423                    with self._closing_lock:
424                        if self._closing.is_set():
425                            return
426                        try:
427                            self.on_event(i)
428                        except Exception as e:
429                            _logger.exception("Unexpected exception from event callback.")
430                            _thread.interrupt_main()
431                if items["items_available"] > len(items["items"]):
432                    moreitems = True
433            if not moreitems:
434                self._closing.wait(self.poll_time)
435
436    def run_forever(self):
437        """Run the polling client indefinitely
438
439        This method blocks until the `close` method is called (e.g., from
440        another thread) or the client permanently loses its connection.
441        """
442        # Have to poll here, otherwise KeyboardInterrupt will never get processed.
443        while not self._closing.is_set():
444            self._closing.wait(1)
445
446    def close(self, code: Optional[int]=None, reason: Optional[str]=None, timeout: float=0) -> None:
447        """Stop polling and processing events
448
449        Arguments:
450
451        * code: Optional[int] --- Ignored; this argument exists for API
452          compatibility with `EventClient.close`.
453
454        * reason: Optional[str] --- Ignored; this argument exists for API
455          compatibility with `EventClient.close`.
456
457        * timeout: float --- How long to wait for the client thread to finish
458          processing events. Default 0, which means no timeout.
459        """
460        with self._closing_lock:
461            self._closing.set()
462        try:
463            self.join(timeout=timeout)
464        except RuntimeError:
465            # "join() raises a RuntimeError if an attempt is made to join the
466            # current thread as that would cause a deadlock. It is also an
467            # error to join() a thread before it has been started and attempts
468            # to do so raises the same exception."
469            pass
470
471    def subscribe(self, f: Filter, last_log_id: Optional[int]=None) -> None:
472        """Subscribe to another set of events from the server
473
474        Arguments:
475
476        * f: arvados.events.Filter | None --- One filter to subscribe to.
477
478        * last_log_id: Optional[int] --- Ignored; this argument exists for
479          API compatibility with `EventClient.subscribe`.
480        """
481        self.on_event({'status': 200})
482        self.filters.append(f)
483
484    def unsubscribe(self, f):
485        """Unsubscribe from an event stream
486
487        Arguments:
488
489        * f: arvados.events.Filter | None --- One event filter to stop
490        receiving events for.
491        """
492        del self.filters[self.filters.index(f)]
493
494
495def _subscribe_websocket(api, filters, on_event, last_log_id=None):
496    endpoint = api._rootDesc.get('websocketUrl', None)
497    if not endpoint:
498        raise errors.FeatureNotEnabledError(
499            "Server does not advertise a websocket endpoint")
500    uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
501    try:
502        client = EventClient(uri_with_token, filters, on_event, last_log_id)
503    except Exception:
504        _logger.warning("Failed to connect to websockets on %s" % endpoint)
505        raise
506    else:
507        return client
508
509def subscribe(
510        api: 'arvados.api_resources.ArvadosAPIClient',
511        filters: Optional[Filter],
512        on_event: EventCallback,
513        poll_fallback: float=15,
514        last_log_id: Optional[int]=None,
515) -> Union[EventClient, PollClient]:
516    """Start a thread to monitor events
517
518    This method tries to construct an `EventClient` to process Arvados
519    events via WebSockets. If that fails, or the
520    `ARVADOS_DISABLE_WEBSOCKETS` flag is set in user configuration, it falls
521    back to constructing a `PollClient` to process the events via API
522    polling.
523
524    Arguments:
525
526    * api: arvados.api_resources.ArvadosAPIClient --- The Arvados API
527      client used to query logs. It may be used in a separate thread,
528      so if it is not an instance of `arvados.safeapi.ThreadSafeApiCache`
529      it should not be reused after this method returns.
530
531    * filters: arvados.events.Filter | None --- One event filter to
532      subscribe to after initializing the client. If not specified, the
533      client will subscribe to all events.
534
535    * on_event: arvados.events.EventCallback --- When the client receives an
536      event, it calls this function with the event object.
537
538    * poll_time: float --- The number of seconds to wait between querying
539      logs. If 0, this function will refuse to construct a `PollClient`.
540      Default 15.
541
542    * last_log_id: int | None --- If specified, start processing events with
543      at least this `id` value.
544    """
545    if not poll_fallback:
546        return _subscribe_websocket(api, filters, on_event, last_log_id)
547
548    try:
549        if not config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS'):
550            return _subscribe_websocket(api, filters, on_event, last_log_id)
551        else:
552            _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
553    except Exception as e:
554        _logger.warning("Falling back to polling after websocket error: %s" % e)
555    p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
556    p.start()
557    return p
EventCallback = typing.Callable[[typing.Dict[str, typing.Any]], object]

Type signature for an event handler callback

FilterCondition = typing.List[typing.Union[NoneType, str, ForwardRef('Filter')]]

Type signature for a single filter condition

Filter = typing.List[typing.List[typing.Union[NoneType, str, ForwardRef('Filter')]]]

Type signature for an entire filter

class WSMethod(enum.Enum):
56class WSMethod(enum.Enum):
57    """Arvados WebSocket methods
58
59    This enum represents valid values for the `method` field in messages
60    sent to an Arvados WebSocket server.
61    """
62    SUBSCRIBE = 'subscribe'
63    SUB = SUBSCRIBE
64    UNSUBSCRIBE = 'unsubscribe'
65    UNSUB = UNSUBSCRIBE

Arvados WebSocket methods

This enum represents valid values for the method field in messages sent to an Arvados WebSocket server.

SUBSCRIBE = <WSMethod.SUBSCRIBE: 'subscribe'>
SUB = <WSMethod.SUBSCRIBE: 'subscribe'>
UNSUBSCRIBE = <WSMethod.UNSUBSCRIBE: 'unsubscribe'>
UNSUB = <WSMethod.UNSUBSCRIBE: 'unsubscribe'>
Inherited Members
enum.Enum
name
value
class EventClient(threading.Thread):
 68class EventClient(threading.Thread):
 69    """Follow Arvados events via WebSocket
 70
 71    EventClient follows events on Arvados cluster published by the WebSocket
 72    server. Users can select the events they want to follow and run their own
 73    callback function on each.
 74    """
 75    _USER_AGENT = 'Python/{}.{}.{} arvados.events/{}'.format(
 76        *sys.version_info[:3],
 77        __version__,
 78    )
 79
 80    def __init__(
 81            self,
 82            url: str,
 83            filters: Optional[Filter],
 84            on_event_cb: EventCallback,
 85            last_log_id: Optional[int]=None,
 86            *,
 87            insecure: Optional[bool]=None,
 88    ) -> None:
 89        """Initialize a WebSocket client
 90
 91        Constructor arguments:
 92
 93        * url: str --- The `wss` URL for an Arvados WebSocket server.
 94
 95        * filters: arvados.events.Filter | None --- One event filter to
 96          subscribe to after connecting to the WebSocket server. If not
 97          specified, the client will subscribe to all events.
 98
 99        * on_event_cb: arvados.events.EventCallback --- When the client
100          receives an event from the WebSocket server, it calls this
101          function with the event object.
102
103        * last_log_id: int | None --- If specified, this will be used as the
104          value for the `last_log_id` field in subscribe messages sent by
105          the client.
106
107        Constructor keyword arguments:
108
109        * insecure: bool | None --- If `True`, the client will not check the
110          validity of the server's TLS certificate. If not specified, uses
111          the value from the user's `ARVADOS_API_HOST_INSECURE` setting.
112        """
113        self.url = url
114        self.filters = [filters or []]
115        self.on_event_cb = on_event_cb
116        self.last_log_id = last_log_id
117        self.is_closed = threading.Event()
118        self._ssl_ctx = ssl.create_default_context(
119            purpose=ssl.Purpose.SERVER_AUTH,
120            cafile=util.ca_certs_path(),
121        )
122        if insecure is None:
123            insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
124        if insecure:
125            self._ssl_ctx.check_hostname = False
126            self._ssl_ctx.verify_mode = ssl.CERT_NONE
127        self._subscribe_lock = threading.Lock()
128        self._connect()
129        super().__init__(daemon=True)
130        self.start()
131
132    def _connect(self) -> None:
133        # There are no locks protecting this method. After the thread starts,
134        # it should only be called from inside.
135        self._client = ws_client.connect(
136            self.url,
137            logger=_logger,
138            ssl_context=self._ssl_ctx,
139            user_agent_header=self._USER_AGENT,
140        )
141        self._client_ok = True
142
143    def _subscribe(self, f: Filter, last_log_id: Optional[int]) -> None:
144        extra = {}
145        if last_log_id is not None:
146            extra['last_log_id'] = last_log_id
147        return self._update_sub(WSMethod.SUBSCRIBE, f, **extra)
148
149    def _update_sub(self, method: WSMethod, f: Filter, **extra: Any) -> None:
150        msg = json.dumps({
151            'method': method.value,
152            'filters': f,
153            **extra,
154        })
155        self._client.send(msg)
156
157    def close(self, code: int=1000, reason: str='', timeout: float=0) -> None:
158        """Close the WebSocket connection and stop processing events
159
160        Arguments:
161
162        * code: int --- The WebSocket close code sent to the server when
163          disconnecting. Default 1000.
164
165        * reason: str --- The WebSocket close reason sent to the server when
166          disconnecting. Default is an empty string.
167
168        * timeout: float --- How long to wait for the WebSocket server to
169          acknowledge the disconnection, in seconds. Default 0, which means
170          no timeout.
171        """
172        self.is_closed.set()
173        self._client.close_timeout = timeout or None
174        self._client.close(code, reason)
175
176    def run_forever(self) -> None:
177        """Run the WebSocket client indefinitely
178
179        This method blocks until the `close` method is called (e.g., from
180        another thread) or the client permanently loses its connection.
181        """
182        # Have to poll here to let KeyboardInterrupt get raised.
183        while not self.is_closed.wait(1):
184            pass
185
186    def subscribe(self, f: Filter, last_log_id: Optional[int]=None) -> None:
187        """Subscribe to another set of events from the server
188
189        Arguments:
190
191        * f: arvados.events.Filter | None --- One filter to subscribe to
192          events for.
193
194        * last_log_id: int | None --- If specified, request events starting
195          from this id. If not specified, the server will only send events
196          that occur after processing the subscription.
197        """
198        with self._subscribe_lock:
199            self._subscribe(f, last_log_id)
200            self.filters.append(f)
201
202    def unsubscribe(self, f: Filter) -> None:
203        """Unsubscribe from an event stream
204
205        Arguments:
206
207        * f: arvados.events.Filter | None --- One event filter to stop
208        receiving events for.
209        """
210        with self._subscribe_lock:
211            try:
212                index = self.filters.index(f)
213            except ValueError:
214                raise ValueError(f"filter not subscribed: {f!r}") from None
215            self._update_sub(WSMethod.UNSUBSCRIBE, f)
216            del self.filters[index]
217
218    def on_closed(self) -> None:
219        """Handle disconnection from the WebSocket server
220
221        This method is called when the client loses its connection from
222        receiving events. This implementation tries to establish a new
223        connection if it was not closed client-side.
224        """
225        if self.is_closed.is_set():
226            return
227        _logger.warning("Unexpected close. Reconnecting.")
228        for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
229            try:
230                self._connect()
231            except Exception as e:
232                _logger.warning("Error '%s' during websocket reconnect.", e)
233            else:
234                _logger.warning("Reconnect successful.")
235                break
236        else:
237            _logger.error("EventClient thread could not contact websocket server.")
238            self.is_closed.set()
239            _thread.interrupt_main()
240
241    def on_event(self, m: Dict[str, Any]) -> None:
242        """Handle an event from the WebSocket server
243
244        This method is called whenever the client receives an event from the
245        server. This implementation records the `id` field internally, then
246        calls the callback function provided at initialization time.
247
248        Arguments:
249
250        * m: Dict[str, Any] --- The event object, deserialized from JSON.
251        """
252        try:
253            self.last_log_id = m['id']
254        except KeyError:
255            pass
256        try:
257            self.on_event_cb(m)
258        except Exception:
259            _logger.exception("Unexpected exception from event callback.")
260            _thread.interrupt_main()
261
262    def run(self) -> None:
263        """Run the client loop
264
265        This method runs in a separate thread to receive and process events
266        from the server.
267        """
268        self.setName(f'ArvadosWebsockets-{self.ident}')
269        while self._client_ok and not self.is_closed.is_set():
270            try:
271                with self._subscribe_lock:
272                    for f in self.filters:
273                        self._subscribe(f, self.last_log_id)
274                for msg_s in self._client:
275                    if not self.is_closed.is_set():
276                        msg = json.loads(msg_s)
277                        self.on_event(msg)
278            except ws_exc.ConnectionClosed:
279                self._client_ok = False
280                self.on_closed()

Follow Arvados events via WebSocket

EventClient follows events on Arvados cluster published by the WebSocket server. Users can select the events they want to follow and run their own callback function on each.

EventClient( url: str, filters: Optional[List[List[Union[NoneType, str, List[List[Union[NoneType, str, ForwardRef('Filter')]]]]]]], on_event_cb: Callable[[Dict[str, Any]], object], last_log_id: Optional[int] = None, *, insecure: Optional[bool] = None)
 80    def __init__(
 81            self,
 82            url: str,
 83            filters: Optional[Filter],
 84            on_event_cb: EventCallback,
 85            last_log_id: Optional[int]=None,
 86            *,
 87            insecure: Optional[bool]=None,
 88    ) -> None:
 89        """Initialize a WebSocket client
 90
 91        Constructor arguments:
 92
 93        * url: str --- The `wss` URL for an Arvados WebSocket server.
 94
 95        * filters: arvados.events.Filter | None --- One event filter to
 96          subscribe to after connecting to the WebSocket server. If not
 97          specified, the client will subscribe to all events.
 98
 99        * on_event_cb: arvados.events.EventCallback --- When the client
100          receives an event from the WebSocket server, it calls this
101          function with the event object.
102
103        * last_log_id: int | None --- If specified, this will be used as the
104          value for the `last_log_id` field in subscribe messages sent by
105          the client.
106
107        Constructor keyword arguments:
108
109        * insecure: bool | None --- If `True`, the client will not check the
110          validity of the server's TLS certificate. If not specified, uses
111          the value from the user's `ARVADOS_API_HOST_INSECURE` setting.
112        """
113        self.url = url
114        self.filters = [filters or []]
115        self.on_event_cb = on_event_cb
116        self.last_log_id = last_log_id
117        self.is_closed = threading.Event()
118        self._ssl_ctx = ssl.create_default_context(
119            purpose=ssl.Purpose.SERVER_AUTH,
120            cafile=util.ca_certs_path(),
121        )
122        if insecure is None:
123            insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
124        if insecure:
125            self._ssl_ctx.check_hostname = False
126            self._ssl_ctx.verify_mode = ssl.CERT_NONE
127        self._subscribe_lock = threading.Lock()
128        self._connect()
129        super().__init__(daemon=True)
130        self.start()

Initialize a WebSocket client

Constructor arguments:

  • url: str — The wss URL for an Arvados WebSocket server.

  • filters: arvados.events.Filter | None — One event filter to subscribe to after connecting to the WebSocket server. If not specified, the client will subscribe to all events.

  • on_event_cb: arvados.events.EventCallback — When the client receives an event from the WebSocket server, it calls this function with the event object.

  • last_log_id: int | None — If specified, this will be used as the value for the last_log_id field in subscribe messages sent by the client.

Constructor keyword arguments:

  • insecure: bool | None — If True, the client will not check the validity of the server’s TLS certificate. If not specified, uses the value from the user’s ARVADOS_API_HOST_INSECURE setting.
url
filters
on_event_cb
last_log_id
is_closed
def close(self, code: int = 1000, reason: str = '', timeout: float = 0) -> None:
157    def close(self, code: int=1000, reason: str='', timeout: float=0) -> None:
158        """Close the WebSocket connection and stop processing events
159
160        Arguments:
161
162        * code: int --- The WebSocket close code sent to the server when
163          disconnecting. Default 1000.
164
165        * reason: str --- The WebSocket close reason sent to the server when
166          disconnecting. Default is an empty string.
167
168        * timeout: float --- How long to wait for the WebSocket server to
169          acknowledge the disconnection, in seconds. Default 0, which means
170          no timeout.
171        """
172        self.is_closed.set()
173        self._client.close_timeout = timeout or None
174        self._client.close(code, reason)

Close the WebSocket connection and stop processing events

Arguments:

  • code: int — The WebSocket close code sent to the server when disconnecting. Default 1000.

  • reason: str — The WebSocket close reason sent to the server when disconnecting. Default is an empty string.

  • timeout: float — How long to wait for the WebSocket server to acknowledge the disconnection, in seconds. Default 0, which means no timeout.

def run_forever(self) -> None:
176    def run_forever(self) -> None:
177        """Run the WebSocket client indefinitely
178
179        This method blocks until the `close` method is called (e.g., from
180        another thread) or the client permanently loses its connection.
181        """
182        # Have to poll here to let KeyboardInterrupt get raised.
183        while not self.is_closed.wait(1):
184            pass

Run the WebSocket client indefinitely

This method blocks until the close method is called (e.g., from another thread) or the client permanently loses its connection.

def subscribe( self, f: List[List[Union[NoneType, str, List[List[Union[NoneType, str, ForwardRef('Filter')]]]]]], last_log_id: Optional[int] = None) -> None:
186    def subscribe(self, f: Filter, last_log_id: Optional[int]=None) -> None:
187        """Subscribe to another set of events from the server
188
189        Arguments:
190
191        * f: arvados.events.Filter | None --- One filter to subscribe to
192          events for.
193
194        * last_log_id: int | None --- If specified, request events starting
195          from this id. If not specified, the server will only send events
196          that occur after processing the subscription.
197        """
198        with self._subscribe_lock:
199            self._subscribe(f, last_log_id)
200            self.filters.append(f)

Subscribe to another set of events from the server

Arguments:

  • f: arvados.events.Filter | None — One filter to subscribe to events for.

  • last_log_id: int | None — If specified, request events starting from this id. If not specified, the server will only send events that occur after processing the subscription.

def unsubscribe( self, f: List[List[Union[NoneType, str, List[List[Union[NoneType, str, ForwardRef('Filter')]]]]]]) -> None:
202    def unsubscribe(self, f: Filter) -> None:
203        """Unsubscribe from an event stream
204
205        Arguments:
206
207        * f: arvados.events.Filter | None --- One event filter to stop
208        receiving events for.
209        """
210        with self._subscribe_lock:
211            try:
212                index = self.filters.index(f)
213            except ValueError:
214                raise ValueError(f"filter not subscribed: {f!r}") from None
215            self._update_sub(WSMethod.UNSUBSCRIBE, f)
216            del self.filters[index]

Unsubscribe from an event stream

Arguments:

def on_closed(self) -> None:
218    def on_closed(self) -> None:
219        """Handle disconnection from the WebSocket server
220
221        This method is called when the client loses its connection from
222        receiving events. This implementation tries to establish a new
223        connection if it was not closed client-side.
224        """
225        if self.is_closed.is_set():
226            return
227        _logger.warning("Unexpected close. Reconnecting.")
228        for _ in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
229            try:
230                self._connect()
231            except Exception as e:
232                _logger.warning("Error '%s' during websocket reconnect.", e)
233            else:
234                _logger.warning("Reconnect successful.")
235                break
236        else:
237            _logger.error("EventClient thread could not contact websocket server.")
238            self.is_closed.set()
239            _thread.interrupt_main()

Handle disconnection from the WebSocket server

This method is called when the client loses its connection from receiving events. This implementation tries to establish a new connection if it was not closed client-side.

def on_event(self, m: Dict[str, Any]) -> None:
241    def on_event(self, m: Dict[str, Any]) -> None:
242        """Handle an event from the WebSocket server
243
244        This method is called whenever the client receives an event from the
245        server. This implementation records the `id` field internally, then
246        calls the callback function provided at initialization time.
247
248        Arguments:
249
250        * m: Dict[str, Any] --- The event object, deserialized from JSON.
251        """
252        try:
253            self.last_log_id = m['id']
254        except KeyError:
255            pass
256        try:
257            self.on_event_cb(m)
258        except Exception:
259            _logger.exception("Unexpected exception from event callback.")
260            _thread.interrupt_main()

Handle an event from the WebSocket server

This method is called whenever the client receives an event from the server. This implementation records the id field internally, then calls the callback function provided at initialization time.

Arguments:

  • m: Dict[str, Any] — The event object, deserialized from JSON.
def run(self) -> None:
262    def run(self) -> None:
263        """Run the client loop
264
265        This method runs in a separate thread to receive and process events
266        from the server.
267        """
268        self.setName(f'ArvadosWebsockets-{self.ident}')
269        while self._client_ok and not self.is_closed.is_set():
270            try:
271                with self._subscribe_lock:
272                    for f in self.filters:
273                        self._subscribe(f, self.last_log_id)
274                for msg_s in self._client:
275                    if not self.is_closed.is_set():
276                        msg = json.loads(msg_s)
277                        self.on_event(msg)
278            except ws_exc.ConnectionClosed:
279                self._client_ok = False
280                self.on_closed()

Run the client loop

This method runs in a separate thread to receive and process events from the server.

Inherited Members
threading.Thread
start
join
name
ident
is_alive
daemon
isDaemon
setDaemon
getName
setName
native_id
class PollClient(threading.Thread):
283class PollClient(threading.Thread):
284    """Follow Arvados events via polling logs
285
286    PollClient follows events on Arvados cluster by periodically running
287    logs list API calls. Users can select the events they want to follow and
288    run their own callback function on each.
289    """
290    def __init__(
291            self,
292            api: 'arvados.api_resources.ArvadosAPIClient',
293            filters: Optional[Filter],
294            on_event: EventCallback,
295            poll_time: float=15,
296            last_log_id: Optional[int]=None,
297    ) -> None:
298        """Initialize a polling client
299
300        Constructor arguments:
301
302        * api: arvados.api_resources.ArvadosAPIClient --- The Arvados API
303          client used to query logs. It will be used in a separate thread,
304          so if it is not an instance of `arvados.safeapi.ThreadSafeApiCache`
305          it should not be reused after the thread is started.
306
307        * filters: arvados.events.Filter | None --- One event filter to
308          subscribe to after connecting to the WebSocket server. If not
309          specified, the client will subscribe to all events.
310
311        * on_event: arvados.events.EventCallback --- When the client
312          receives an event from the WebSocket server, it calls this
313          function with the event object.
314
315        * poll_time: float --- The number of seconds to wait between querying
316          logs. Default 15.
317
318        * last_log_id: int | None --- If specified, queries will include a
319          filter for logs with an `id` at least this value.
320        """
321        super(PollClient, self).__init__()
322        self.api = api
323        if filters:
324            self.filters = [filters]
325        else:
326            self.filters = [[]]
327        self.on_event = on_event
328        self.poll_time = poll_time
329        self.daemon = True
330        self.last_log_id = last_log_id
331        self._closing = threading.Event()
332        self._closing_lock = threading.RLock()
333
334        if self.last_log_id != None:
335            # Caller supplied the last-seen event ID from a previous
336            # connection.
337            self._skip_old_events = [["id", ">", str(self.last_log_id)]]
338        else:
339            # We need to do a reverse-order query to find the most
340            # recent event ID (see "if not self._skip_old_events"
341            # in run()).
342            self._skip_old_events = False
343
344    def run(self):
345        """Run the client loop
346
347        This method runs in a separate thread to poll and process events
348        from the server.
349        """
350        self.on_event({'status': 200})
351
352        while not self._closing.is_set():
353            moreitems = False
354            for f in self.filters:
355                for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
356                    try:
357                        if not self._skip_old_events:
358                            # If the caller didn't provide a known
359                            # recent ID, our first request will ask
360                            # for the single most recent event from
361                            # the last 2 hours (the time restriction
362                            # avoids doing an expensive database
363                            # query, and leaves a big enough margin to
364                            # account for clock skew). If we do find a
365                            # recent event, we remember its ID but
366                            # then discard it (we are supposed to be
367                            # returning new/current events, not old
368                            # ones).
369                            #
370                            # Subsequent requests will get multiple
371                            # events in chronological order, and
372                            # filter on that same cutoff time, or
373                            # (once we see our first matching event)
374                            # the ID of the last-seen event.
375                            #
376                            # Note: self._skip_old_events must not be
377                            # set until the threshold is decided.
378                            # Otherwise, tests will be unreliable.
379                            filter_by_time = [[
380                                "created_at", ">=",
381                                time.strftime(
382                                    "%Y-%m-%dT%H:%M:%SZ",
383                                    time.gmtime(time.time()-7200))]]
384                            items = self.api.logs().list(
385                                order="id desc",
386                                limit=1,
387                                filters=f+filter_by_time).execute()
388                            if items["items"]:
389                                self._skip_old_events = [
390                                    ["id", ">", str(items["items"][0]["id"])]]
391                                items = {
392                                    "items": [],
393                                    "items_available": 0,
394                                }
395                            else:
396                                # No recent events. We can keep using
397                                # the same timestamp threshold until
398                                # we receive our first new event.
399                                self._skip_old_events = filter_by_time
400                        else:
401                            # In this case, either we know the most
402                            # recent matching ID, or we know there
403                            # were no matching events in the 2-hour
404                            # window before subscribing. Either way we
405                            # can safely ask for events in ascending
406                            # order.
407                            items = self.api.logs().list(
408                                order="id asc",
409                                filters=f+self._skip_old_events).execute()
410                        break
411                    except errors.ApiError as error:
412                        pass
413                    else:
414                        tries_left = 0
415                        break
416                if tries_left == 0:
417                    _logger.exception("PollClient thread could not contact API server.")
418                    with self._closing_lock:
419                        self._closing.set()
420                    _thread.interrupt_main()
421                    return
422                for i in items["items"]:
423                    self._skip_old_events = [["id", ">", str(i["id"])]]
424                    with self._closing_lock:
425                        if self._closing.is_set():
426                            return
427                        try:
428                            self.on_event(i)
429                        except Exception as e:
430                            _logger.exception("Unexpected exception from event callback.")
431                            _thread.interrupt_main()
432                if items["items_available"] > len(items["items"]):
433                    moreitems = True
434            if not moreitems:
435                self._closing.wait(self.poll_time)
436
437    def run_forever(self):
438        """Run the polling client indefinitely
439
440        This method blocks until the `close` method is called (e.g., from
441        another thread) or the client permanently loses its connection.
442        """
443        # Have to poll here, otherwise KeyboardInterrupt will never get processed.
444        while not self._closing.is_set():
445            self._closing.wait(1)
446
447    def close(self, code: Optional[int]=None, reason: Optional[str]=None, timeout: float=0) -> None:
448        """Stop polling and processing events
449
450        Arguments:
451
452        * code: Optional[int] --- Ignored; this argument exists for API
453          compatibility with `EventClient.close`.
454
455        * reason: Optional[str] --- Ignored; this argument exists for API
456          compatibility with `EventClient.close`.
457
458        * timeout: float --- How long to wait for the client thread to finish
459          processing events. Default 0, which means no timeout.
460        """
461        with self._closing_lock:
462            self._closing.set()
463        try:
464            self.join(timeout=timeout)
465        except RuntimeError:
466            # "join() raises a RuntimeError if an attempt is made to join the
467            # current thread as that would cause a deadlock. It is also an
468            # error to join() a thread before it has been started and attempts
469            # to do so raises the same exception."
470            pass
471
472    def subscribe(self, f: Filter, last_log_id: Optional[int]=None) -> None:
473        """Subscribe to another set of events from the server
474
475        Arguments:
476
477        * f: arvados.events.Filter | None --- One filter to subscribe to.
478
479        * last_log_id: Optional[int] --- Ignored; this argument exists for
480          API compatibility with `EventClient.subscribe`.
481        """
482        self.on_event({'status': 200})
483        self.filters.append(f)
484
485    def unsubscribe(self, f):
486        """Unsubscribe from an event stream
487
488        Arguments:
489
490        * f: arvados.events.Filter | None --- One event filter to stop
491        receiving events for.
492        """
493        del self.filters[self.filters.index(f)]

Follow Arvados events via polling logs

PollClient follows events on Arvados cluster by periodically running logs list API calls. Users can select the events they want to follow and run their own callback function on each.

PollClient( api: arvados.api_resources.ArvadosAPIClient, filters: Optional[List[List[Union[NoneType, str, List[List[Union[NoneType, str, ForwardRef('Filter')]]]]]]], on_event: Callable[[Dict[str, Any]], object], poll_time: float = 15, last_log_id: Optional[int] = None)
290    def __init__(
291            self,
292            api: 'arvados.api_resources.ArvadosAPIClient',
293            filters: Optional[Filter],
294            on_event: EventCallback,
295            poll_time: float=15,
296            last_log_id: Optional[int]=None,
297    ) -> None:
298        """Initialize a polling client
299
300        Constructor arguments:
301
302        * api: arvados.api_resources.ArvadosAPIClient --- The Arvados API
303          client used to query logs. It will be used in a separate thread,
304          so if it is not an instance of `arvados.safeapi.ThreadSafeApiCache`
305          it should not be reused after the thread is started.
306
307        * filters: arvados.events.Filter | None --- One event filter to
308          subscribe to after connecting to the WebSocket server. If not
309          specified, the client will subscribe to all events.
310
311        * on_event: arvados.events.EventCallback --- When the client
312          receives an event from the WebSocket server, it calls this
313          function with the event object.
314
315        * poll_time: float --- The number of seconds to wait between querying
316          logs. Default 15.
317
318        * last_log_id: int | None --- If specified, queries will include a
319          filter for logs with an `id` at least this value.
320        """
321        super(PollClient, self).__init__()
322        self.api = api
323        if filters:
324            self.filters = [filters]
325        else:
326            self.filters = [[]]
327        self.on_event = on_event
328        self.poll_time = poll_time
329        self.daemon = True
330        self.last_log_id = last_log_id
331        self._closing = threading.Event()
332        self._closing_lock = threading.RLock()
333
334        if self.last_log_id != None:
335            # Caller supplied the last-seen event ID from a previous
336            # connection.
337            self._skip_old_events = [["id", ">", str(self.last_log_id)]]
338        else:
339            # We need to do a reverse-order query to find the most
340            # recent event ID (see "if not self._skip_old_events"
341            # in run()).
342            self._skip_old_events = False

Initialize a polling client

Constructor arguments:

  • api: arvados.api_resources.ArvadosAPIClient — The Arvados API client used to query logs. It will be used in a separate thread, so if it is not an instance of arvados.safeapi.ThreadSafeApiCache it should not be reused after the thread is started.

  • filters: arvados.events.Filter | None — One event filter to subscribe to after connecting to the WebSocket server. If not specified, the client will subscribe to all events.

  • on_event: arvados.events.EventCallback — When the client receives an event from the WebSocket server, it calls this function with the event object.

  • poll_time: float — The number of seconds to wait between querying logs. Default 15.

  • last_log_id: int | None — If specified, queries will include a filter for logs with an id at least this value.

api
on_event
poll_time
daemon
1108    @property
1109    def daemon(self):
1110        """A boolean value indicating whether this thread is a daemon thread.
1111
1112        This must be set before start() is called, otherwise RuntimeError is
1113        raised. Its initial value is inherited from the creating thread; the
1114        main thread is not a daemon thread and therefore all threads created in
1115        the main thread default to daemon = False.
1116
1117        The entire Python program exits when only daemon threads are left.
1118
1119        """
1120        assert self._initialized, "Thread.__init__() not called"
1121        return self._daemonic

A boolean value indicating whether this thread is a daemon thread.

This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.

The entire Python program exits when only daemon threads are left.

last_log_id
def run(self):
344    def run(self):
345        """Run the client loop
346
347        This method runs in a separate thread to poll and process events
348        from the server.
349        """
350        self.on_event({'status': 200})
351
352        while not self._closing.is_set():
353            moreitems = False
354            for f in self.filters:
355                for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
356                    try:
357                        if not self._skip_old_events:
358                            # If the caller didn't provide a known
359                            # recent ID, our first request will ask
360                            # for the single most recent event from
361                            # the last 2 hours (the time restriction
362                            # avoids doing an expensive database
363                            # query, and leaves a big enough margin to
364                            # account for clock skew). If we do find a
365                            # recent event, we remember its ID but
366                            # then discard it (we are supposed to be
367                            # returning new/current events, not old
368                            # ones).
369                            #
370                            # Subsequent requests will get multiple
371                            # events in chronological order, and
372                            # filter on that same cutoff time, or
373                            # (once we see our first matching event)
374                            # the ID of the last-seen event.
375                            #
376                            # Note: self._skip_old_events must not be
377                            # set until the threshold is decided.
378                            # Otherwise, tests will be unreliable.
379                            filter_by_time = [[
380                                "created_at", ">=",
381                                time.strftime(
382                                    "%Y-%m-%dT%H:%M:%SZ",
383                                    time.gmtime(time.time()-7200))]]
384                            items = self.api.logs().list(
385                                order="id desc",
386                                limit=1,
387                                filters=f+filter_by_time).execute()
388                            if items["items"]:
389                                self._skip_old_events = [
390                                    ["id", ">", str(items["items"][0]["id"])]]
391                                items = {
392                                    "items": [],
393                                    "items_available": 0,
394                                }
395                            else:
396                                # No recent events. We can keep using
397                                # the same timestamp threshold until
398                                # we receive our first new event.
399                                self._skip_old_events = filter_by_time
400                        else:
401                            # In this case, either we know the most
402                            # recent matching ID, or we know there
403                            # were no matching events in the 2-hour
404                            # window before subscribing. Either way we
405                            # can safely ask for events in ascending
406                            # order.
407                            items = self.api.logs().list(
408                                order="id asc",
409                                filters=f+self._skip_old_events).execute()
410                        break
411                    except errors.ApiError as error:
412                        pass
413                    else:
414                        tries_left = 0
415                        break
416                if tries_left == 0:
417                    _logger.exception("PollClient thread could not contact API server.")
418                    with self._closing_lock:
419                        self._closing.set()
420                    _thread.interrupt_main()
421                    return
422                for i in items["items"]:
423                    self._skip_old_events = [["id", ">", str(i["id"])]]
424                    with self._closing_lock:
425                        if self._closing.is_set():
426                            return
427                        try:
428                            self.on_event(i)
429                        except Exception as e:
430                            _logger.exception("Unexpected exception from event callback.")
431                            _thread.interrupt_main()
432                if items["items_available"] > len(items["items"]):
433                    moreitems = True
434            if not moreitems:
435                self._closing.wait(self.poll_time)

Run the client loop

This method runs in a separate thread to poll and process events from the server.

def run_forever(self):
437    def run_forever(self):
438        """Run the polling client indefinitely
439
440        This method blocks until the `close` method is called (e.g., from
441        another thread) or the client permanently loses its connection.
442        """
443        # Have to poll here, otherwise KeyboardInterrupt will never get processed.
444        while not self._closing.is_set():
445            self._closing.wait(1)

Run the polling client indefinitely

This method blocks until the close method is called (e.g., from another thread) or the client permanently loses its connection.

def close( self, code: Optional[int] = None, reason: Optional[str] = None, timeout: float = 0) -> None:
447    def close(self, code: Optional[int]=None, reason: Optional[str]=None, timeout: float=0) -> None:
448        """Stop polling and processing events
449
450        Arguments:
451
452        * code: Optional[int] --- Ignored; this argument exists for API
453          compatibility with `EventClient.close`.
454
455        * reason: Optional[str] --- Ignored; this argument exists for API
456          compatibility with `EventClient.close`.
457
458        * timeout: float --- How long to wait for the client thread to finish
459          processing events. Default 0, which means no timeout.
460        """
461        with self._closing_lock:
462            self._closing.set()
463        try:
464            self.join(timeout=timeout)
465        except RuntimeError:
466            # "join() raises a RuntimeError if an attempt is made to join the
467            # current thread as that would cause a deadlock. It is also an
468            # error to join() a thread before it has been started and attempts
469            # to do so raises the same exception."
470            pass

Stop polling and processing events

Arguments:

  • code: Optional[int] — Ignored; this argument exists for API compatibility with EventClient.close.

  • reason: Optional[str] — Ignored; this argument exists for API compatibility with EventClient.close.

  • timeout: float — How long to wait for the client thread to finish processing events. Default 0, which means no timeout.

def subscribe( self, f: List[List[Union[NoneType, str, List[List[Union[NoneType, str, ForwardRef('Filter')]]]]]], last_log_id: Optional[int] = None) -> None:
472    def subscribe(self, f: Filter, last_log_id: Optional[int]=None) -> None:
473        """Subscribe to another set of events from the server
474
475        Arguments:
476
477        * f: arvados.events.Filter | None --- One filter to subscribe to.
478
479        * last_log_id: Optional[int] --- Ignored; this argument exists for
480          API compatibility with `EventClient.subscribe`.
481        """
482        self.on_event({'status': 200})
483        self.filters.append(f)

Subscribe to another set of events from the server

Arguments:

def unsubscribe(self, f):
485    def unsubscribe(self, f):
486        """Unsubscribe from an event stream
487
488        Arguments:
489
490        * f: arvados.events.Filter | None --- One event filter to stop
491        receiving events for.
492        """
493        del self.filters[self.filters.index(f)]

Unsubscribe from an event stream

Arguments:

Inherited Members
threading.Thread
start
join
name
ident
is_alive
isDaemon
setDaemon
getName
setName
native_id
def subscribe( api: arvados.api_resources.ArvadosAPIClient, filters: Optional[List[List[Union[NoneType, str, List[List[Union[NoneType, str, ForwardRef('Filter')]]]]]]], on_event: Callable[[Dict[str, Any]], object], poll_fallback: float = 15, last_log_id: Optional[int] = None) -> Union[EventClient, PollClient]:
510def subscribe(
511        api: 'arvados.api_resources.ArvadosAPIClient',
512        filters: Optional[Filter],
513        on_event: EventCallback,
514        poll_fallback: float=15,
515        last_log_id: Optional[int]=None,
516) -> Union[EventClient, PollClient]:
517    """Start a thread to monitor events
518
519    This method tries to construct an `EventClient` to process Arvados
520    events via WebSockets. If that fails, or the
521    `ARVADOS_DISABLE_WEBSOCKETS` flag is set in user configuration, it falls
522    back to constructing a `PollClient` to process the events via API
523    polling.
524
525    Arguments:
526
527    * api: arvados.api_resources.ArvadosAPIClient --- The Arvados API
528      client used to query logs. It may be used in a separate thread,
529      so if it is not an instance of `arvados.safeapi.ThreadSafeApiCache`
530      it should not be reused after this method returns.
531
532    * filters: arvados.events.Filter | None --- One event filter to
533      subscribe to after initializing the client. If not specified, the
534      client will subscribe to all events.
535
536    * on_event: arvados.events.EventCallback --- When the client receives an
537      event, it calls this function with the event object.
538
539    * poll_time: float --- The number of seconds to wait between querying
540      logs. If 0, this function will refuse to construct a `PollClient`.
541      Default 15.
542
543    * last_log_id: int | None --- If specified, start processing events with
544      at least this `id` value.
545    """
546    if not poll_fallback:
547        return _subscribe_websocket(api, filters, on_event, last_log_id)
548
549    try:
550        if not config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS'):
551            return _subscribe_websocket(api, filters, on_event, last_log_id)
552        else:
553            _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
554    except Exception as e:
555        _logger.warning("Falling back to polling after websocket error: %s" % e)
556    p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
557    p.start()
558    return p

Start a thread to monitor events

This method tries to construct an EventClient to process Arvados events via WebSockets. If that fails, or the ARVADOS_DISABLE_WEBSOCKETS flag is set in user configuration, it falls back to constructing a PollClient to process the events via API polling.

Arguments:

  • api: arvados.api_resources.ArvadosAPIClient — The Arvados API client used to query logs. It may be used in a separate thread, so if it is not an instance of arvados.safeapi.ThreadSafeApiCache it should not be reused after this method returns.

  • filters: arvados.events.Filter | None — One event filter to subscribe to after initializing the client. If not specified, the client will subscribe to all events.

  • on_event: arvados.events.EventCallback — When the client receives an event, it calls this function with the event object.

  • poll_time: float — The number of seconds to wait between querying logs. If 0, this function will refuse to construct a PollClient. Default 15.

  • last_log_id: int | None — If specified, start processing events with at least this id value.