arvados.commands.ws

  1# Copyright (C) The Arvados Authors. All rights reserved.
  2#
  3# SPDX-License-Identifier: Apache-2.0
  4
  5from __future__ import print_function
  6import sys
  7import logging
  8import argparse
  9import arvados
 10import json
 11from arvados.events import subscribe
 12from arvados._version import __version__
 13from . import _util as arv_cmd
 14import signal
 15
 16def main(arguments=None):
 17    logger = logging.getLogger('arvados.arv-ws')
 18
 19    parser = argparse.ArgumentParser(parents=[arv_cmd.retry_opt])
 20    parser.add_argument('--version', action='version',
 21                        version="%s %s" % (sys.argv[0], __version__),
 22                        help='Print version and exit.')
 23    parser.add_argument('-u', '--uuid', type=str, default="", help="Filter events on object_uuid")
 24    parser.add_argument('-f', '--filters', type=str, default="", help="Arvados query filter to apply to log events (JSON encoded)")
 25    parser.add_argument('-s', '--start-time', type=str, default="", help="Arvados query filter to fetch log events created at or after this time. This will be server time in UTC. Allowed format: YYYY-MM-DD or YYYY-MM-DD hh:mm:ss")
 26    parser.add_argument('-i', '--id', type=int, default=None, help="Start from given log id.")
 27
 28    group = parser.add_mutually_exclusive_group()
 29    group.add_argument('--poll-interval', default=15, type=int, help="If websockets is not available, specify the polling interval, default is every 15 seconds")
 30    group.add_argument('--no-poll', action='store_false', dest='poll_interval', help="Do not poll if websockets are not available, just fail")
 31
 32    group = parser.add_mutually_exclusive_group()
 33    group.add_argument('-p', '--pipeline', type=str, default="", help="Supply pipeline uuid, print log output from pipeline and its jobs")
 34    group.add_argument('-j', '--job', type=str, default="", help="Supply job uuid, print log output from jobs")
 35
 36    args = parser.parse_args(arguments)
 37
 38    global filters
 39    global known_component_jobs
 40    global ws
 41
 42    filters = []
 43    known_component_jobs = set()
 44    ws = None
 45
 46    def update_subscribed_components(components):
 47        global known_component_jobs
 48        global filters
 49        pipeline_jobs = set()
 50        for c in components:
 51            if "job" in components[c]:
 52                pipeline_jobs.add(components[c]["job"]["uuid"])
 53        if known_component_jobs != pipeline_jobs:
 54            new_filters = [['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]]
 55            ws.subscribe(new_filters)
 56            ws.unsubscribe(filters)
 57            filters = new_filters
 58            known_component_jobs = pipeline_jobs
 59
 60    api = arvados.api('v1', num_retries=args.retries)
 61
 62    if args.uuid:
 63        filters += [ ['object_uuid', '=', args.uuid] ]
 64
 65    if args.filters:
 66        filters += json.loads(args.filters)
 67
 68    if args.job:
 69        filters += [ ['object_uuid', '=', args.job] ]
 70
 71    if args.pipeline:
 72        filters += [ ['object_uuid', '=', args.pipeline] ]
 73
 74    if args.start_time:
 75        last_log_id = 1
 76        filters += [ ['created_at', '>=', args.start_time] ]
 77    else:
 78        last_log_id = None
 79
 80    if args.id:
 81        last_log_id = args.id-1
 82
 83    def on_message(ev):
 84        global filters
 85        global ws
 86
 87        logger.debug(ev)
 88        if 'event_type' in ev and (args.pipeline or args.job):
 89            if ev['event_type'] in ('stderr', 'stdout'):
 90                sys.stdout.write(ev["properties"]["text"])
 91            elif ev["event_type"] in ("create", "update"):
 92                if ev["object_kind"] == "arvados#pipelineInstance":
 93                    c = api.pipeline_instances().get(uuid=ev["object_uuid"]).execute()
 94                    update_subscribed_components(c["components"])
 95
 96                if ev["object_kind"] == "arvados#pipelineInstance" and args.pipeline:
 97                    if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Paused"):
 98                        ws.close()
 99
100                if ev["object_kind"] == "arvados#job" and args.job:
101                    if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
102                        ws.close()
103        elif 'status' in ev and ev['status'] == 200:
104            pass
105        else:
106            print(json.dumps(ev))
107
108    try:
109        ws = subscribe(arvados.api('v1'), filters, on_message, poll_fallback=args.poll_interval, last_log_id=last_log_id)
110        if ws:
111            if args.pipeline:
112                c = api.pipeline_instances().get(uuid=args.pipeline).execute()
113                update_subscribed_components(c["components"])
114                if c["state"] in ("Complete", "Failed", "Paused"):
115                    ws.close()
116            ws.run_forever()
117    except KeyboardInterrupt:
118        pass
119    except Exception as e:
120        logger.error(e)
121    finally:
122        if ws:
123            ws.close()
def main(arguments=None):
    """Subscribe to Arvados log events and print them until done.

    Parses command-line options, builds a list of log-event filters from
    them, subscribes through ``arvados.events.subscribe`` and then blocks
    in ``ws.run_forever()``.

    Output depends on the mode:

    * With ``--pipeline`` or ``--job``, ``stdout``/``stderr`` events have
      their ``properties.text`` written to standard output, and the
      subscription is closed once the watched pipeline instance or job
      reports a terminal state.
    * Otherwise every event that is not a ``status == 200`` acknowledgement
      is printed as one JSON document per line.

    :param arguments: list of command-line argument strings, or ``None``
        to let ``argparse`` read ``sys.argv[1:]``.
    :returns: ``None``.
    :raises SystemExit: raised by ``argparse`` for ``--version``, for
        malformed options, and for a ``--filters`` value that is not a
        JSON array.

    ``KeyboardInterrupt`` during the subscription ends the function
    quietly; any other exception raised there is logged at error level and
    not re-raised.  The event client is closed in either case.
    """
    logger = logging.getLogger('arvados.arv-ws')

    parser = argparse.ArgumentParser(parents=[arv_cmd.retry_opt])
    parser.add_argument('--version', action='version',
                        version="%s %s" % (sys.argv[0], __version__),
                        help='Print version and exit.')
    parser.add_argument(
        '-u', '--uuid', type=str, default="",
        help="Filter events on object_uuid")
    parser.add_argument(
        '-f', '--filters', type=str, default="",
        help="Arvados query filter to apply to log events (JSON encoded)")
    parser.add_argument(
        '-s', '--start-time', type=str, default="",
        help="Arvados query filter to fetch log events created at or after this time. This will be server time in UTC. Allowed format: YYYY-MM-DD or YYYY-MM-DD hh:mm:ss")
    parser.add_argument(
        '-i', '--id', type=int, default=None,
        help="Start from given log id.")

    # --no-poll shares dest with --poll-interval: it stores False there,
    # and that value is handed to subscribe() as poll_fallback below.
    group = parser.add_mutually_exclusive_group()
    group.add_argument(
        '--poll-interval', default=15, type=int,
        help="If websockets is not available, specify the polling interval, default is every 15 seconds")
    group.add_argument(
        '--no-poll', action='store_false', dest='poll_interval',
        help="Do not poll if websockets are not available, just fail")

    group = parser.add_mutually_exclusive_group()
    group.add_argument(
        '-p', '--pipeline', type=str, default="",
        help="Supply pipeline uuid, print log output from pipeline and its jobs")
    group.add_argument(
        '-j', '--job', type=str, default="",
        help="Supply job uuid, print log output from jobs")

    args = parser.parse_args(arguments)

    # Validate --filters before touching the API.  A value that is not a
    # JSON array would otherwise either raise a bare traceback or be
    # silently spliced into the filter list element by element (dict keys,
    # string characters).
    user_filters = []
    if args.filters:
        try:
            user_filters = json.loads(args.filters)
        except ValueError as e:
            parser.error("--filters is not valid JSON: %s" % (e,))
        if not isinstance(user_filters, list):
            parser.error("--filters must be a JSON array of filter conditions")

    # State shared with the nested callbacks lives in module globals; the
    # callbacks rebind these names, and this file still supports Python 2
    # (no ``nonlocal``).
    global filters
    global known_component_jobs
    global ws

    filters = []
    known_component_jobs = set()
    ws = None

    def update_subscribed_components(components):
        """Re-subscribe when the pipeline's set of component jobs changes.

        ``components`` is the pipeline instance's ``components`` mapping;
        every entry that has a ``"job"`` key contributes that job's uuid.
        """
        global known_component_jobs
        global filters
        pipeline_jobs = {
            component["job"]["uuid"]
            for component in components.values()
            if "job" in component
        }
        if known_component_jobs != pipeline_jobs:
            new_filters = [['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]]
            # Subscribe to the new filter before dropping the old one.
            ws.subscribe(new_filters)
            ws.unsubscribe(filters)
            filters = new_filters
            known_component_jobs = pipeline_jobs

    api = arvados.api('v1', num_retries=args.retries)

    if args.uuid:
        filters += [['object_uuid', '=', args.uuid]]

    filters += user_filters

    if args.job:
        filters += [['object_uuid', '=', args.job]]

    if args.pipeline:
        filters += [['object_uuid', '=', args.pipeline]]

    if args.start_time:
        # NOTE(review): starting from log id 1 presumably makes the server
        # replay older events that match the created_at filter -- confirm.
        last_log_id = 1
        filters += [['created_at', '>=', args.start_time]]
    else:
        last_log_id = None

    # Compare against None so that an explicit ``--id 0`` is honoured
    # instead of being treated as "option not given".
    if args.id is not None:
        last_log_id = args.id - 1

    def on_message(ev):
        """Handle one event delivered by the event client."""
        logger.debug(ev)
        if 'event_type' in ev and (args.pipeline or args.job):
            event_type = ev['event_type']
            if event_type in ('stderr', 'stdout'):
                sys.stdout.write(ev["properties"]["text"])
            elif event_type in ("create", "update"):
                kind = ev["object_kind"]
                if kind == "arvados#pipelineInstance":
                    # Refresh the job list from the API record.
                    c = api.pipeline_instances().get(uuid=ev["object_uuid"]).execute()
                    update_subscribed_components(c["components"])

                    if args.pipeline:
                        state = ev["properties"]["new_attributes"]["state"]
                        if state in ("Complete", "Failed", "Paused"):
                            ws.close()

                if kind == "arvados#job" and args.job:
                    state = ev["properties"]["new_attributes"]["state"]
                    if state in ("Complete", "Failed", "Cancelled"):
                        ws.close()
        elif 'status' in ev and ev['status'] == 200:
            # Acknowledgement message: nothing to print.
            pass
        else:
            print(json.dumps(ev))

    try:
        ws = subscribe(arvados.api('v1'), filters, on_message, poll_fallback=args.poll_interval, last_log_id=last_log_id)
        if ws:
            if args.pipeline:
                c = api.pipeline_instances().get(uuid=args.pipeline).execute()
                update_subscribed_components(c["components"])
                # The pipeline may already be finished before any event
                # arrives.
                if c["state"] in ("Complete", "Failed", "Paused"):
                    ws.close()
            ws.run_forever()
    except KeyboardInterrupt:
        pass
    except Exception as e:
        logger.error(e)
    finally:
        if ws:
            ws.close()