arvados.commands.ws
1# Copyright (C) The Arvados Authors. All rights reserved. 2# 3# SPDX-License-Identifier: Apache-2.0 4 5from __future__ import print_function 6import sys 7import logging 8import argparse 9import arvados 10import json 11from arvados.events import subscribe 12from arvados._version import __version__ 13from . import _util as arv_cmd 14import signal 15 16def main(arguments=None): 17 logger = logging.getLogger('arvados.arv-ws') 18 19 parser = argparse.ArgumentParser(parents=[arv_cmd.retry_opt]) 20 parser.add_argument('--version', action='version', 21 version="%s %s" % (sys.argv[0], __version__), 22 help='Print version and exit.') 23 parser.add_argument('-u', '--uuid', type=str, default="", help="Filter events on object_uuid") 24 parser.add_argument('-f', '--filters', type=str, default="", help="Arvados query filter to apply to log events (JSON encoded)") 25 parser.add_argument('-s', '--start-time', type=str, default="", help="Arvados query filter to fetch log events created at or after this time. This will be server time in UTC. Allowed format: YYYY-MM-DD or YYYY-MM-DD hh:mm:ss") 26 parser.add_argument('-i', '--id', type=int, default=None, help="Start from given log id.") 27 28 group = parser.add_mutually_exclusive_group() 29 group.add_argument('--poll-interval', default=15, type=int, help="If websockets is not available, specify the polling interval, default is every 15 seconds") 30 group.add_argument('--no-poll', action='store_false', dest='poll_interval', help="Do not poll if websockets are not available, just fail") 31 32 group = parser.add_mutually_exclusive_group() 33 group.add_argument('-p', '--pipeline', type=str, default="", help="Supply pipeline uuid, print log output from pipeline and its jobs") 34 group.add_argument('-j', '--job', type=str, default="", help="Supply job uuid, print log output from jobs") 35 36 args = parser.parse_args(arguments) 37 38 global filters 39 global known_component_jobs 40 global ws 41 42 filters = [] 43 known_component_jobs = set() 44 ws = None 45 46 def update_subscribed_components(components): 47 global known_component_jobs 48 global filters 49 pipeline_jobs = set() 50 for c in components: 51 if "job" in components[c]: 52 pipeline_jobs.add(components[c]["job"]["uuid"]) 53 if known_component_jobs != pipeline_jobs: 54 new_filters = [['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]] 55 ws.subscribe(new_filters) 56 ws.unsubscribe(filters) 57 filters = new_filters 58 known_component_jobs = pipeline_jobs 59 60 api = arvados.api('v1', num_retries=args.retries) 61 62 if args.uuid: 63 filters += [ ['object_uuid', '=', args.uuid] ] 64 65 if args.filters: 66 filters += json.loads(args.filters) 67 68 if args.job: 69 filters += [ ['object_uuid', '=', args.job] ] 70 71 if args.pipeline: 72 filters += [ ['object_uuid', '=', args.pipeline] ] 73 74 if args.start_time: 75 last_log_id = 1 76 filters += [ ['created_at', '>=', args.start_time] ] 77 else: 78 last_log_id = None 79 80 if args.id: 81 last_log_id = args.id-1 82 83 def on_message(ev): 84 global filters 85 global ws 86 87 logger.debug(ev) 88 if 'event_type' in ev and (args.pipeline or args.job): 89 if ev['event_type'] in ('stderr', 'stdout'): 90 sys.stdout.write(ev["properties"]["text"]) 91 elif ev["event_type"] in ("create", "update"): 92 if ev["object_kind"] == "arvados#pipelineInstance": 93 c = api.pipeline_instances().get(uuid=ev["object_uuid"]).execute() 94 update_subscribed_components(c["components"]) 95 96 if ev["object_kind"] == "arvados#pipelineInstance" and args.pipeline: 97 if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Paused"): 98 ws.close() 99 100 if ev["object_kind"] == "arvados#job" and args.job: 101 if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"): 102 ws.close() 103 elif 'status' in ev and ev['status'] == 200: 104 pass 105 else: 106 print(json.dumps(ev)) 107 108 try: 109 ws = subscribe(arvados.api('v1'), filters, on_message, poll_fallback=args.poll_interval, last_log_id=last_log_id) 110 if ws: 111 if args.pipeline: 112 c = api.pipeline_instances().get(uuid=args.pipeline).execute() 113 update_subscribed_components(c["components"]) 114 if c["state"] in ("Complete", "Failed", "Paused"): 115 ws.close() 116 ws.run_forever() 117 except KeyboardInterrupt: 118 pass 119 except Exception as e: 120 logger.error(e) 121 finally: 122 if ws: 123 ws.close()
def
main(arguments=None):
17def main(arguments=None): 18 logger = logging.getLogger('arvados.arv-ws') 19 20 parser = argparse.ArgumentParser(parents=[arv_cmd.retry_opt]) 21 parser.add_argument('--version', action='version', 22 version="%s %s" % (sys.argv[0], __version__), 23 help='Print version and exit.') 24 parser.add_argument('-u', '--uuid', type=str, default="", help="Filter events on object_uuid") 25 parser.add_argument('-f', '--filters', type=str, default="", help="Arvados query filter to apply to log events (JSON encoded)") 26 parser.add_argument('-s', '--start-time', type=str, default="", help="Arvados query filter to fetch log events created at or after this time. This will be server time in UTC. Allowed format: YYYY-MM-DD or YYYY-MM-DD hh:mm:ss") 27 parser.add_argument('-i', '--id', type=int, default=None, help="Start from given log id.") 28 29 group = parser.add_mutually_exclusive_group() 30 group.add_argument('--poll-interval', default=15, type=int, help="If websockets is not available, specify the polling interval, default is every 15 seconds") 31 group.add_argument('--no-poll', action='store_false', dest='poll_interval', help="Do not poll if websockets are not available, just fail") 32 33 group = parser.add_mutually_exclusive_group() 34 group.add_argument('-p', '--pipeline', type=str, default="", help="Supply pipeline uuid, print log output from pipeline and its jobs") 35 group.add_argument('-j', '--job', type=str, default="", help="Supply job uuid, print log output from jobs") 36 37 args = parser.parse_args(arguments) 38 39 global filters 40 global known_component_jobs 41 global ws 42 43 filters = [] 44 known_component_jobs = set() 45 ws = None 46 47 def update_subscribed_components(components): 48 global known_component_jobs 49 global filters 50 pipeline_jobs = set() 51 for c in components: 52 if "job" in components[c]: 53 pipeline_jobs.add(components[c]["job"]["uuid"]) 54 if known_component_jobs != pipeline_jobs: 55 new_filters = [['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]] 56 ws.subscribe(new_filters) 57 ws.unsubscribe(filters) 58 filters = new_filters 59 known_component_jobs = pipeline_jobs 60 61 api = arvados.api('v1', num_retries=args.retries) 62 63 if args.uuid: 64 filters += [ ['object_uuid', '=', args.uuid] ] 65 66 if args.filters: 67 filters += json.loads(args.filters) 68 69 if args.job: 70 filters += [ ['object_uuid', '=', args.job] ] 71 72 if args.pipeline: 73 filters += [ ['object_uuid', '=', args.pipeline] ] 74 75 if args.start_time: 76 last_log_id = 1 77 filters += [ ['created_at', '>=', args.start_time] ] 78 else: 79 last_log_id = None 80 81 if args.id: 82 last_log_id = args.id-1 83 84 def on_message(ev): 85 global filters 86 global ws 87 88 logger.debug(ev) 89 if 'event_type' in ev and (args.pipeline or args.job): 90 if ev['event_type'] in ('stderr', 'stdout'): 91 sys.stdout.write(ev["properties"]["text"]) 92 elif ev["event_type"] in ("create", "update"): 93 if ev["object_kind"] == "arvados#pipelineInstance": 94 c = api.pipeline_instances().get(uuid=ev["object_uuid"]).execute() 95 update_subscribed_components(c["components"]) 96 97 if ev["object_kind"] == "arvados#pipelineInstance" and args.pipeline: 98 if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Paused"): 99 ws.close() 100 101 if ev["object_kind"] == "arvados#job" and args.job: 102 if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"): 103 ws.close() 104 elif 'status' in ev and ev['status'] == 200: 105 pass 106 else: 107 print(json.dumps(ev)) 108 109 try: 110 ws = subscribe(arvados.api('v1'), filters, on_message, poll_fallback=args.poll_interval, last_log_id=last_log_id) 111 if ws: 112 if args.pipeline: 113 c = api.pipeline_instances().get(uuid=args.pipeline).execute() 114 update_subscribed_components(c["components"]) 115 if c["state"] in ("Complete", "Failed", "Paused"): 116 ws.close() 117 ws.run_forever() 118 except KeyboardInterrupt: 119 pass 120 except Exception as e: 121 logger.error(e) 122 finally: 123 if ws: 124 ws.close()