arvados.events

  1# Copyright (C) The Arvados Authors. All rights reserved.
  2#
  3# SPDX-License-Identifier: Apache-2.0
  4
  5from __future__ import absolute_import
  6from future import standard_library
  7standard_library.install_aliases()
  8from builtins import str
  9from builtins import object
 10import arvados
 11from . import config
 12from . import errors
 13from .retry import RetryLoop
 14
 15import logging
 16import json
 17import _thread
 18import threading
 19import time
 20import os
 21import re
 22import ssl
 23from ws4py.client.threadedclient import WebSocketClient
 24
 25_logger = logging.getLogger('arvados.events')
 26
 27
 28class _EventClient(WebSocketClient):
 29    def __init__(self, url, filters, on_event, last_log_id, on_closed):
 30        ssl_options = {'ca_certs': arvados.util.ca_certs_path()}
 31        if config.flag_is_true('ARVADOS_API_HOST_INSECURE'):
 32            ssl_options['cert_reqs'] = ssl.CERT_NONE
 33        else:
 34            ssl_options['cert_reqs'] = ssl.CERT_REQUIRED
 35
 36        # Warning: If the host part of url resolves to both IPv6 and
 37        # IPv4 addresses (common with "localhost"), only one of them
 38        # will be attempted -- and it might not be the right one. See
 39        # ws4py's WebSocketBaseClient.__init__.
 40        super(_EventClient, self).__init__(url, ssl_options=ssl_options)
 41
 42        self.filters = filters
 43        self.on_event = on_event
 44        self.last_log_id = last_log_id
 45        self._closing_lock = threading.RLock()
 46        self._closing = False
 47        self._closed = threading.Event()
 48        self.on_closed = on_closed
 49
 50    def opened(self):
 51        for f in self.filters:
 52            self.subscribe(f, self.last_log_id)
 53
 54    def closed(self, code, reason=None):
 55        self._closed.set()
 56        self.on_closed()
 57
 58    def received_message(self, m):
 59        with self._closing_lock:
 60            if not self._closing:
 61                self.on_event(json.loads(str(m)))
 62
 63    def close(self, code=1000, reason='', timeout=0):
 64        """Close event client and optionally wait for it to finish.
 65
 66        :timeout: is the number of seconds to wait for ws4py to
 67        indicate that the connection has closed.
 68        """
 69        super(_EventClient, self).close(code, reason)
 70        with self._closing_lock:
 71            # make sure we don't process any more messages.
 72            self._closing = True
 73        # wait for ws4py to tell us the connection is closed.
 74        self._closed.wait(timeout=timeout)
 75
 76    def subscribe(self, f, last_log_id=None):
 77        m = {"method": "subscribe", "filters": f}
 78        if last_log_id is not None:
 79            m["last_log_id"] = last_log_id
 80        self.send(json.dumps(m))
 81
 82    def unsubscribe(self, f):
 83        self.send(json.dumps({"method": "unsubscribe", "filters": f}))
 84
 85
 86class EventClient(object):
 87    def __init__(self, url, filters, on_event_cb, last_log_id):
 88        self.url = url
 89        if filters:
 90            self.filters = [filters]
 91        else:
 92            self.filters = [[]]
 93        self.on_event_cb = on_event_cb
 94        self.last_log_id = last_log_id
 95        self.is_closed = threading.Event()
 96        self._setup_event_client()
 97
 98    def _setup_event_client(self):
 99        self.ec = _EventClient(self.url, self.filters, self.on_event,
100                               self.last_log_id, self.on_closed)
101        self.ec.daemon = True
102        try:
103            self.ec.connect()
104        except Exception:
105            self.ec.close_connection()
106            raise
107
108    def subscribe(self, f, last_log_id=None):
109        self.filters.append(f)
110        self.ec.subscribe(f, last_log_id)
111
112    def unsubscribe(self, f):
113        del self.filters[self.filters.index(f)]
114        self.ec.unsubscribe(f)
115
116    def close(self, code=1000, reason='', timeout=0):
117        self.is_closed.set()
118        self.ec.close(code, reason, timeout)
119
120    def on_event(self, m):
121        if m.get('id') != None:
122            self.last_log_id = m.get('id')
123        try:
124            self.on_event_cb(m)
125        except Exception as e:
126            _logger.exception("Unexpected exception from event callback.")
127            _thread.interrupt_main()
128
129    def on_closed(self):
130        if not self.is_closed.is_set():
131            _logger.warning("Unexpected close. Reconnecting.")
132            for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
133                try:
134                    self._setup_event_client()
135                    _logger.warning("Reconnect successful.")
136                    break
137                except Exception as e:
138                    _logger.warning("Error '%s' during websocket reconnect.", e)
139            if tries_left == 0:
140                _logger.exception("EventClient thread could not contact websocket server.")
141                self.is_closed.set()
142                _thread.interrupt_main()
143                return
144
145    def run_forever(self):
146        # Have to poll here to let KeyboardInterrupt get raised.
147        while not self.is_closed.wait(1):
148            pass
149
150
151class PollClient(threading.Thread):
152    def __init__(self, api, filters, on_event, poll_time, last_log_id):
153        super(PollClient, self).__init__()
154        self.api = api
155        if filters:
156            self.filters = [filters]
157        else:
158            self.filters = [[]]
159        self.on_event = on_event
160        self.poll_time = poll_time
161        self.daemon = True
162        self.last_log_id = last_log_id
163        self._closing = threading.Event()
164        self._closing_lock = threading.RLock()
165
166        if self.last_log_id != None:
167            # Caller supplied the last-seen event ID from a previous
168            # connection.
169            self._skip_old_events = [["id", ">", str(self.last_log_id)]]
170        else:
171            # We need to do a reverse-order query to find the most
172            # recent event ID (see "if not self._skip_old_events"
173            # in run()).
174            self._skip_old_events = False
175
176    def run(self):
177        self.on_event({'status': 200})
178
179        while not self._closing.is_set():
180            moreitems = False
181            for f in self.filters:
182                for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
183                    try:
184                        if not self._skip_old_events:
185                            # If the caller didn't provide a known
186                            # recent ID, our first request will ask
187                            # for the single most recent event from
188                            # the last 2 hours (the time restriction
189                            # avoids doing an expensive database
190                            # query, and leaves a big enough margin to
191                            # account for clock skew). If we do find a
192                            # recent event, we remember its ID but
193                            # then discard it (we are supposed to be
194                            # returning new/current events, not old
195                            # ones).
196                            #
197                            # Subsequent requests will get multiple
198                            # events in chronological order, and
199                            # filter on that same cutoff time, or
200                            # (once we see our first matching event)
201                            # the ID of the last-seen event.
202                            #
203                            # Note: self._skip_old_events must not be
204                            # set until the threshold is decided.
205                            # Otherwise, tests will be unreliable.
206                            filter_by_time = [[
207                                "created_at", ">=",
208                                time.strftime(
209                                    "%Y-%m-%dT%H:%M:%SZ",
210                                    time.gmtime(time.time()-7200))]]
211                            items = self.api.logs().list(
212                                order="id desc",
213                                limit=1,
214                                filters=f+filter_by_time).execute()
215                            if items["items"]:
216                                self._skip_old_events = [
217                                    ["id", ">", str(items["items"][0]["id"])]]
218                                items = {
219                                    "items": [],
220                                    "items_available": 0,
221                                }
222                            else:
223                                # No recent events. We can keep using
224                                # the same timestamp threshold until
225                                # we receive our first new event.
226                                self._skip_old_events = filter_by_time
227                        else:
228                            # In this case, either we know the most
229                            # recent matching ID, or we know there
230                            # were no matching events in the 2-hour
231                            # window before subscribing. Either way we
232                            # can safely ask for events in ascending
233                            # order.
234                            items = self.api.logs().list(
235                                order="id asc",
236                                filters=f+self._skip_old_events).execute()
237                        break
238                    except errors.ApiError as error:
239                        pass
240                    else:
241                        tries_left = 0
242                        break
243                if tries_left == 0:
244                    _logger.exception("PollClient thread could not contact API server.")
245                    with self._closing_lock:
246                        self._closing.set()
247                    _thread.interrupt_main()
248                    return
249                for i in items["items"]:
250                    self._skip_old_events = [["id", ">", str(i["id"])]]
251                    with self._closing_lock:
252                        if self._closing.is_set():
253                            return
254                        try:
255                            self.on_event(i)
256                        except Exception as e:
257                            _logger.exception("Unexpected exception from event callback.")
258                            _thread.interrupt_main()
259                if items["items_available"] > len(items["items"]):
260                    moreitems = True
261            if not moreitems:
262                self._closing.wait(self.poll_time)
263
264    def run_forever(self):
265        # Have to poll here, otherwise KeyboardInterrupt will never get processed.
266        while not self._closing.is_set():
267            self._closing.wait(1)
268
269    def close(self, code=None, reason=None, timeout=0):
270        """Close poll client and optionally wait for it to finish.
271
272        If an :on_event: handler is running in a different thread,
273        first wait (indefinitely) for it to return.
274
275        After closing, wait up to :timeout: seconds for the thread to
276        finish the poll request in progress (if any).
277
278        :code: and :reason: are ignored. They are present for
279        interface compatibility with EventClient.
280        """
281
282        with self._closing_lock:
283            self._closing.set()
284        try:
285            self.join(timeout=timeout)
286        except RuntimeError:
287            # "join() raises a RuntimeError if an attempt is made to join the
288            # current thread as that would cause a deadlock. It is also an
289            # error to join() a thread before it has been started and attempts
290            # to do so raises the same exception."
291            pass
292
293    def subscribe(self, f):
294        self.on_event({'status': 200})
295        self.filters.append(f)
296
297    def unsubscribe(self, f):
298        del self.filters[self.filters.index(f)]
299
300
301def _subscribe_websocket(api, filters, on_event, last_log_id=None):
302    endpoint = api._rootDesc.get('websocketUrl', None)
303    if not endpoint:
304        raise errors.FeatureNotEnabledError(
305            "Server does not advertise a websocket endpoint")
306    uri_with_token = "{}?api_token={}".format(endpoint, api.api_token)
307    try:
308        client = EventClient(uri_with_token, filters, on_event, last_log_id)
309    except Exception:
310        _logger.warning("Failed to connect to websockets on %s" % endpoint)
311        raise
312    else:
313        return client
314
315
316def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
317    """
318    :api:
319      a client object retrieved from arvados.api(). The caller should not use this client object for anything else after calling subscribe().
320    :filters:
321      Initial subscription filters.
322    :on_event:
323      The callback when a message is received.
324    :poll_fallback:
325      If websockets are not available, fall back to polling every N seconds.  If poll_fallback=False, this will return None if websockets are not available.
326    :last_log_id:
327      Log rows that are newer than the log id
328    """
329
330    if not poll_fallback:
331        return _subscribe_websocket(api, filters, on_event, last_log_id)
332
333    try:
334        if not config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS'):
335            return _subscribe_websocket(api, filters, on_event, last_log_id)
336        else:
337            _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
338    except Exception as e:
339        _logger.warning("Falling back to polling after websocket error: %s" % e)
340    p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
341    p.start()
342    return p
class EventClient:
 87class EventClient(object):
 88    def __init__(self, url, filters, on_event_cb, last_log_id):
 89        self.url = url
 90        if filters:
 91            self.filters = [filters]
 92        else:
 93            self.filters = [[]]
 94        self.on_event_cb = on_event_cb
 95        self.last_log_id = last_log_id
 96        self.is_closed = threading.Event()
 97        self._setup_event_client()
 98
 99    def _setup_event_client(self):
