Federated Multi-Cluster Workflows

Arvados supports federated workflows for two scenarios: running analysis on geographically dispersed data (sending the computation to the data to avoid expensive data transfers), and “hybrid cloud” configurations in which an on-premises cluster expands its capacity by delegating work to a cloud-hosted cluster. In a federated workflow, different steps of a workflow may execute on different clusters. Arvados manages data transfer and delegation of credentials, so the only change required is adding arv:ClusterTarget hints to your existing workflow.
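
As a rough illustration of how small that change can be, the sketch below shows a single-step workflow with an arv:ClusterTarget hint. It assumes the hint accepts literal strings as well as expressions for its cluster_id and project_uuid fields. The step name, the cluster ID and the project UUID are placeholders, and md5sum.cwl stands for any tool that takes an "inp" file and produces an "out" file.

```yaml
cwlVersion: v1.0
class: Workflow
$namespaces:
  arv: "http://arvados.org/cwl#"

inputs:
  inp: File

outputs:
  checksum:
    type: File
    outputSource: remote-step/out

steps:
  # "remote-step" is a hypothetical step name.
  remote-step:
    in:
      inp: inp
    out: [out]
    run: md5sum.cwl
    hints:
      # Placeholder values: substitute a cluster ID and a project UUID
      # that exist in your own federation.
      arv:ClusterTarget:
        cluster_id: clsr2
        project_uuid: clsr2-j7d0g-ivdrm1hyym21vkq
```

Steps without the hint are expected to run on the cluster where the workflow was submitted.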

For more information, visit the architecture and admin sections about Arvados federation.

Get the example files

The tutorial files are located in the documentation section of the Arvados source repository; the workflow and an example input document are also reproduced below.

~$ git clone https://github.com/arvados/arvados
~$ cd arvados/doc/user/cwl/federated

Run example

Note:

At this time, Workbench does not display the remote steps of a workflow. As a workaround, you can find the UUIDs of the remote steps in the live logs of the workflow runner (the “Logs” tab), then visit the remote cluster’s Workbench and enter a UUID into the search box to view the details of that step. This will be fixed in a future version of Workbench.

Run it like any other workflow:

~$ arvados-cwl-runner federated.cwl shards.cwl

You can also run a workflow on a remote federated cluster.

Federated scatter/gather example

In the following example, an analysis task is executed on three different clusters, each holding different data, and the results are then combined to produce the final output.

#
# Demonstrate Arvados federation features.  This performs a parallel
# scatter over some arbitrary number of files and federated clusters,
# then joins the results.
#
cwlVersion: v1.0
class: Workflow
$namespaces:
  # Arvados extensions to CWL require declaring the 'arv' namespace
  arv: "http://arvados.org/cwl#"

requirements:
  InlineJavascriptRequirement: {}
  ScatterFeatureRequirement: {}
  StepInputExpressionRequirement: {}

  DockerRequirement:
    # Replace this with your own Docker container
    dockerPull: arvados/jobs

  # Define a record type so we can conveniently associate the input
  # file, the cluster on which the file lives, and the project on that
  # cluster that will own the container requests and intermediate
  # outputs.
  SchemaDefRequirement:
    types:
      - name: FileOnCluster
        type: record
        fields:
          file: File
          cluster: string
          project: string

inputs:
  # Expect an array of FileOnCluster records (defined above)
  # as our input.
  shards:
    type:
      type: array
      items: FileOnCluster

outputs:
  # Will produce an output file with the results of the distributed
  # analysis jobs joined together.
  joined:
    type: File
    outputSource: gather-results/joined

steps:
  distributed-analysis:
    in:
      # Take the "shards" array as input; we scatter over it below.
      shard: shards

      # Use an expression to extract the "file" field to assign to the
      # "inp" parameter of the tool.
      inp: {valueFrom: $(inputs.shard.file)}

    # Scatter over shards: this creates a parallel job for each
    # element in the "shards" array.  Expressions are evaluated for
    # each element.
    scatter: shard

    # Specify the cluster target for this job.  This means each
    # separate scatter job will execute on the cluster that was
    # specified in the "cluster" field.
    #
    # Arvados handles streaming data between clusters, for example,
    # the Docker image containing the code for a particular tool will
    # be fetched on demand, as long as it is available somewhere in
    # the federation.
    hints:
      arv:ClusterTarget:
        cluster_id: $(inputs.shard.cluster)
        project_uuid: $(inputs.shard.project)

    out: [out]
    run: md5sum.cwl

  # Collect the results of the distributed step and join them into a
  # single output file.  Arvados handles streaming inputs,
  # intermediate results, and outputs between clusters on demand.
  gather-results:
    in:
      inp: distributed-analysis/out
    out: [joined]
    run: cat.cwl
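
The workflow above refers to two tool definitions, md5sum.cwl and cat.cwl, which ship with the tutorial files but are not reproduced here. The sketches below are not the repository's versions. They are minimal tools written only to match the interfaces the workflow relies on: an "inp" input and an "out" output for the analysis step, and an "inp" array input and a "joined" output for the gather step. The output file names are arbitrary.

```yaml
# Sketch of md5sum.cwl: checksum one input file, capture stdout as "out".
cwlVersion: v1.0
class: CommandLineTool
baseCommand: md5sum
inputs:
  inp:
    type: File
    inputBinding:
      position: 1
outputs:
  out:
    type: stdout
stdout: md5sum.txt
```

```yaml
# Sketch of cat.cwl: concatenate an array of files into one "joined" file.
cwlVersion: v1.0
class: CommandLineTool
baseCommand: cat
inputs:
  inp:
    type: File[]
    inputBinding:
      position: 1
outputs:
  joined:
    type: stdout
stdout: joined.txt
```

Because distributed-analysis is scattered, its "out" output becomes an array of files, which is why the gather tool declares "inp" as File[].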

Example input document:

shards:
  - cluster: clsr1
    project: clsr1-j7d0g-qxc4jcji7n4lafx
    file:
      class: File
      location: keep:485df2c5cec3207a32f49c42f1cdcca9+61/file-on-clsr1.dat

  - cluster: clsr2
    project: clsr2-j7d0g-ivdrm1hyym21vkq
    file:
      class: File
      location: keep:ae6e9c3e9bfa52a0122ecb489d8198ff+61/file-on-clsr2.dat

  - cluster: clsr3
    project: clsr3-j7d0g-e3njz2s53lyb0ka
    file:
      class: File
      location: keep:0b43a0ef9ea592d5d7b299978dfa8643+61/file-on-clsr3.dat



The content of this documentation is licensed under the Creative Commons Attribution-Share Alike 3.0 United States licence.
Code samples in this documentation are licensed under the Apache License, Version 2.0.