Concurrent Crunch tasks

Note:

Arvados pipeline templates are deprecated. The recommended way to develop new workflows for Arvados is using the Common Workflow Language.

In the previous tutorials, we used arvados.job_setup.one_task_per_input_file() to automatically create concurrent jobs by creating a separate task per file. For some types of jobs, you may need to split the work up differently, for example creating tasks to process different segments of a single large file. This tutorial will demonstrate how to create Crunch tasks directly.

Start by entering the crunch_scripts directory of your Git repository:

~$ cd $USER/crunch_scripts

Next, using nano or your favorite Unix text editor, create a new file called concurrent-hash.py in the crunch_scripts directory.

~/$USER/crunch_scripts$ nano concurrent-hash.py

Add the following code to compute the MD5 hash of each file in a collection:

#!/usr/bin/env python
#<Liquid::Comment:0x000055f36e200438>

import hashlib
import os
import arvados

# Jobs consist of one or more tasks.  A task is a single invocation of
# a crunch script.

# Get the current task
this_task = arvados.current_task()

# Tasks have a sequence number for ordering.  All tasks
# with the current sequence number must finish successfully
# before tasks in the next sequence are started.
# The first task has sequence number 0
if this_task['sequence'] == 0:
    # Get the "input" field from "script_parameters" on the task object
    job_input = arvados.current_job()['script_parameters']['input']

    # Create a collection reader to read the input
    cr = arvados.CollectionReader(job_input)

    # Loop over each stream in the collection (a stream is a subset of
    # files that logically represents a directory)
    for s in cr.all_streams():

        # Loop over each file in the stream
        for f in s.all_files():

            # Synthesize a manifest for just this file
            task_input = f.as_manifest()

            # Set attributes for a new task:
            # 'job_uuid' the job that this task is part of
            # 'created_by_job_task_uuid' this task that is creating the new task
            # 'sequence' the sequence number of the new task
            # 'parameters' the parameters to be passed to the new task
            new_task_attrs = {
                'job_uuid': arvados.current_job()['uuid'],
                'created_by_job_task_uuid': arvados.current_task()['uuid'],
                'sequence': 1,
                'parameters': {
                    'input':task_input
                    }
                }

            # Ask the Arvados API server to create a new task, running the same
            # script as the parent task specified in 'created_by_job_task_uuid'
            arvados.api().job_tasks().create(body=new_task_attrs).execute()

    # Now tell the Arvados API server that this task executed successfully,
    # even though it doesn't have any output.
    this_task.set_output(None)
else:
    # The task sequence was not 0, so it must be a parallel worker task
    # created by the first task

    # Instead of getting "input" from the "script_parameters" field of
    # the job object, we get it from the "parameters" field of the
    # task object
    this_task_input = this_task['parameters']['input']

    collection = arvados.CollectionReader(this_task_input)

    # There should only be one file in the collection, so get the
    # first one from the all files iterator.
    input_file = next(collection.all_files())
    output_path = os.path.normpath(os.path.join(input_file.stream_name(),
                                                input_file.name))

    # Everything after this is the same as the first tutorial.
    digestor = hashlib.new('md5')
    for buf in input_file.readall():
        digestor.update(buf)

    out = arvados.CollectionWriter()
    with out.open('md5sum.txt') as out_file:
        out_file.write("{} {}\n".format(digestor.hexdigest(), output_path))

    this_task.set_output(out.finish())

# Done!

Make the file executable:

~/$USER/crunch_scripts$ chmod +x concurrent-hash.py

Add the file to the Git staging area, commit, and push:

~/$USER/crunch_scripts$ git add concurrent-hash.py
~/$USER/crunch_scripts$ git commit -m"concurrent hash"
~/$USER/crunch_scripts$ git push origin master

You should now be able to run your new script using Crunch, with “script” referring to our new “concurrent-hash.py” script. We will use a different input from our previous examples. We will use 887cd41e9c613463eab2f0d885c6dd96+83 which consists of three files, “alice.txt”, “bob.txt” and “carol.txt” (the example collection used previously in fetching data from Arvados using Keep).

~/$USER/crunch_scripts$ cat >~/the_job <<EOF
{
 "script": "concurrent-hash.py",
 "repository": "$USER/$USER",
 "script_version": "master",
 "script_parameters":
 {
  "input": "887cd41e9c613463eab2f0d885c6dd96+83"
 }
}
EOF
~/$USER/crunch_scripts$ arv job create --job "$(cat ~/the_job)"
{
 ...
 "uuid":"qr1hi-xxxxx-xxxxxxxxxxxxxxx"
 ...
}
~/$USER/crunch_scripts$ arv job get --uuid qr1hi-xxxxx-xxxxxxxxxxxxxxx
{
 ...
 "output":"e2ccd204bca37c77c0ba59fc470cd0f7+162",
 ...
}

(Your shell should automatically fill in $USER with your login name. The job JSON that gets saved should have "repository" pointed at your personal Git repository.)

Because the job ran in concurrent, each instance of concurrent-hash creates a separate md5sum.txt as output. Arvados automatically collates theses files into a single collection, which is the output of the job:

~/$USER/crunch_scripts$ arv keep ls e2ccd204bca37c77c0ba59fc470cd0f7+162
./md5sum.txt
~/$USER/crunch_scripts$ arv keep get e2ccd204bca37c77c0ba59fc470cd0f7+162/md5sum.txt
0f1d6bcf55c34bed7f92a805d2d89bbf alice.txt
504938460ef369cd275e4ef58994cffe bob.txt
8f3b36aff310e06f3c5b9e95678ff77a carol.txt

Previous: Running on an Arvados cluster Next: run-command reference

The content of this documentation is licensed under the Creative Commons Attribution-Share Alike 3.0 United States licence.
Code samples in this documentation are licensed under the Apache License, Version 2.0.