Package arvados
[hide private]
[frames] | no frames]

Source Code for Package arvados

  1  # Copyright (C) The Arvados Authors. All rights reserved. 
  2  # 
  3  # SPDX-License-Identifier: Apache-2.0 
  4   
  5  from __future__ import print_function 
  6  from __future__ import absolute_import 
  7  from future import standard_library 
  8  standard_library.install_aliases() 
  9  from builtins import object 
 10  import bz2 
 11  import fcntl 
 12  import hashlib 
 13  import http.client 
 14  import httplib2 
 15  import json 
 16  import logging 
 17  import os 
 18  import pprint 
 19  import re 
 20  import string 
 21  import sys 
 22  import time 
 23  import types 
 24  import zlib 
 25   
 26  if sys.version_info >= (3, 0): 
 27      from collections import UserDict 
 28  else: 
 29      from UserDict import UserDict 
 30   
 31  from .api import api, api_from_config, http_cache 
 32  from .collection import CollectionReader, CollectionWriter, ResumableCollectionWriter 
 33  from arvados.keep import * 
 34  from arvados.stream import * 
 35  from .arvfile import StreamFileReader 
 36  from .retry import RetryLoop 
 37  import arvados.errors as errors 
 38  import arvados.util as util 
 39   
 40  # Set up Arvados logging based on the user's configuration. 
 41  # All Arvados code should log under the arvados hierarchy. 
 42  log_format = '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s' 
 43  log_date_format = '%Y-%m-%d %H:%M:%S' 
 44  log_handler = logging.StreamHandler() 
 45  log_handler.setFormatter(logging.Formatter(log_format, log_date_format)) 
 46  logger = logging.getLogger('arvados') 
 47  logger.addHandler(log_handler) 
 48  logger.setLevel(logging.DEBUG if config.get('ARVADOS_DEBUG') 
 49                  else logging.WARNING) 
def task_set_output(self, s, num_retries=5):
    """Record ``s`` as this task's output and mark the task successful.

    Intended to be bound onto a task record (it reads ``self['uuid']``).
    Sends a job_tasks().update with output=s, success=True, progress=1.0.

    :param s: the output value to store on the task.
    :param num_retries: how many extra attempts RetryLoop may make.
    :returns: the result of the update request's ``execute()``.
    :raises errors.ApiError: when the failure is not retryable or no
        tries are left.
    """
    for remaining in RetryLoop(num_retries=num_retries, backoff_start=0):
        try:
            finished = {'output': s, 'success': True, 'progress': 1.0}
            return api('v1').job_tasks().update(uuid=self['uuid'], body=finished).execute()
        except errors.ApiError as err:
            # A None verdict from check_http_response_success is what this
            # code treats as worth retrying; anything else is re-raised.
            retryable = retry.check_http_response_success(err.resp.status) is None
            if not (retryable and remaining > 0):
                raise
            logger.debug("task_set_output: job_tasks().update() raised {}, retrying with {} tries left".format(repr(err), remaining))
# Module-level cache of the task record returned by current_task().
_current_task = None


def current_task(num_retries=5):
    """Return this process's job task record, fetching it once and caching it.

    The task UUID comes from the TASK_UUID environment variable. The record
    is wrapped in a UserDict and gets two extra attributes:
    ``set_output`` (task_set_output bound to the record) and ``tmpdir``
    (from the TASK_WORK environment variable).

    :param num_retries: how many extra attempts RetryLoop may make.
    :raises errors.ApiError: when the failure is not retryable or no
        tries are left.
    :raises KeyError: if TASK_UUID or TASK_WORK is not set.
    """
    global _current_task
    if _current_task:
        return _current_task

    for remaining in RetryLoop(num_retries=num_retries, backoff_start=2):
        try:
            fetched = UserDict(api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute())
            fetched.set_output = types.MethodType(task_set_output, fetched)
            fetched.tmpdir = os.environ['TASK_WORK']
            _current_task = fetched
            return fetched
        except errors.ApiError as err:
            # Only a None verdict from check_http_response_success is retried.
            retryable = retry.check_http_response_success(err.resp.status) is None
            if not (retryable and remaining > 0):
                raise
            logger.debug("current_task: job_tasks().get() raised {}, retrying with {} tries left".format(repr(err), remaining))
# Module-level cache of the job record returned by current_job().
_current_job = None


def current_job(num_retries=5):
    """Return this process's job record, fetching it once and caching it.

    The job UUID comes from the JOB_UUID environment variable. The record is
    wrapped in a UserDict and gets a ``tmpdir`` attribute taken from the
    JOB_WORK environment variable.

    :param num_retries: how many extra attempts RetryLoop may make.
    :raises errors.ApiError: when the failure is not retryable or no
        tries are left.
    :raises KeyError: if JOB_UUID or JOB_WORK is not set.
    """
    global _current_job
    if _current_job:
        return _current_job

    for remaining in RetryLoop(num_retries=num_retries, backoff_start=2):
        try:
            fetched = UserDict(api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute())
            fetched.tmpdir = os.environ['JOB_WORK']
            _current_job = fetched
            return fetched
        except errors.ApiError as err:
            # Only a None verdict from check_http_response_success is retried.
            retryable = retry.check_http_response_success(err.resp.status) is None
            if not (retryable and remaining > 0):
                raise
            logger.debug("current_job: jobs().get() raised {}, retrying with {} tries left".format(repr(err), remaining))