100        self.ec = _EventClient(self.url, self.filters, self.on_event,
101                               self.last_log_id, self.on_closed)
102        self.ec.daemon = True
103        try:
104            self.ec.connect()
105        except Exception:
106            self.ec.close_connection()
107            raise
108
109    def subscribe(self, f, last_log_id=None):
110        self.filters.append(f)
111        self.ec.subscribe(f, last_log_id)
112
113    def unsubscribe(self, f):
114        del self.filters[self.filters.index(f)]
115        self.ec.unsubscribe(f)
116
117    def close(self, code=1000, reason='', timeout=0):
118        self.is_closed.set()
119        self.ec.close(code, reason, timeout)
120
121    def on_event(self, m):
122        if m.get('id') != None:
123            self.last_log_id = m.get('id')
124        try:
125            self.on_event_cb(m)
126        except Exception as e:
127            _logger.exception("Unexpected exception from event callback.")
128            _thread.interrupt_main()
129
130    def on_closed(self):
131        if not self.is_closed.is_set():
132            _logger.warning("Unexpected close. Reconnecting.")
133            for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
134                try:
135                    self._setup_event_client()
136                    _logger.warning("Reconnect successful.")
137                    break
138                except Exception as e:
139                    _logger.warning("Error '%s' during websocket reconnect.", e)
140            if tries_left == 0:
141                _logger.exception("EventClient thread could not contact websocket server.")
142                self.is_closed.set()
143                _thread.interrupt_main()
144                return
145
146    def run_forever(self):
147        # Have to poll here to let KeyboardInterrupt get raised.
148        while not self.is_closed.wait(1):
149            pass
EventClient(url, filters, on_event_cb, last_log_id)
88    def __init__(self, url, filters, on_event_cb, last_log_id):
89        self.url = url
90        if filters:
91            self.filters = [filters]
92        else:
93            self.filters = [[]]
94        self.on_event_cb = on_event_cb
95        self.last_log_id = last_log_id
96        self.is_closed = threading.Event()
97        self._setup_event_client()
url
on_event_cb
last_log_id
is_closed
def subscribe(self, f, last_log_id=None):
109    def subscribe(self, f, last_log_id=None):
110        self.filters.append(f)
111        self.ec.subscribe(f, last_log_id)
def unsubscribe(self, f):
113    def unsubscribe(self, f):
114        del self.filters[self.filters.index(f)]
115        self.ec.unsubscribe(f)
def close(self, code=1000, reason='', timeout=0):
117    def close(self, code=1000, reason='', timeout=0):
118        self.is_closed.set()
119        self.ec.close(code, reason, timeout)
def on_event(self, m):
121    def on_event(self, m):
122        if m.get('id') != None:
123            self.last_log_id = m.get('id')
124        try:
125            self.on_event_cb(m)
126        except Exception as e:
127            _logger.exception("Unexpected exception from event callback.")
128            _thread.interrupt_main()
def on_closed(self):
130    def on_closed(self):
131        if not self.is_closed.is_set():
132            _logger.warning("Unexpected close. Reconnecting.")
133            for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
134                try:
135                    self._setup_event_client()
136                    _logger.warning("Reconnect successful.")
137                    break
138                except Exception as e:
139                    _logger.warning("Error '%s' during websocket reconnect.", e)
140            if tries_left == 0:
141                _logger.exception("EventClient thread could not contact websocket server.")
142                self.is_closed.set()
143                _thread.interrupt_main()
144                return
def run_forever(self):
146    def run_forever(self):
147        # Have to poll here to let KeyboardInterrupt get raised.
148        while not self.is_closed.wait(1):
149            pass
class PollClient(threading.Thread):
152class PollClient(threading.Thread):
153    def __init__(self, api, filters, on_event, poll_time, last_log_id):
154        super(PollClient, self).__init__()
155        self.api = api
156        if filters:
157            self.filters = [filters]
158        else:
159            self.filters = [[]]
160        self.on_event = on_event
161        self.poll_time = poll_time
162        self.daemon = True
163        self.last_log_id = last_log_id
164        self._closing = threading.Event()
165        self._closing_lock = threading.RLock()
166
167        if self.last_log_id != None:
168            # Caller supplied the last-seen event ID from a previous
169            # connection.
170            self._skip_old_events = [["id", ">", str(self.last_log_id)]]
171        else:
172            # We need to do a reverse-order query to find the most
173            # recent event ID (see "if not self._skip_old_events"
174            # in run()).
175            self._skip_old_events = False
176
177    def run(self):
178        self.on_event({'status': 200})
179
180        while not self._closing.is_set():
181            moreitems = False
182            for f in self.filters:
183                for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
184                    try:
185                        if not self._skip_old_events:
186                            # If the caller didn't provide a known
187                            # recent ID, our first request will ask
188                            # for the single most recent event from
189                            # the last 2 hours (the time restriction
190                            # avoids doing an expensive database
191                            # query, and leaves a big enough margin to
192                            # account for clock skew). If we do find a
193                            # recent event, we remember its ID but
194                            # then discard it (we are supposed to be
195                            # returning new/current events, not old
196                            # ones).
197                            #
198                            # Subsequent requests will get multiple
199                            # events in chronological order, and
200                            # filter on that same cutoff time, or
201                            # (once we see our first matching event)
202                            # the ID of the last-seen event.
203                            #
204                            # Note: self._skip_old_events must not be
205                            # set until the threshold is decided.
206                            # Otherwise, tests will be unreliable.
207                            filter_by_time = [[
208                                "created_at", ">=",
209                                time.strftime(
210                                    "%Y-%m-%dT%H:%M:%SZ",
211                                    time.gmtime(time.time()-7200))]]
212                            items = self.api.logs().list(
213                                order="id desc",
214                                limit=1,
215                                filters=f+filter_by_time).execute()
216                            if items["items"]:
217                                self._skip_old_events = [
218                                    ["id", ">", str(items["items"][0]["id"])]]
219                                items = {
220                                    "items": [],
221                                    "items_available": 0,
222                                }
223                            else:
224                                # No recent events. We can keep using
225                                # the same timestamp threshold until
226                                # we receive our first new event.
227                                self._skip_old_events = filter_by_time
228                        else:
229                            # In this case, either we know the most
230                            # recent matching ID, or we know there
231                            # were no matching events in the 2-hour
232                            # window before subscribing. Either way we
233                            # can safely ask for events in ascending
234                            # order.
235                            items = self.api.logs().list(
236                                order="id asc",
237                                filters=f+self._skip_old_events).execute()
238                        break
239                    except errors.ApiError as error:
240                        pass
241                    else:
242                        tries_left = 0
243                        break
244                if tries_left == 0:
245                    _logger.exception("PollClient thread could not contact API server.")
246                    with self._closing_lock:
247                        self._closing.set()
248                    _thread.interrupt_main()
249                    return
250                for i in items["items"]:
251                    self._skip_old_events = [["id", ">", str(i["id"])]]
252                    with self._closing_lock:
253                        if self._closing.is_set():
254                            return
255                        try:
256                            self.on_event(i)
257                        except Exception as e:
258                            _logger.exception("Unexpected exception from event callback.")
259                            _thread.interrupt_main()
260                if items["items_available"] > len(items["items"]):
261                    moreitems = True
262            if not moreitems:
263                self._closing.wait(self.poll_time)
264
265    def run_forever(self):
266        # Have to poll here, otherwise KeyboardInterrupt will never get processed.
267        while not self._closing.is_set():
268            self._closing.wait(1)
269
270    def close(self, code=None, reason=None, timeout=0):
271        """Close poll client and optionally wait for it to finish.
272
273        If an :on_event: handler is running in a different thread,
274        first wait (indefinitely) for it to return.
275
276        After closing, wait up to :timeout: seconds for the thread to
277        finish the poll request in progress (if any).
278
279        :code: and :reason: are ignored. They are present for
280        interface compatibility with EventClient.
281        """
282
283        with self._closing_lock:
284            self._closing.set()
285        try:
286            self.join(timeout=timeout)
287        except RuntimeError:
288            # "join() raises a RuntimeError if an attempt is made to join the
289            # current thread as that would cause a deadlock. It is also an
290            # error to join() a thread before it has been started and attempts
291            # to do so raises the same exception."
292            pass
293
294    def subscribe(self, f):
295        self.on_event({'status': 200})
296        self.filters.append(f)
297
298    def unsubscribe(self, f):
299        del self.filters[self.filters.index(f)]

