arvados

Arvados Python SDK

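This module provides the entire Python SDK for Arvados. The most useful modules include:

* arvados.api - After you `import arvados`, you can call `arvados.api` as a shortcut to the client constructor function `arvados.api.api`.

* arvados.collection - The `arvados.collection.Collection` class provides a high-level interface to read and write collections. It coordinates sending data to and from Keep, and synchronizing updates with the collection object.

* arvados.util - Utility functions to use mostly in conjunction with the API client object and the results it returns.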

Other submodules provide lower-level functionality.

  1# Copyright (C) The Arvados Authors. All rights reserved.
  2#
  3# SPDX-License-Identifier: Apache-2.0
  4"""Arvados Python SDK
  5
  6This module provides the entire Python SDK for Arvados. The most useful modules
  7include:
  8
  9* arvados.api - After you `import arvados`, you can call `arvados.api` as a
 10  shortcut to the client constructor function `arvados.api.api`.
 11
 12* arvados.collection - The `arvados.collection.Collection` class provides a
 13  high-level interface to read and write collections. It coordinates sending
 14  data to and from Keep, and synchronizing updates with the collection object.
 15
 16* arvados.util - Utility functions to use mostly in conjunction with the API
 17  client object and the results it returns.
 18
 19Other submodules provide lower-level functionality.
 20"""
 21
 22import logging as stdliblog
 23import os
 24import sys
 25import types
 26
 27from collections import UserDict
 28
 29from . import api, errors, util
 30from .api import api_from_config, http_cache
 31from .collection import CollectionReader, CollectionWriter, ResumableCollectionWriter
 32from arvados.keep import *
 33from arvados.stream import *
 34from .arvfile import StreamFileReader
 35from .logging import log_format, log_date_format, log_handler
 36from .retry import RetryLoop
 37
 38# Previous versions of the PySDK used to say `from .api import api`.  This
 39# made it convenient to call the API client constructor, but difficult to
 40# access the rest of the `arvados.api` module. The magic below fixes that
 41# bug while retaining backwards compatibility: `arvados.api` is now the
 42# module and you can import it normally, but we make that module callable so
 43# all the existing code that says `arvados.api('v1', ...)` still works.
 44class _CallableAPIModule(api.__class__):
 45    __call__ = staticmethod(api.api)
 46api.__class__ = _CallableAPIModule
 47
 48# Override logging module pulled in via `from ... import *`
 49# so users can `import arvados.logging`.
 50logging = sys.modules['arvados.logging']
 51
 52# Set up Arvados logging based on the user's configuration.
 53# All Arvados code should log under the arvados hierarchy.
 54logger = stdliblog.getLogger('arvados')
 55logger.addHandler(log_handler)
 56logger.setLevel(stdliblog.DEBUG if config.get('ARVADOS_DEBUG')
 57                else stdliblog.WARNING)
 58
 59@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
 60def task_set_output(self, s, num_retries=5):
 61    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=0):
 62        try:
 63            return api('v1').job_tasks().update(
 64                uuid=self['uuid'],
 65                body={
 66                    'output':s,
 67                    'success':True,
 68                    'progress':1.0
 69                }).execute()
 70        except errors.ApiError as error:
 71            if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
 72                logger.debug("task_set_output: job_tasks().update() raised {}, retrying with {} tries left".format(repr(error),tries_left))
 73            else:
 74                raise
 75
 76_current_task = None
 77@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
 78def current_task(num_retries=5):
 79    global _current_task
 80    if _current_task:
 81        return _current_task
 82
 83    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
 84        try:
 85            task = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
 86            task = UserDict(task)
 87            task.set_output = types.MethodType(task_set_output, task)
 88            task.tmpdir = os.environ['TASK_WORK']
 89            _current_task = task
 90            return task
 91        except errors.ApiError as error:
 92            if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
 93                logger.debug("current_task: job_tasks().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
 94            else:
 95                raise
 96
 97_current_job = None
 98@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
 99def current_job(num_retries=5):
100    global _current_job
101    if _current_job:
102        return _current_job
103
104    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
105        try:
106            job = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute()
107            job = UserDict(job)
108            job.tmpdir = os.environ['JOB_WORK']
109            _current_job = job
110            return job
111        except errors.ApiError as error:
112            if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
113                logger.debug("current_job: jobs().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
114            else:
115                raise
116
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def getjobparam(*args):
    """Look up a value in the current job's `script_parameters`.

    `args` are passed straight through to the `get` method of
    `current_job()['script_parameters']`.
    """
    script_parameters = current_job()['script_parameters']
    return script_parameters.get(*args)
120
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def get_job_param_mount(*args):
    """Return a job script parameter joined onto the `TASK_KEEPMOUNT` path.

    `args` are passed to the `get` method of
    `current_job()['script_parameters']`; the value found is joined to the
    `TASK_KEEPMOUNT` environment variable with `os.path.join`.
    """
    keep_mount = os.environ['TASK_KEEPMOUNT']
    param_value = current_job()['script_parameters'].get(*args)
    return os.path.join(keep_mount, param_value)
124
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def get_task_param_mount(*args):
    """Return a task parameter joined onto the `TASK_KEEPMOUNT` path.

    `args` are passed to the `get` method of `current_task()['parameters']`;
    the value found is joined to the `TASK_KEEPMOUNT` environment variable
    with `os.path.join`.
    """
    keep_mount = os.environ['TASK_KEEPMOUNT']
    param_value = current_task()['parameters'].get(*args)
    return os.path.join(keep_mount, param_value)
128
class JobTask(object):
    """Deprecated class whose constructor only prints its arguments."""

    @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
    def __init__(self, parameters=dict(), runtime_constraints=dict()):
        """Print `parameters` and `runtime_constraints`; nothing is stored."""
        message = "init jobtask %s %s" % (parameters, runtime_constraints)
        print(message)
133
class job_setup(object):
    """Static helpers that queue follow-up job tasks from the job's input."""

    @staticmethod
    @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
    def one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=False, api_client=None):
        """Queue one new job task for each file in the job's input collection.

        Does nothing unless the current task's `sequence` equals
        `if_sequence`. Otherwise the job's `script_parameters['input']` is
        opened as a collection, normalized, and a job task is created at
        sequence `if_sequence + 1` for every file in every stream.

        Arguments:

        * if_sequence: the task sequence number at which this method acts.
        * and_end_task: when true, mark the current task successful and exit
          the process with status 0 after queueing the new tasks.
        * input_as_path: when true, each new task's `input` parameter is the
          path made by joining the job input, the stream name and the file
          name; otherwise it is the file's `as_manifest()` value.
        * api_client: API client used for all requests. When not given, one
          is built with `api('v1')`.
        """
        task = current_task()
        if if_sequence != task['sequence']:
            return

        if not api_client:
            api_client = api('v1')

        job = current_job()
        job_input = job['script_parameters']['input']
        cr = CollectionReader(job_input, api_client=api_client)
        cr.normalize()
        for s in cr.all_streams():
            for f in s.all_files():
                if input_as_path:
                    task_input = os.path.join(job_input, s.name(), f.name())
                else:
                    task_input = f.as_manifest()
                new_task_attrs = {
                    'job_uuid': job['uuid'],
                    'created_by_job_task_uuid': task['uuid'],
                    'sequence': if_sequence + 1,
                    'parameters': {
                        'input': task_input,
                    },
                }
                api_client.job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api_client.job_tasks().update(
                uuid=task['uuid'],
                body={'success': True},
            ).execute()
            # sys.exit rather than the site-provided exit(), which is meant
            # for interactive sessions and is absent under `python -S`.
            sys.exit(0)

    @staticmethod
    @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
    def one_task_per_input_stream(if_sequence=0, and_end_task=True, api_client=None):
        """Queue one new job task for each stream in the job's input collection.

        Does nothing unless the current task's `sequence` equals
        `if_sequence`. Otherwise the job's `script_parameters['input']` is
        opened as a collection and, for every stream in it, a job task is
        created at sequence `if_sequence + 1` whose `input` parameter is that
        stream's `tokens()`.

        Arguments:

        * if_sequence: the task sequence number at which this method acts.
        * and_end_task: when true, mark the current task successful and exit
          the process with status 0 after queueing the new tasks.
        * api_client: API client used for all requests. When not given, one
          is built with `api('v1')` and reused for every request.
        """
        task = current_task()
        if if_sequence != task['sequence']:
            return

        # Build the client once instead of once per request.
        if not api_client:
            api_client = api('v1')

        job = current_job()
        job_input = job['script_parameters']['input']
        cr = CollectionReader(job_input, api_client=api_client)
        for s in cr.all_streams():
            new_task_attrs = {
                'job_uuid': job['uuid'],
                'created_by_job_task_uuid': task['uuid'],
                'sequence': if_sequence + 1,
                'parameters': {
                    'input': s.tokens(),
                },
            }
            api_client.job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api_client.job_tasks().update(
                uuid=task['uuid'],
                body={'success': True},
            ).execute()
            # sys.exit rather than the site-provided exit(), which is meant
            # for interactive sessions and is absent under `python -S`.
            sys.exit(0)
logger = <Logger arvados (WARNING)>
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def task_set_output(self, s, num_retries=5):
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def task_set_output(self, s, num_retries=5):
    """Record `s` as the output of job task `self` and mark it successful.

    Sends a `job_tasks().update()` request for `self['uuid']` that sets
    `output` to `s`, `success` to True and `progress` to 1.0, and returns
    the result of executing that request.

    When the request raises `errors.ApiError`, it is logged at debug level
    and tried again if `retry.check_http_response_success` returns None for
    the response status and `tries_left` is still positive. Otherwise the
    error is re-raised.
    """
    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=0):
        try:
            task_update = {'output': s, 'success': True, 'progress': 1.0}
            request = api('v1').job_tasks().update(uuid=self['uuid'], body=task_update)
            return request.execute()
        except errors.ApiError as error:
            retryable = retry.check_http_response_success(error.resp.status) is None
            if not (retryable and tries_left > 0):
                raise
            logger.debug("task_set_output: job_tasks().update() raised {}, retrying with {} tries left".format(repr(error),tries_left))
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def current_task(num_retries=5):
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def current_task(num_retries=5):
    """Return the job task named by the `TASK_UUID` environment variable.

    The record is fetched with `job_tasks().get()`, wrapped in a `UserDict`,
    given a `set_output` method bound to `task_set_output` and a `tmpdir`
    attribute read from the `TASK_WORK` environment variable, then cached
    in the module-level `_current_task` so later calls return the same
    object.

    When the request raises `errors.ApiError`, it is logged at debug level
    and tried again if `retry.check_http_response_success` returns None for
    the response status and `tries_left` is still positive. Otherwise the
    error is re-raised.
    """
    global _current_task
    if _current_task:
        return _current_task

    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
        try:
            request = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID'])
            task = UserDict(request.execute())
            task.set_output = types.MethodType(task_set_output, task)
            task.tmpdir = os.environ['TASK_WORK']
            _current_task = task
            return task
        except errors.ApiError as error:
            retryable = retry.check_http_response_success(error.resp.status) is None
            if not (retryable and tries_left > 0):
                raise
            logger.debug("current_task: job_tasks().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def current_job(num_retries=5):
 99@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
100def current_job(num_retries=5):
101    global _current_job
102    if _current_job:
103        return _current_job
104
105    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
106        try:
107            job = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute()
108            job = UserDict(job)
109            job.tmpdir = os.environ['JOB_WORK']
110            _current_job = job
111            return job
112        except errors.ApiError as error:
113            if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
114                logger.debug("current_job: jobs().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
115            else:
116                raise
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def getjobparam(*args):
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def getjobparam(*args):
    """Look up a value in the current job's `script_parameters`.

    `args` are passed straight through to the `get` method of
    `current_job()['script_parameters']`.
    """
    script_parameters = current_job()['script_parameters']
    return script_parameters.get(*args)
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def get_job_param_mount(*args):
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def get_job_param_mount(*args):
    """Return a job script parameter joined onto the `TASK_KEEPMOUNT` path.

    `args` are passed to the `get` method of
    `current_job()['script_parameters']`; the value found is joined to the
    `TASK_KEEPMOUNT` environment variable with `os.path.join`.
    """
    keep_mount = os.environ['TASK_KEEPMOUNT']
    param_value = current_job()['script_parameters'].get(*args)
    return os.path.join(keep_mount, param_value)
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def get_task_param_mount(*args):
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def get_task_param_mount(*args):
    """Return a task parameter joined onto the `TASK_KEEPMOUNT` path.

    `args` are passed to the `get` method of `current_task()['parameters']`;
    the value found is joined to the `TASK_KEEPMOUNT` environment variable
    with `os.path.join`.
    """
    keep_mount = os.environ['TASK_KEEPMOUNT']
    param_value = current_task()['parameters'].get(*args)
    return os.path.join(keep_mount, param_value)
class JobTask:
class JobTask(object):
    """Deprecated class whose constructor only prints its arguments."""

    @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
    def __init__(self, parameters=dict(), runtime_constraints=dict()):
        """Print `parameters` and `runtime_constraints`; nothing is stored."""
        message = "init jobtask %s %s" % (parameters, runtime_constraints)
        print(message)
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
JobTask(parameters={}, runtime_constraints={})
131    @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
132    def __init__(self, parameters=dict(), runtime_constraints=dict()):
133        print("init jobtask %s %s" % (parameters, runtime_constraints))
class job_setup:
class job_setup(object):
    """Static helpers that queue follow-up job tasks from the job's input."""

    @staticmethod
    @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
    def one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=False, api_client=None):
        """Queue one new job task for each file in the job's input collection.

        Does nothing unless the current task's `sequence` equals
        `if_sequence`. Otherwise the job's `script_parameters['input']` is
        opened as a collection, normalized, and a job task is created at
        sequence `if_sequence + 1` for every file in every stream.

        Arguments:

        * if_sequence: the task sequence number at which this method acts.
        * and_end_task: when true, mark the current task successful and exit
          the process with status 0 after queueing the new tasks.
        * input_as_path: when true, each new task's `input` parameter is the
          path made by joining the job input, the stream name and the file
          name; otherwise it is the file's `as_manifest()` value.
        * api_client: API client used for all requests. When not given, one
          is built with `api('v1')`.
        """
        task = current_task()
        if if_sequence != task['sequence']:
            return

        if not api_client:
            api_client = api('v1')

        job = current_job()
        job_input = job['script_parameters']['input']
        cr = CollectionReader(job_input, api_client=api_client)
        cr.normalize()
        for s in cr.all_streams():
            for f in s.all_files():
                if input_as_path:
                    task_input = os.path.join(job_input, s.name(), f.name())
                else:
                    task_input = f.as_manifest()
                new_task_attrs = {
                    'job_uuid': job['uuid'],
                    'created_by_job_task_uuid': task['uuid'],
                    'sequence': if_sequence + 1,
                    'parameters': {
                        'input': task_input,
                    },
                }
                api_client.job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api_client.job_tasks().update(
                uuid=task['uuid'],
                body={'success': True},
            ).execute()
            # sys.exit rather than the site-provided exit(), which is meant
            # for interactive sessions and is absent under `python -S`.
            sys.exit(0)

    @staticmethod
    @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
    def one_task_per_input_stream(if_sequence=0, and_end_task=True, api_client=None):
        """Queue one new job task for each stream in the job's input collection.

        Does nothing unless the current task's `sequence` equals
        `if_sequence`. Otherwise the job's `script_parameters['input']` is
        opened as a collection and, for every stream in it, a job task is
        created at sequence `if_sequence + 1` whose `input` parameter is that
        stream's `tokens()`.

        Arguments:

        * if_sequence: the task sequence number at which this method acts.
        * and_end_task: when true, mark the current task successful and exit
          the process with status 0 after queueing the new tasks.
        * api_client: API client used for all requests. When not given, one
          is built with `api('v1')` and reused for every request.
        """
        task = current_task()
        if if_sequence != task['sequence']:
            return

        # Build the client once instead of once per request.
        if not api_client:
            api_client = api('v1')

        job = current_job()
        job_input = job['script_parameters']['input']
        cr = CollectionReader(job_input, api_client=api_client)
        for s in cr.all_streams():
            new_task_attrs = {
                'job_uuid': job['uuid'],
                'created_by_job_task_uuid': task['uuid'],
                'sequence': if_sequence + 1,
                'parameters': {
                    'input': s.tokens(),
                },
            }
            api_client.job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api_client.job_tasks().update(
                uuid=task['uuid'],
                body={'success': True},
            ).execute()
            # sys.exit rather than the site-provided exit(), which is meant
            # for interactive sessions and is absent under `python -S`.
            sys.exit(0)
@staticmethod
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def one_task_per_input_file( if_sequence=0, and_end_task=True, input_as_path=False, api_client=None):
136    @staticmethod
137    @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
138    def one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=False, api_client=None):
139        if if_sequence != current_task()['sequence']:
140            return
141
142        if not api_client:
143            api_client = api('v1')
144
145        job_input = current_job()['script_parameters']['input']
146        cr = CollectionReader(job_input, api_client=api_client)
147        cr.normalize()
148        for s in cr.all_streams():
149            for f in s.all_files():
150                if input_as_path:
151                    task_input = os.path.join(job_input, s.name(), f.name())
152                else:
153                    task_input = f.as_manifest()
154                new_task_attrs = {
155                    'job_uuid': current_job()['uuid'],
156                    'created_by_job_task_uuid': current_task()['uuid'],
157                    'sequence': if_sequence + 1,
158                    'parameters': {
159                        'input':task_input
160                        }
161                    }
162                api_client.job_tasks().create(body=new_task_attrs).execute()
163        if and_end_task:
164            api_client.job_tasks().update(uuid=current_task()['uuid'],
165                                       body={'success':True}
166                                       ).execute()
167            exit(0)
@staticmethod
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def one_task_per_input_stream(if_sequence=0, and_end_task=True):
169    @staticmethod
170    @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
171    def one_task_per_input_stream(if_sequence=0, and_end_task=True):
172        if if_sequence != current_task()['sequence']:
173            return
174        job_input = current_job()['script_parameters']['input']
175        cr = CollectionReader(job_input)
176        for s in cr.all_streams():
177            task_input = s.tokens()
178            new_task_attrs = {
179                'job_uuid': current_job()['uuid'],
180                'created_by_job_task_uuid': current_task()['uuid'],
181                'sequence': if_sequence + 1,
182                'parameters': {
183                    'input':task_input
184                    }
185                }
186            api('v1').job_tasks().create(body=new_task_attrs).execute()
187        if and_end_task:
188            api('v1').job_tasks().update(uuid=current_task()['uuid'],
189                                       body={'success':True}
190                                       ).execute()
191            exit(0)