105
def getjobparam(*args):
    """Look up a script parameter of the current job.

    Arguments are passed straight to ``dict.get`` on the job's
    ``script_parameters``, i.e. ``(key)`` or ``(key, default)``.
    """
    script_params = current_job()['script_parameters']
    return script_params.get(*args)
108
def get_job_param_mount(*args):
    """Return a job script parameter joined onto the TASK_KEEPMOUNT directory.

    Arguments are passed to ``dict.get`` on the current job's
    ``script_parameters``.

    :raises KeyError: if TASK_KEEPMOUNT is not set in the environment.
    """
    mount_root = os.environ['TASK_KEEPMOUNT']
    relative_path = current_job()['script_parameters'].get(*args)
    return os.path.join(mount_root, relative_path)
111
def get_task_param_mount(*args):
    """Return a task parameter joined onto the TASK_KEEPMOUNT directory.

    Arguments are passed to ``dict.get`` on the current task's
    ``parameters``.

    :raises KeyError: if TASK_KEEPMOUNT is not set in the environment.
    """
    mount_root = os.environ['TASK_KEEPMOUNT']
    relative_path = current_task()['parameters'].get(*args)
    return os.path.join(mount_root, relative_path)
114
class JobTask(object):
    """Job task description stub: construction only prints its arguments."""

    def __init__(self, parameters=None, runtime_constraints=None):
        """Print the given task parameters and runtime constraints.

        :param parameters: task parameters; an empty dict when omitted.
        :param runtime_constraints: runtime constraints; an empty dict when
            omitted.
        """
        # None sentinels replace the previous shared ``dict()`` defaults,
        # which were created once and reused across every call. The printed
        # text for a no-argument call is unchanged ("{} {}").
        if parameters is None:
            parameters = {}
        if runtime_constraints is None:
            runtime_constraints = {}
        print("init jobtask %s %s" % (parameters, runtime_constraints))
118
class job_setup(object):
    """Helpers that fan a job's ``input`` collection out into child tasks.

    Each helper does nothing unless the current task's ``sequence`` equals
    ``if_sequence``. Otherwise it creates child tasks with sequence
    ``if_sequence + 1`` and, when ``and_end_task`` is true, marks the current
    task successful and exits the process with status 0.
    """

    @staticmethod
    def one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=False, api_client=None):
        """Create one child task per file in the normalized input collection.

        :param if_sequence: only act when the current task has this sequence.
        :param and_end_task: after queueing, mark the current task successful
            and call ``sys.exit(0)``.
        :param input_as_path: if true, a child's ``input`` is the path
            ``<job input>/<stream name>/<file name>``; otherwise it is the
            file's manifest text (``f.as_manifest()``).
        :param api_client: API client to use; ``api('v1')`` when omitted.
        """
        this_task = current_task()
        if if_sequence != this_task['sequence']:
            return

        if not api_client:
            api_client = api('v1')

        this_job = current_job()
        job_input = this_job['script_parameters']['input']
        cr = CollectionReader(job_input, api_client=api_client)
        cr.normalize()
        for s in cr.all_streams():
            for f in s.all_files():
                if input_as_path:
                    task_input = os.path.join(job_input, s.name(), f.name())
                else:
                    task_input = f.as_manifest()
                new_task_attrs = {
                    'job_uuid': this_job['uuid'],
                    'created_by_job_task_uuid': this_task['uuid'],
                    'sequence': if_sequence + 1,
                    'parameters': {
                        'input': task_input
                    }
                }
                api_client.job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api_client.job_tasks().update(uuid=this_task['uuid'],
                                          body={'success': True}
                                          ).execute()
            # sys.exit rather than the site-injected interactive ``exit``
            # helper, which is missing under ``python -S`` or when embedded.
            sys.exit(0)

    @staticmethod
    def one_task_per_input_stream(if_sequence=0, and_end_task=True, api_client=None):
        """Create one child task per stream in the input collection.

        Each child's ``input`` parameter is the stream's ``tokens()``.

        :param if_sequence: only act when the current task has this sequence.
        :param and_end_task: after queueing, mark the current task successful
            and call ``sys.exit(0)``.
        :param api_client: API client for task creation/update; when omitted,
            one ``api('v1')`` client is created and reused. (Added for parity
            with one_task_per_input_file; existing callers are unaffected.)
        """
        this_task = current_task()
        if if_sequence != this_task['sequence']:
            return

        this_job = current_job()
        job_input = this_job['script_parameters']['input']
        # Only the caller's client (None by default) goes to the reader, so a
        # call without api_client reads the collection exactly as before.
        cr = CollectionReader(job_input, api_client=api_client)
        # Resolve the task-API client once instead of once per created task.
        client = api_client if api_client else api('v1')
        for s in cr.all_streams():
            task_input = s.tokens()
            new_task_attrs = {
                'job_uuid': this_job['uuid'],
                'created_by_job_task_uuid': this_task['uuid'],
                'sequence': if_sequence + 1,
                'parameters': {
                    'input': task_input
                }
            }
            client.job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            client.job_tasks().update(uuid=this_task['uuid'],
                                      body={'success': True}
                                      ).execute()
            sys.exit(0)
174