A class that represents a thread of control.

This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.

PollClient(api, filters, on_event, poll_time, last_log_id)
153    def __init__(self, api, filters, on_event, poll_time, last_log_id):
154        super(PollClient, self).__init__()
155        self.api = api
156        if filters:
157            self.filters = [filters]
158        else:
159            self.filters = [[]]
160        self.on_event = on_event
161        self.poll_time = poll_time
162        self.daemon = True
163        self.last_log_id = last_log_id
164        self._closing = threading.Event()
165        self._closing_lock = threading.RLock()
166
167        if self.last_log_id != None:
168            # Caller supplied the last-seen event ID from a previous
169            # connection.
170            self._skip_old_events = [["id", ">", str(self.last_log_id)]]
171        else:
172            # We need to do a reverse-order query to find the most
173            # recent event ID (see "if not self._skip_old_events"
174            # in run()).
175            self._skip_old_events = False

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

api
on_event
poll_time
daemon
1108    @property
1109    def daemon(self):
1110        """A boolean value indicating whether this thread is a daemon thread.
1111
1112        This must be set before start() is called, otherwise RuntimeError is
1113        raised. Its initial value is inherited from the creating thread; the
1114        main thread is not a daemon thread and therefore all threads created in
1115        the main thread default to daemon = False.
1116
1117        The entire Python program exits when only daemon threads are left.
1118
1119        """
1120        assert self._initialized, "Thread.__init__() not called"
1121        return self._daemonic

