arvados.events
1# Copyright (C) The Arvados Authors. All rights reserved. 2# 3# SPDX-License-Identifier: Apache-2.0 4 5from __future__ import absolute_import 6from future import standard_library 7standard_library.install_aliases() 8from builtins import str 9from builtins import object 10import arvados 11from . import config 12from . import errors 13from .retry import RetryLoop 14 15import logging 16import json 17import _thread 18import threading 19import time 20import os 21import re 22import ssl 23from ws4py.client.threadedclient import WebSocketClient 24 25_logger = logging.getLogger('arvados.events') 26 27 28class _EventClient(WebSocketClient): 29 def __init__(self, url, filters, on_event, last_log_id, on_closed): 30 ssl_options = {'ca_certs': arvados.util.ca_certs_path()} 31 if config.flag_is_true('ARVADOS_API_HOST_INSECURE'): 32 ssl_options['cert_reqs'] = ssl.CERT_NONE 33 else: 34 ssl_options['cert_reqs'] = ssl.CERT_REQUIRED 35 36 # Warning: If the host part of url resolves to both IPv6 and 37 # IPv4 addresses (common with "localhost"), only one of them 38 # will be attempted -- and it might not be the right one. See 39 # ws4py's WebSocketBaseClient.__init__. 40 super(_EventClient, self).__init__(url, ssl_options=ssl_options) 41 42 self.filters = filters 43 self.on_event = on_event 44 self.last_log_id = last_log_id 45 self._closing_lock = threading.RLock() 46 self._closing = False 47 self._closed = threading.Event() 48 self.on_closed = on_closed 49 50 def opened(self): 51 for f in self.filters: 52 self.subscribe(f, self.last_log_id) 53 54 def closed(self, code, reason=None): 55 self._closed.set() 56 self.on_closed() 57 58 def received_message(self, m): 59 with self._closing_lock: 60 if not self._closing: 61 self.on_event(json.loads(str(m))) 62 63 def close(self, code=1000, reason='', timeout=0): 64 """Close event client and optionally wait for it to finish. 65 66 :timeout: is the number of seconds to wait for ws4py to 67 indicate that the connection has closed. 
68 """ 69 super(_EventClient, self).close(code, reason) 70 with self._closing_lock: 71 # make sure we don't process any more messages. 72 self._closing = True 73 # wait for ws4py to tell us the connection is closed. 74 self._closed.wait(timeout=timeout) 75 76 def subscribe(self, f, last_log_id=None): 77 m = {"method": "subscribe", "filters": f} 78 if last_log_id is not None: 79 m["last_log_id"] = last_log_id 80 self.send(json.dumps(m)) 81 82 def unsubscribe(self, f): 83 self.send(json.dumps({"method": "unsubscribe", "filters": f})) 84 85 86class EventClient(object): 87 def __init__(self, url, filters, on_event_cb, last_log_id): 88 self.url = url 89 if filters: 90 self.filters = [filters] 91 else: 92 self.filters = [[]] 93 self.on_event_cb = on_event_cb 94 self.last_log_id = last_log_id 95 self.is_closed = threading.Event() 96 self._setup_event_client() 97 98 def _setup_event_client(self): 99 self.ec = _EventClient(self.url, self.filters, self.on_event, 100 self.last_log_id, self.on_closed) 101 self.ec.daemon = True 102 try: 103 self.ec.connect() 104 except Exception: 105 self.ec.close_connection() 106 raise 107 108 def subscribe(self, f, last_log_id=None): 109 self.filters.append(f) 110 self.ec.subscribe(f, last_log_id) 111 112 def unsubscribe(self, f): 113 del self.filters[self.filters.index(f)] 114 self.ec.unsubscribe(f) 115 116 def close(self, code=1000, reason='', timeout=0): 117 self.is_closed.set() 118 self.ec.close(code, reason, timeout) 119 120 def on_event(self, m): 121 if m.get('id') != None: 122 self.last_log_id = m.get('id') 123 try: 124 self.on_event_cb(m) 125 except Exception as e: 126 _logger.exception("Unexpected exception from event callback.") 127 _thread.interrupt_main() 128 129 def on_closed(self): 130 if not self.is_closed.is_set(): 131 _logger.warning("Unexpected close. 
Reconnecting.") 132 for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15): 133 try: 134 self._setup_event_client() 135 _logger.warning("Reconnect successful.") 136 break 137 except Exception as e: 138 _logger.warning("Error '%s' during websocket reconnect.", e) 139 if tries_left == 0: 140 _logger.exception("EventClient thread could not contact websocket server.") 141 self.is_closed.set() 142 _thread.interrupt_main() 143 return 144 145 def run_forever(self): 146 # Have to poll here to let KeyboardInterrupt get raised. 147 while not self.is_closed.wait(1): 148 pass 149 150 151class PollClient(threading.Thread): 152 def __init__(self, api, filters, on_event, poll_time, last_log_id): 153 super(PollClient, self).__init__() 154 self.api = api 155 if filters: 156 self.filters = [filters] 157 else: 158 self.filters = [[]] 159 self.on_event = on_event 160 self.poll_time = poll_time 161 self.daemon = True 162 self.last_log_id = last_log_id 163 self._closing = threading.Event() 164 self._closing_lock = threading.RLock() 165 166 if self.last_log_id != None: 167 # Caller supplied the last-seen event ID from a previous 168 # connection. 169 self._skip_old_events = [["id", ">", str(self.last_log_id)]] 170 else: 171 # We need to do a reverse-order query to find the most 172 # recent event ID (see "if not self._skip_old_events" 173 # in run()). 174 self._skip_old_events = False 175 176 def run(self): 177 self.on_event({'status': 200}) 178 179 while not self._closing.is_set(): 180 moreitems = False 181 for f in self.filters: 182 for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time): 183 try: 184 if not self._skip_old_events: 185 # If the caller didn't provide a known 186 # recent ID, our first request will ask 187 # for the single most recent event from 188 # the last 2 hours (the time restriction 189 # avoids doing an expensive database 190 # query, and leaves a big enough margin to 191 # account for clock skew). 
If we do find a 192 # recent event, we remember its ID but 193 # then discard it (we are supposed to be 194 # returning new/current events, not old 195 # ones). 196 # 197 # Subsequent requests will get multiple 198 # events in chronological order, and 199 # filter on that same cutoff time, or 200 # (once we see our first matching event) 201 # the ID of the last-seen event. 202 # 203 # Note: self._skip_old_events must not be 204 # set until the threshold is decided. 205 # Otherwise, tests will be unreliable. 206 filter_by_time = [[ 207 "created_at", ">=", 208 time.strftime( 209 "%Y-%m-%dT%H:%M:%SZ", 210 time.gmtime(time.time()-7200))]] 211 items = self.api.logs().list( 212 order="id desc", 213 limit=1, 214 filters=f+filter_by_time).execute() 215 if items["items"]: 216 self._skip_old_events = [ 217 ["id", ">", str(items["items"][0]["id"])]] 218 items = { 219 "items": [], 220 "items_available": 0, 221 } 222 else: 223 # No recent events. We can keep using 224 # the same timestamp threshold until 225 # we receive our first new event. 226 self._skip_old_events = filter_by_time 227 else: 228 # In this case, either we know the most 229 # recent matching ID, or we know there 230 # were no matching events in the 2-hour 231 # window before subscribing. Either way we 232 # can safely ask for events in ascending 233 # order. 
234 items = self.api.logs().list( 235 order="id asc", 236 filters=f+self._skip_old_events).execute() 237 break 238 except errors.ApiError as error: 239 pass 240 else: 241 tries_left = 0 242 break 243 if tries_left == 0: 244 _logger.exception("PollClient thread could not contact API server.") 245 with self._closing_lock: 246 self._closing.set() 247 _thread.interrupt_main() 248 return 249 for i in items["items"]: 250 self._skip_old_events = [["id", ">", str(i["id"])]] 251 with self._closing_lock: 252 if self._closing.is_set(): 253 return 254 try: 255 self.on_event(i) 256 except Exception as e: 257 _logger.exception("Unexpected exception from event callback.") 258 _thread.interrupt_main() 259 if items["items_available"] > len(items["items"]): 260 moreitems = True 261 if not moreitems: 262 self._closing.wait(self.poll_time) 263 264 def run_forever(self): 265 # Have to poll here, otherwise KeyboardInterrupt will never get processed. 266 while not self._closing.is_set(): 267 self._closing.wait(1) 268 269 def close(self, code=None, reason=None, timeout=0): 270 """Close poll client and optionally wait for it to finish. 271 272 If an :on_event: handler is running in a different thread, 273 first wait (indefinitely) for it to return. 274 275 After closing, wait up to :timeout: seconds for the thread to 276 finish the poll request in progress (if any). 277 278 :code: and :reason: are ignored. They are present for 279 interface compatibility with EventClient. 280 """ 281 282 with self._closing_lock: 283 self._closing.set() 284 try: 285 self.join(timeout=timeout) 286 except RuntimeError: 287 # "join() raises a RuntimeError if an attempt is made to join the 288 # current thread as that would cause a deadlock. It is also an 289 # error to join() a thread before it has been started and attempts 290 # to do so raises the same exception." 
291 pass 292 293 def subscribe(self, f): 294 self.on_event({'status': 200}) 295 self.filters.append(f) 296 297 def unsubscribe(self, f): 298 del self.filters[self.filters.index(f)] 299 300 301def _subscribe_websocket(api, filters, on_event, last_log_id=None): 302 endpoint = api._rootDesc.get('websocketUrl', None) 303 if not endpoint: 304 raise errors.FeatureNotEnabledError( 305 "Server does not advertise a websocket endpoint") 306 uri_with_token = "{}?api_token={}".format(endpoint, api.api_token) 307 try: 308 client = EventClient(uri_with_token, filters, on_event, last_log_id) 309 except Exception: 310 _logger.warning("Failed to connect to websockets on %s" % endpoint) 311 raise 312 else: 313 return client 314 315 316def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None): 317 """ 318 :api: 319 a client object retrieved from arvados.api(). The caller should not use this client object for anything else after calling subscribe(). 320 :filters: 321 Initial subscription filters. 322 :on_event: 323 The callback when a message is received. 324 :poll_fallback: 325 If websockets are not available, fall back to polling every N seconds. If poll_fallback=False, this will return None if websockets are not available. 326 :last_log_id: 327 Log rows that are newer than the log id 328 """ 329 330 if not poll_fallback: 331 return _subscribe_websocket(api, filters, on_event, last_log_id) 332 333 try: 334 if not config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS'): 335 return _subscribe_websocket(api, filters, on_event, last_log_id) 336 else: 337 _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true") 338 except Exception as e: 339 _logger.warning("Falling back to polling after websocket error: %s" % e) 340 p = PollClient(api, filters, on_event, poll_fallback, last_log_id) 341 p.start() 342 return p
class EventClient(object):
    """Websocket event subscription that reconnects after unexpected closes.

    Owns an _EventClient (self.ec) and replaces it with a fresh connection
    whenever the old one closes without close() having been called.
    """

    def __init__(self, url, filters, on_event_cb, last_log_id):
        self.url = url
        if filters:
            self.filters = [filters]
        else:
            self.filters = [[]]
        self.on_event_cb = on_event_cb
        self.last_log_id = last_log_id
        self.is_closed = threading.Event()
        self._setup_event_client()

    def _setup_event_client(self):
        """Create self.ec and connect it; on failure, clean up and re-raise."""
        self.ec = _EventClient(self.url, self.filters, self.on_event,
                               self.last_log_id, self.on_closed)
        self.ec.daemon = True
        try:
            self.ec.connect()
        except Exception:
            self.ec.close_connection()
            raise

    def subscribe(self, f, last_log_id=None):
        """Remember filter f and subscribe to it on the live connection."""
        self.filters.append(f)
        self.ec.subscribe(f, last_log_id)

    def unsubscribe(self, f):
        """Forget filter f (ValueError if unknown) and unsubscribe from it."""
        self.filters.remove(f)
        self.ec.unsubscribe(f)

    def close(self, code=1000, reason='', timeout=0):
        """Mark the client closed (so no reconnect happens) and close self.ec."""
        self.is_closed.set()
        self.ec.close(code, reason, timeout)

    def on_event(self, m):
        """Track the newest log id seen, then invoke the user callback.

        An exception raised by the callback is logged and the main thread
        is interrupted.
        """
        log_id = m.get('id')
        if log_id is not None:
            self.last_log_id = log_id
        try:
            self.on_event_cb(m)
        except Exception:
            _logger.exception("Unexpected exception from event callback.")
            _thread.interrupt_main()

    def on_closed(self):
        """Reconnect after an unexpected close.

        Does nothing if close() was called. Otherwise retries
        _setup_event_client() using RetryLoop; if the last attempt also
        fails, marks the client closed and interrupts the main thread.
        """
        if self.is_closed.is_set():
            return
        _logger.warning("Unexpected close. Reconnecting.")
        for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
            try:
                self._setup_event_client()
                _logger.warning("Reconnect successful.")
                break
            except Exception as e:
                _logger.warning("Error '%s' during websocket reconnect.", e)
                if tries_left == 0:
                    # Log from inside the handler so the traceback of
                    # the final failure is actually recorded.
                    _logger.exception("EventClient thread could not contact websocket server.")
                    self.is_closed.set()
                    _thread.interrupt_main()
                    return

    def run_forever(self):
        """Block until the client is closed."""
        # Have to poll here to let KeyboardInterrupt get raised.
        while not self.is_closed.wait(1):
            pass
def on_closed(self):
    """Reconnect after an unexpected close.

    Does nothing if self.is_closed is already set. Otherwise retries
    self._setup_event_client() using RetryLoop; if the last attempt
    also fails, sets self.is_closed and interrupts the main thread.
    """
    if self.is_closed.is_set():
        return
    _logger.warning("Unexpected close. Reconnecting.")
    for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=15):
        try:
            self._setup_event_client()
            _logger.warning("Reconnect successful.")
            break
        except Exception as e:
            _logger.warning("Error '%s' during websocket reconnect.", e)
            if tries_left == 0:
                # Log from inside the handler so the traceback of the
                # final failure is actually recorded.
                _logger.exception("EventClient thread could not contact websocket server.")
                self.is_closed.set()
                _thread.interrupt_main()
                return
class PollClient(threading.Thread):
    """Daemon thread that polls the logs API and feeds events to on_event."""

    def __init__(self, api, filters, on_event, poll_time, last_log_id):
        super(PollClient, self).__init__()
        self.api = api
        if filters:
            self.filters = [filters]
        else:
            self.filters = [[]]
        self.on_event = on_event
        self.poll_time = poll_time
        self.daemon = True
        self.last_log_id = last_log_id
        self._closing = threading.Event()
        self._closing_lock = threading.RLock()

        if self.last_log_id is not None:
            # Caller supplied the last-seen event ID from a previous
            # connection.
            self._skip_old_events = [["id", ">", str(self.last_log_id)]]
        else:
            # We need to do a reverse-order query to find the most
            # recent event ID (see _fetch_events()).
            self._skip_old_events = False

    def _fetch_events(self, f):
        """Run one logs query for filter f and return the response dict.

        May raise whatever the API call raises; retrying is up to the
        caller.
        """
        if self._skip_old_events:
            # Either we know the most recent matching ID, or we know
            # there were no matching events in the 2-hour window
            # before subscribing. Either way we can safely ask for
            # events in ascending order.
            return self.api.logs().list(
                order="id asc",
                filters=f+self._skip_old_events).execute()

        # If the caller didn't provide a known recent ID, our first
        # request will ask for the single most recent event from the
        # last 2 hours (the time restriction avoids doing an expensive
        # database query, and leaves a big enough margin to account
        # for clock skew). If we do find a recent event, we remember
        # its ID but then discard it (we are supposed to be returning
        # new/current events, not old ones).
        #
        # Subsequent requests will get multiple events in
        # chronological order, and filter on that same cutoff time, or
        # (once we see our first matching event) the ID of the
        # last-seen event.
        #
        # Note: self._skip_old_events must not be set until the
        # threshold is decided. Otherwise, tests will be unreliable.
        filter_by_time = [[
            "created_at", ">=",
            time.strftime(
                "%Y-%m-%dT%H:%M:%SZ",
                time.gmtime(time.time()-7200))]]
        items = self.api.logs().list(
            order="id desc",
            limit=1,
            filters=f+filter_by_time).execute()
        if items["items"]:
            self._skip_old_events = [
                ["id", ">", str(items["items"][0]["id"])]]
            return {
                "items": [],
                "items_available": 0,
            }
        # No recent events. We can keep using the same timestamp
        # threshold until we receive our first new event.
        self._skip_old_events = filter_by_time
        return items

    def run(self):
        """Poll for events until closed, passing each one to on_event.

        Each query is retried on errors.ApiError using RetryLoop. If every
        attempt fails, the client is closed and the main thread is
        interrupted.
        """
        self.on_event({'status': 200})

        while not self._closing.is_set():
            moreitems = False
            for f in self.filters:
                # None means "no attempt has succeeded". This is tracked
                # separately from tries_left so that a success on the
                # very last attempt is not mistaken for a failure.
                items = None
                for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
                    try:
                        items = self._fetch_events(f)
                        break
                    except errors.ApiError:
                        if tries_left == 0:
                            # Log inside the handler so the traceback
                            # of the final failure is recorded.
                            _logger.exception("PollClient thread could not contact API server.")
                if items is None:
                    with self._closing_lock:
                        self._closing.set()
                    _thread.interrupt_main()
                    return
                for i in items["items"]:
                    self._skip_old_events = [["id", ">", str(i["id"])]]
                    # Holding the lock while the handler runs makes
                    # close() wait for the handler to return.
                    with self._closing_lock:
                        if self._closing.is_set():
                            return
                        try:
                            self.on_event(i)
                        except Exception:
                            _logger.exception("Unexpected exception from event callback.")
                            _thread.interrupt_main()
                if items["items_available"] > len(items["items"]):
                    moreitems = True
            if not moreitems:
                self._closing.wait(self.poll_time)

    def run_forever(self):
        """Block until the client is closed."""
        # Have to poll here, otherwise KeyboardInterrupt will never get processed.
        while not self._closing.is_set():
            self._closing.wait(1)

    def close(self, code=None, reason=None, timeout=0):
        """Close poll client and optionally wait for it to finish.

        If an :on_event: handler is running in a different thread,
        first wait (indefinitely) for it to return.

        After closing, wait up to :timeout: seconds for the thread to
        finish the poll request in progress (if any).

        :code: and :reason: are ignored. They are present for
        interface compatibility with EventClient.
        """

        with self._closing_lock:
            self._closing.set()
        try:
            self.join(timeout=timeout)
        except RuntimeError:
            # "join() raises a RuntimeError if an attempt is made to join the
            # current thread as that would cause a deadlock. It is also an
            # error to join() a thread before it has been started and attempts
            # to do so raises the same exception."
            pass

    def subscribe(self, f):
        """Send a status-200 pseudo-event to on_event, then start polling f."""
        self.on_event({'status': 200})
        self.filters.append(f)

    def unsubscribe(self, f):
        """Stop polling filter f (ValueError if it is not subscribed)."""
        self.filters.remove(f)
A class that represents a thread of control.
This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.
153 def __init__(self, api, filters, on_event, poll_time, last_log_id): 154 super(PollClient, self).__init__() 155 self.api = api 156 if filters: 157 self.filters = [filters] 158 else: 159 self.filters = [[]] 160 self.on_event = on_event 161 self.poll_time = poll_time 162 self.daemon = True 163 self.last_log_id = last_log_id 164 self._closing = threading.Event() 165 self._closing_lock = threading.RLock() 166 167 if self.last_log_id != None: 168 # Caller supplied the last-seen event ID from a previous 169 # connection. 170 self._skip_old_events = [["id", ">", str(self.last_log_id)]] 171 else: 172 # We need to do a reverse-order query to find the most 173 # recent event ID (see "if not self._skip_old_events" 174 # in run()). 175 self._skip_old_events = False
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.
args is the argument tuple for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
@property
def daemon(self):
    """A boolean value indicating whether this thread is a daemon thread.

    This must be set before start() is called, otherwise RuntimeError is
    raised. Its initial value is inherited from the creating thread; the
    main thread is not a daemon thread and therefore all threads created in
    the main thread default to daemon = False.

    The entire Python program exits when only daemon threads are left.

    """
    # Fails if a subclass skipped Thread.__init__(), as the message says.
    assert self._initialized, "Thread.__init__() not called"
    return self._daemonic
A boolean value indicating whether this thread is a daemon thread.
This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.
The entire Python program exits when only daemon threads are left.
def _fetch_events(self, f):
    """Run one logs query for filter f and return the response dict.

    May raise whatever the API call raises; retrying is up to the
    caller.
    """
    if self._skip_old_events:
        # Either we know the most recent matching ID, or we know
        # there were no matching events in the 2-hour window before
        # subscribing. Either way we can safely ask for events in
        # ascending order.
        return self.api.logs().list(
            order="id asc",
            filters=f+self._skip_old_events).execute()

    # If the caller didn't provide a known recent ID, our first
    # request will ask for the single most recent event from the
    # last 2 hours (the time restriction avoids doing an expensive
    # database query, and leaves a big enough margin to account for
    # clock skew). If we do find a recent event, we remember its ID
    # but then discard it (we are supposed to be returning
    # new/current events, not old ones).
    #
    # Subsequent requests will get multiple events in chronological
    # order, and filter on that same cutoff time, or (once we see
    # our first matching event) the ID of the last-seen event.
    #
    # Note: self._skip_old_events must not be set until the
    # threshold is decided. Otherwise, tests will be unreliable.
    filter_by_time = [[
        "created_at", ">=",
        time.strftime(
            "%Y-%m-%dT%H:%M:%SZ",
            time.gmtime(time.time()-7200))]]
    items = self.api.logs().list(
        order="id desc",
        limit=1,
        filters=f+filter_by_time).execute()
    if items["items"]:
        self._skip_old_events = [
            ["id", ">", str(items["items"][0]["id"])]]
        return {
            "items": [],
            "items_available": 0,
        }
    # No recent events. We can keep using the same timestamp
    # threshold until we receive our first new event.
    self._skip_old_events = filter_by_time
    return items

def run(self):
    """Poll for events until closed, passing each one to on_event.

    Each query is retried on errors.ApiError using RetryLoop. If every
    attempt fails, the client is closed and the main thread is
    interrupted.
    """
    self.on_event({'status': 200})

    while not self._closing.is_set():
        moreitems = False
        for f in self.filters:
            # None means "no attempt has succeeded". This is tracked
            # separately from tries_left so that a success on the very
            # last attempt is not mistaken for a failure.
            items = None
            for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
                try:
                    items = self._fetch_events(f)
                    break
                except errors.ApiError:
                    if tries_left == 0:
                        # Log inside the handler so the traceback of
                        # the final failure is recorded.
                        _logger.exception("PollClient thread could not contact API server.")
            if items is None:
                with self._closing_lock:
                    self._closing.set()
                _thread.interrupt_main()
                return
            for i in items["items"]:
                self._skip_old_events = [["id", ">", str(i["id"])]]
                # Holding the lock while the handler runs makes
                # close() wait for the handler to return.
                with self._closing_lock:
                    if self._closing.is_set():
                        return
                    try:
                        self.on_event(i)
                    except Exception:
                        _logger.exception("Unexpected exception from event callback.")
                        _thread.interrupt_main()
            if items["items_available"] > len(items["items"]):
                moreitems = True
        if not moreitems:
            self._closing.wait(self.poll_time)
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
def close(self, code=None, reason=None, timeout=0):
    """Stop the poll client, optionally waiting for its thread to exit.

    Taking the closing lock means that if an :on_event: handler is
    currently running in another thread, this call blocks (with no
    time limit) until that handler returns.

    Once the closing flag is set, wait at most :timeout: seconds for
    the thread to finish any poll request that is in progress.

    :code: and :reason: are accepted only so that the signature
    matches EventClient.close(); they are not used.
    """
    with self._closing_lock:
        self._closing.set()

    try:
        self.join(timeout=timeout)
    except RuntimeError:
        # join() raises RuntimeError when called on the current thread
        # (it would deadlock) and when the thread was never started.
        # Neither case leaves anything to wait for.
        pass
Close poll client and optionally wait for it to finish.
If an :on_event: handler is running in a different thread, first wait (indefinitely) for it to return.
After closing, wait up to :timeout: seconds for the thread to finish the poll request in progress (if any).
:code: and :reason: are ignored. They are present for interface compatibility with EventClient.
Inherited Members
- threading.Thread
- start
- join
- name
- ident
- is_alive
- isDaemon
- setDaemon
- getName
- setName
- native_id
def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
    """
    :api:
      a client object retrieved from arvados.api(). The caller should not use this client object for anything else after calling subscribe().
    :filters:
      Initial subscription filters.
    :on_event:
      The callback when a message is received.
    :poll_fallback:
      If websockets are not available, fall back to polling every N seconds. If poll_fallback is false, no fallback is attempted: the error raised while setting up the websocket connection propagates to the caller.
    :last_log_id:
      If given, request only log rows that are newer than this log id.

    Returns the websocket client when the websocket connection succeeds,
    or a started PollClient when falling back to polling.
    """

    if not poll_fallback:
        # No fallback requested: let any websocket error propagate.
        return _subscribe_websocket(api, filters, on_event, last_log_id)

    try:
        if not config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS'):
            return _subscribe_websocket(api, filters, on_event, last_log_id)
        else:
            _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
    except Exception as e:
        _logger.warning("Falling back to polling after websocket error: %s", e)
    p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
    p.start()
    return p
:api: a client object retrieved from arvados.api(). The caller should not use this client object for anything else after calling subscribe(). :filters: Initial subscription filters. :on_event: The callback when a message is received. :poll_fallback: If websockets are not available, fall back to polling every N seconds. If poll_fallback=False, no fallback is attempted and the websocket connection error is raised to the caller. :last_log_id: If given, request only log rows that are newer than this log id.