Expand source code
class job_setup(object):
    """Helpers that split the current job's input into follow-up job tasks.

    Both public helpers read the input locator from
    ``current_job()['script_parameters']['input']``, queue one new task per
    unit of input at sequence ``if_sequence + 1``, and (optionally) mark the
    current task successful and exit the process.
    """

    @staticmethod
    def _create_task(api_client, job_uuid, parent_task_uuid, sequence, task_input):
        """Queue one job task whose ``parameters['input']`` is *task_input*."""
        new_task_attrs = {
            'job_uuid': job_uuid,
            'created_by_job_task_uuid': parent_task_uuid,
            'sequence': sequence,
            'parameters': {
                'input': task_input,
            },
        }
        api_client.job_tasks().create(body=new_task_attrs).execute()

    @staticmethod
    def _end_current_task(api_client, task_uuid):
        """Mark task *task_uuid* as successful, then exit with status 0."""
        api_client.job_tasks().update(
            uuid=task_uuid,
            body={'success': True},
        ).execute()
        # Raise SystemExit directly: the builtin ``exit`` is injected by the
        # ``site`` module for interactive use and is missing under ``-S``.
        raise SystemExit(0)

    @staticmethod
    def one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=False, api_client=None):
        """Queue one new task for every file in the job's input collection.

        Does nothing unless the current task's ``sequence`` equals
        *if_sequence*.

        Args:
            if_sequence: Sequence number the current task must have for this
                call to act. New tasks are created at ``if_sequence + 1``.
            and_end_task: When true, mark the current task as successful and
                exit the process (status 0) after all tasks are queued.
            input_as_path: When true, each task's input is
                ``os.path.join(job_input, stream_name, file_name)``; otherwise
                it is the value returned by the file's ``as_manifest()``.
            api_client: API client to use. A falsy value means a client is
                obtained from ``api('v1')``.

        Raises:
            SystemExit: With code 0, when *and_end_task* is true and the
                sequence matched.
        """
        this_task = current_task()
        if if_sequence != this_task['sequence']:
            return
        if not api_client:
            api_client = api('v1')
        # NOTE(review): the job and task records are fetched once and reused;
        # this assumes current_job()/current_task() return the same record for
        # the lifetime of the process -- confirm against their definitions.
        this_job = current_job()
        job_input = this_job['script_parameters']['input']
        job_uuid = this_job['uuid']
        task_uuid = this_task['uuid']
        cr = CollectionReader(job_input, api_client=api_client)
        cr.normalize()
        for s in cr.all_streams():
            for f in s.all_files():
                if input_as_path:
                    task_input = os.path.join(job_input, s.name(), f.name())
                else:
                    task_input = f.as_manifest()
                job_setup._create_task(
                    api_client, job_uuid, task_uuid, if_sequence + 1, task_input)
        if and_end_task:
            job_setup._end_current_task(api_client, task_uuid)

    @staticmethod
    def one_task_per_input_stream(if_sequence=0, and_end_task=True, api_client=None):
        """Queue one new task for every stream in the job's input collection.

        Does nothing unless the current task's ``sequence`` equals
        *if_sequence*. Each new task's input is the value returned by the
        stream's ``tokens()``.

        Args:
            if_sequence: Sequence number the current task must have for this
                call to act. New tasks are created at ``if_sequence + 1``.
            and_end_task: When true, mark the current task as successful and
                exit the process (status 0) after all tasks are queued.
            api_client: API client to use. A falsy value means a client is
                obtained from ``api('v1')``. Accepted for parity with
                ``one_task_per_input_file``.

        Raises:
            SystemExit: With code 0, when *and_end_task* is true and the
                sequence matched.
        """
        this_task = current_task()
        if if_sequence != this_task['sequence']:
            return
        # Resolve the client once instead of calling api('v1') per stream.
        if not api_client:
            api_client = api('v1')
        # NOTE(review): assumes current_job()/current_task() return the same
        # record for the lifetime of the process -- confirm.
        this_job = current_job()
        job_input = this_job['script_parameters']['input']
        job_uuid = this_job['uuid']
        task_uuid = this_task['uuid']
        cr = CollectionReader(job_input, api_client=api_client)
        for s in cr.all_streams():
            job_setup._create_task(
                api_client, job_uuid, task_uuid, if_sequence + 1, s.tokens())
        if and_end_task:
            job_setup._end_current_task(api_client, task_uuid)