A boolean value indicating whether this thread is a daemon thread.

This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.

The entire Python program exits when only daemon threads are left.

last_log_id
def run(self):
177    def run(self):
178        self.on_event({'status': 200})
179
180        while not self._closing.is_set():
181            moreitems = False
182            for f in self.filters:
183                for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
184                    try:
185                        if not self._skip_old_events:
186                            # If the caller didn't provide a known
187                            # recent ID, our first request will ask
188                            # for the single most recent event from
189                            # the last 2 hours (the time restriction
190                            # avoids doing an expensive database
191                            # query, and leaves a big enough margin to
192                            # account for clock skew). If we do find a
193                            # recent event, we remember its ID but
194                            # then discard it (we are supposed to be
195                            # returning new/current events, not old
196                            # ones).
197                            #
198                            # Subsequent requests will get multiple
199                            # events in chronological order, and
200                            # filter on that same cutoff time, or
201                            # (once we see our first matching event)
202                            # the ID of the last-seen event.
203                            #
204                            # Note: self._skip_old_events must not be
205                            # set until the threshold is decided.
206                            # Otherwise, tests will be unreliable.
207                            filter_by_time = [[
208                                "created_at", ">=",
209                                time.strftime(
210                                    "%Y-%m-%dT%H:%M:%SZ",
211                                    time.gmtime(time.time()-7200))]]
212                            items = self.api.logs().list(
213                                order="id desc",
214                                limit=1,
215                                filters=f+filter_by_time).execute()
216                            if items["items"]:
217                                self._skip_old_events = [
218                                    ["id", ">", str(items["items"][0]["id"])]]
219                                items = {
220                                    "items": [],
221                                    "items_available": 0,
222                                }
223                            else:
224                                # No recent events. We can keep using
225                                # the same timestamp threshold until
226                                # we receive our first new event.
227                                self._skip_old_events = filter_by_time
228                        else:
229                            # In this case, either we know the most
230                            # recent matching ID, or we know there
231                            # were no matching events in the 2-hour
232                            # window before subscribing. Either way we
233                            # can safely ask for events in ascending
234                            # order.
235                            items = self.api.logs().list(
236                                order="id asc",
237                                filters=f+self._skip_old_events).execute()
238                        break
239                    except errors.ApiError as error:
240                        pass
241                    else:
242                        tries_left = 0
243                        break
244                if tries_left == 0:
245                    _logger.exception("PollClient thread could not contact API server.")
246                    with self._closing_lock:
247                        self._closing.set()
248                    _thread.interrupt_main()
249                    return
250                for i in items["items"]:
251                    self._skip_old_events = [["id", ">", str(i["id"])]]
252                    with self._closing_lock:
253                        if self._closing.is_set():
254                            return
255                        try:
256                            self.on_event(i)
257                        except Exception as e:
258                            _logger.exception("Unexpected exception from event callback.")
259                            _thread.interrupt_main()
260                if items["items_available"] > len(items["items"]):
261                    moreitems = True
262            if not moreitems:
263                self._closing.wait(self.poll_time)

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

