Module arvados.events

Functions

def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None)

:api: a client object retrieved from arvados.api(). The caller should not use this client object for anything else after calling subscribe().

:filters: Initial subscription filters.

:on_event: The callback when a message is received.

:poll_fallback: If websockets are not available, fall back to polling every N seconds. If poll_fallback=False, this will return None if websockets are not available.

:last_log_id: Log rows that are newer than the log id
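A minimal usage sketch of subscribe(), assuming a configured Arvados client and that events reach the callback as dictionaries. The names `handle_event`, `seen`, and `main`, and the `event_type` filter value, are illustrative assumptions rather than part of the module. The `{'status': 200}` message is the acknowledgement that PollClient sends to the callback in the source listing further down; the sketch treats any message carrying a `status` key the same way.

```python
seen = []  # hypothetical: ids of the log records received so far


def handle_event(event):
    """Callback passed as on_event; `event` is assumed to be a dict."""
    if "status" in event:
        # Subscription acknowledgement such as {'status': 200}, not a log row.
        return
    seen.append(event.get("id"))
    print("log", event.get("id"), event.get("event_type"), event.get("object_uuid"))


def main():
    # Imported here so handle_event() can be exercised without the SDK installed.
    import arvados
    import arvados.events

    api = arvados.api("v1")
    # Per the parameter notes above, `api` should not be reused after subscribe().
    client = arvados.events.subscribe(
        api, [["event_type", "=", "update"]], handle_event, poll_fallback=15)
    try:
        client.run_forever()  # blocks until close() is called or Ctrl-C arrives
    except KeyboardInterrupt:
        pass
    finally:
        client.close()
```

Calling `main()` starts the subscription. Keeping a non-zero `poll_fallback` means a client object is returned even when websockets are unavailable, so the `run_forever()` call does not need a None check.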

Classes

class EventClient (url, filters, on_event_cb, last_log_id)
class EventClient(object):
    def __init__(self, url, filters, on_event_cb, last_log_id):
        self.url = url
        if filters:
            self.filters = [filters]
        else:
            self.filters = [[]]
        self.on_event_cb = on_event_cb
        self.last_log_id = last_log_id
        self.is_closed = threading.Event()
        self._setup_event_client()

    def _setup_event_client(self):
        self.ec = _EventClient(self.url, self.filters, self.on_event,
                               self.last_log_id, self.on_closed)
        self.ec.daemon = True
        try:
            self.ec.connect()
        except Exception:
            self.ec.close_connection()
            raise

    def subscribe(self, f, last_log_id=None):
        self.filters.append(f)
        self.ec.subscribe(f, last_log_id)

    def unsubscribe(self, f):
        del self.filters[self.filters.index(f)]
        self.ec.unsubscribe(f)

    def close(self, code=1000, reason='', timeout=0):
        self.is_closed.set()
        self.ec.close(code, reason, timeout)

    def on_event(self, m):
        if m.get('id') is not None:
            self.last_log_id = m.get('id')
        try:
            self.on_event_cb(m)
        except Exception as e:
            _logger.exception("Unexpected exception from event callback.")
            _thread.interrupt_main()

    def on_closed(self):
        if not self.is_closed.is_set():
            _logger.warning("Unexpected close. Reconnecting.")
            for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
                try:
                    self._setup_event_client()
                    _logger.warning("Reconnect successful.")
                    break
                except Exception as e:
                    _logger.warning("Error '%s' during websocket reconnect.", e)
            if tries_left == 0:
                _logger.error("EventClient thread could not contact websocket server.")
                self.is_closed.set()
                _thread.interrupt_main()
                return

    def run_forever(self):
        # Have to poll here to let KeyboardInterrupt get raised.
        while not self.is_closed.wait(1):
            pass

Methods

def close(self, code=1000, reason='', timeout=0)
def on_closed(self)
def on_event(self, m)
def run_forever(self)
def subscribe(self, f, last_log_id=None)
def unsubscribe(self, f)
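
The reconnect behaviour of on_closed() can be sketched without the SDK. The helper below is a simplified stand-in for the RetryLoop used in the source, not its actual implementation: the name `retry_with_backoff`, the doubling delay, the `num_retries + 1` attempt count, and the injectable `sleep` argument are all assumptions made for illustration.

```python
import time


def retry_with_backoff(connect, num_retries=25, backoff_start=0.1, max_wait=15,
                       sleep=time.sleep):
    """Call connect() until it succeeds or the attempts run out.

    Returns True on success and False if every attempt raised.
    """
    delay = backoff_start
    for attempt in range(num_retries + 1):
        try:
            connect()
            return True
        except Exception as err:
            print("attempt %d failed: %s" % (attempt + 1, err))
        if attempt < num_retries:
            # Wait before the next attempt, never longer than max_wait seconds.
            sleep(min(delay, max_wait))
            delay *= 2
    return False


attempts = []


def flaky_connect():
    """Hypothetical connection function that fails twice, then succeeds."""
    attempts.append(1)
    if len(attempts) < 3:
        raise IOError("server unavailable")


ok = retry_with_backoff(flaky_connect, num_retries=5, sleep=lambda seconds: None)
```

Returning an explicit success flag keeps the "gave up" case separate from the "succeeded on the last attempt" case, which a remaining-attempts counter alone cannot distinguish.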

class PollClient (api, filters, on_event, poll_time, last_log_id)

A class that represents a thread of control.

This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

class PollClient(threading.Thread):
    def __init__(self, api, filters, on_event, poll_time, last_log_id):
        super(PollClient, self).__init__()
        self.api = api
        if filters:
            self.filters = [filters]
        else:
            self.filters = [[]]
        self.on_event = on_event
        self.poll_time = poll_time
        self.daemon = True
        self.last_log_id = last_log_id
        self._closing = threading.Event()
        self._closing_lock = threading.RLock()

        if self.last_log_id is not None:
            # Caller supplied the last-seen event ID from a previous
            # connection.
            self._skip_old_events = [["id", ">", str(self.last_log_id)]]
        else:
            # We need to do a reverse-order query to find the most
            # recent event ID (see "if not self._skip_old_events"
            # in run()).
            self._skip_old_events = False

    def run(self):
        self.on_event({'status': 200})

        while not self._closing.is_set():
            moreitems = False
            for f in self.filters:
                for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
                    try:
                        if not self._skip_old_events:
                            # If the caller didn't provide a known
                            # recent ID, our first request will ask
                            # for the single most recent event from
                            # the last 2 hours (the time restriction
                            # avoids doing an expensive database
                            # query, and leaves a big enough margin to
                            # account for clock skew). If we do find a
                            # recent event, we remember its ID but
                            # then discard it (we are supposed to be
                            # returning new/current events, not old
                            # ones).
                            #
                            # Subsequent requests will get multiple
                            # events in chronological order, and
                            # filter on that same cutoff time, or
                            # (once we see our first matching event)
                            # the ID of the last-seen event.
                            #
                            # Note: self._skip_old_events must not be
                            # set until the threshold is decided.
                            # Otherwise, tests will be unreliable.
                            filter_by_time = [[
                                "created_at", ">=",
                                time.strftime(
                                    "%Y-%m-%dT%H:%M:%SZ",
                                    time.gmtime(time.time()-7200))]]
                            items = self.api.logs().list(
                                order="id desc",
                                limit=1,
                                filters=f+filter_by_time).execute()
                            if items["items"]:
                                self._skip_old_events = [
                                    ["id", ">", str(items["items"][0]["id"])]]
                                items = {
                                    "items": [],
                                    "items_available": 0,
                                }
                            else:
                                # No recent events. We can keep using
                                # the same timestamp threshold until
                                # we receive our first new event.
                                self._skip_old_events = filter_by_time
                        else:
                            # In this case, either we know the most
                            # recent matching ID, or we know there
                            # were no matching events in the 2-hour
                            # window before subscribing. Either way we
                            # can safely ask for events in ascending
                            # order.
                            items = self.api.logs().list(
                                order="id asc",
                                filters=f+self._skip_old_events).execute()
                        break
                    except errors.ApiError as error:
                        pass
                    else:
                        tries_left = 0
                        break
                if tries_left == 0:
                    _logger.error("PollClient thread could not contact API server.")
                    with self._closing_lock:
                        self._closing.set()
                    _thread.interrupt_main()
                    return
                for i in items["items"]:
                    self._skip_old_events = [["id", ">", str(i["id"])]]
                    with self._closing_lock:
                        if self._closing.is_set():
                            return
                        try:
                            self.on_event(i)
                        except Exception as e:
                            _logger.exception("Unexpected exception from event callback.")
                            _thread.interrupt_main()
                if items["items_available"] > len(items["items"]):
                    moreitems = True
            if not moreitems:
                self._closing.wait(self.poll_time)

    def run_forever(self):
        # Have to poll here, otherwise KeyboardInterrupt will never get processed.
        while not self._closing.is_set():
            self._closing.wait(1)

    def close(self, code=None, reason=None, timeout=0):
        """Close poll client and optionally wait for it to finish.

        If an :on_event: handler is running in a different thread,
        first wait (indefinitely) for it to return.

        After closing, wait up to :timeout: seconds for the thread to
        finish the poll request in progress (if any).

        :code: and :reason: are ignored. They are present for
        interface compatibility with EventClient.
        """

        with self._closing_lock:
            self._closing.set()
        try:
            self.join(timeout=timeout)
        except RuntimeError:
            # "join() raises a RuntimeError if an attempt is made to join the
            # current thread as that would cause a deadlock. It is also an
            # error to join() a thread before it has been started and attempts
            # to do so raises the same exception."
            pass

    def subscribe(self, f):
        self.on_event({'status': 200})
        self.filters.append(f)

    def unsubscribe(self, f):
        del self.filters[self.filters.index(f)]

Ancestors

  • threading.Thread

Methods

def close(self, code=None, reason=None, timeout=0)

Close poll client and optionally wait for it to finish.

If an :on_event: handler is running in a different thread, first wait (indefinitely) for it to return.

After closing, wait up to :timeout: seconds for the thread to finish the poll request in progress (if any).

:code: and :reason: are ignored. They are present for interface compatibility with EventClient.
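
The shutdown handshake described above can be sketched in isolation. `MiniPoller` is a hypothetical stand-in that keeps only the event-plus-lock logic from the PollClient source; it makes no API calls and invents its own events. The lock is re-entrant, presumably so that a callback running on the poller thread can itself call close() without deadlocking.

```python
import threading


class MiniPoller(threading.Thread):
    """Hypothetical stand-in for PollClient, keeping only the shutdown logic."""

    def __init__(self, on_event, poll_time=0.05):
        super().__init__()
        self.daemon = True
        self.on_event = on_event
        self.poll_time = poll_time
        self._closing = threading.Event()
        self._closing_lock = threading.RLock()

    def run(self):
        n = 0
        while not self._closing.is_set():
            n += 1
            with self._closing_lock:
                # Re-check under the lock so no callback starts after close().
                if self._closing.is_set():
                    return
                self.on_event({"id": n})
            # Sleep between polls, but wake at once when close() sets the event.
            self._closing.wait(self.poll_time)

    def close(self, timeout=0):
        # Acquiring the lock waits for a callback running on the poller thread.
        with self._closing_lock:
            self._closing.set()
        try:
            self.join(timeout=timeout)
        except RuntimeError:
            # Raised when joining the current thread or a thread never started.
            pass


received = []
poller = MiniPoller(received.append)
poller.start()
poller.close(timeout=2)
count_at_close = len(received)
```

Once close() has returned, no further callbacks are delivered, so `received` stops growing.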

def run(self)

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
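
PollClient overrides run(). Per the source listing above, it keeps a filter that acts as a cursor: a two-hour `created_at` cutoff until the first event is seen, then an `id >` filter that advances with each delivered row. The sketch below isolates that bookkeeping; the helper names `recent_cutoff_filter` and `next_cursor` are assumptions introduced for illustration, and no API calls are made.

```python
import time


def recent_cutoff_filter(now=None, window_seconds=7200):
    """Build a filter matching rows created within the last window_seconds."""
    if now is None:
        now = time.time()
    stamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now - window_seconds))
    return [["created_at", ">=", stamp]]


def next_cursor(cursor, items):
    """Advance the cursor past the last row in items (assumed ascending by id)."""
    for item in items:
        cursor = [["id", ">", str(item["id"])]]
    return cursor


# With now=7200 the cutoff falls exactly on the Unix epoch.
cursor = recent_cutoff_filter(now=7200)
print(cursor)  # [['created_at', '>=', '1970-01-01T00:00:00Z']]

# After two rows are delivered, only rows with a larger id are requested.
cursor = next_cursor(cursor, [{"id": 41}, {"id": 42}])
print(cursor)  # [['id', '>', '42']]
```

An empty result leaves the cursor unchanged, which mirrors how the source keeps using the timestamp threshold until the first new event arrives.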

def run_forever(self)
def subscribe(self, f)
def unsubscribe(self, f)