def run_forever(self):
265    def run_forever(self):
266        # Have to poll here, otherwise KeyboardInterrupt will never get processed.
267        while not self._closing.is_set():
268            self._closing.wait(1)
def close(self, code=None, reason=None, timeout=0):
270    def close(self, code=None, reason=None, timeout=0):
271        """Close poll client and optionally wait for it to finish.
272
273        If an :on_event: handler is running in a different thread,
274        first wait (indefinitely) for it to return.
275
276        After closing, wait up to :timeout: seconds for the thread to
277        finish the poll request in progress (if any).
278
279        :code: and :reason: are ignored. They are present for
280        interface compatibility with EventClient.
281        """
282
283        with self._closing_lock:
284            self._closing.set()
285        try:
286            self.join(timeout=timeout)
287        except RuntimeError:
288            # "join() raises a RuntimeError if an attempt is made to join the
289            # current thread as that would cause a deadlock. It is also an
290            # error to join() a thread before it has been started and attempts
291            # to do so raises the same exception."
292            pass

Close poll client and optionally wait for it to finish.

If an :on_event: handler is running in a different thread, first wait (indefinitely) for it to return.

After closing, wait up to :timeout: seconds for the thread to finish the poll request in progress (if any).

:code: and :reason: are ignored. They are present for interface compatibility with EventClient.

def subscribe(self, f):
294    def subscribe(self, f):
295        self.on_event({'status': 200})
296        self.filters.append(f)
def unsubscribe(self, f):
298    def unsubscribe(self, f):
299        del self.filters[self.filters.index(f)]
Inherited Members
threading.Thread
start
join
name
ident
is_alive
isDaemon
setDaemon
getName
setName
native_id
def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
317def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
318    """
319    :api:
320      a client object retrieved from arvados.api(). The caller should not use this client object for anything else after calling subscribe().
321    :filters:
322      Initial subscription filters.
323    :on_event:
324      The callback when a message is received.
325    :poll_fallback:
326      If websockets are not available, fall back to polling every N seconds.  If poll_fallback=False, this will return None if websockets are not available.
327    :last_log_id:
328      Log rows that are newer than the log id
329    """
330
331    if not poll_fallback:
332        return _subscribe_websocket(api, filters, on_event, last_log_id)
333
334    try:
335        if not config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS'):
336            return _subscribe_websocket(api, filters, on_event, last_log_id)
337        else:
338            _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
339    except Exception as e:
340        _logger.warning("Falling back to polling after websocket error: %s" % e)
341    p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
342    p.start()
343    return p

:api: a client object retrieved from arvados.api(). The caller should not use this client object for anything else after calling subscribe(). :filters: Initial subscription filters. :on_event: The callback when a message is received. :poll_fallback: If websockets are not available, fall back to polling every N seconds. If poll_fallback=False, this will return None if websockets are not available. :last_log_id: Log rows that are newer than the log id