Skip to content

动态分支

动态分支任务用于并行运行任务,分支数量在运行时确定。

动态分支任务用于并行运行任务,分支行为(如分支数量)在运行时确定。这与分支/合并任务不同,后者的分支行为是在工作流创建时定义的。与分支/合并任务一样,动态分支任务后面跟着一个合并操作,该操作等待分支任务完成后再移动到下一个任务。此合并任务收集每个分支任务的输出。

与分支/合并任务不同,动态分支任务每个分支只能运行一个任务。如果每个分支需要多个任务,可以使用子工作流。

动态分支任务有两种运行方式:

  • 每个分支运行不同的任务—使用  dynamicForkTasksParam  和  dynamicForkTasksInputParamName
  • 所有分支运行相同的任务—对于任何任务类型,使用  forkTaskName  和  forkTaskInputs;对于子工作流任务,使用  forkTaskWorkflow  和  forkTaskInputs

任务参数

为动态分支任务配置以下参数。分支任务的输入负载应与其预期输入相对应。例如,如果分支任务是 HTTP 任务,其输入应包括方法和 URI。

对于分支任务:

运行不同的任务

动态分支执行  dynamicForkTasksParam  指定的数组中的每个任务,并使用  dynamicForkTasksInputParamName  指定的相应输入。在运行时创建的分支数量取决于指定的任务数组。

参数描述必需/可选
dynamicForkTasksParam其值将用于在不同分支上调度每个任务的输入参数键。例如,"dynamicTasks",然后将其指定为动态分支任务中的输入参数。必需
inputParameters.dynamicTasks将并行运行的任务数组。每个数组元素都是对应于单独分支分支的任务定义。它可以作为变量传递。必需
dynamicForkTasksInputParamName其值将用于传递每个分支任务所需输入参数的输入参数键。例如,"dynamicTasksInput",然后将其指定为动态分支任务中的输入参数。必需
inputParameters.dynamicTasksInput一个映射,其中键是每个分支的任务引用名称,值是将传递到其匹配任务的输入参数。它可以作为变量传递。必需

运行相同的任务(任何任务类型)

动态分支为  forkTaskInputs  的每个元素执行  forkTaskName  指定的任务。在运行时创建的分支数量取决于发送的  forkTaskInputs  数量。配置这些参数以执行除子工作流任务之外的任何任务类型。

参数描述必需/可选
inputParameters.forkTaskName将在每个分支中执行的任务名称。它可以作为变量传递。必需
inputParameters.forkTaskInputs每个分支分支的 JSON 输入数组。数组元素的数量决定了动态分支中的分支数量。它可以作为变量传递。必需

运行相同的任务(子工作流)

动态分支为  forkTaskInputs  的每个元素执行  forkTaskWorkflow  和  forkTaskWorkflowVersion  指定的工作流。在运行时创建的分支数量取决于发送的  forkTaskInputs  数量。配置这些参数以执行子工作流任务。

参数描述必需/可选
inputParameters.forkTaskWorkflow将在每个分支中执行的工作流名称。它可以作为变量传递。必需
inputParameters.forkTaskWorkflowVersion要执行的工作流版本。如果未指定,将使用最新版本。必需
inputParameters.forkTaskInputs每个分支分支的 JSON 输入数组。数组元素的数量决定了动态分支中的分支数量。它可以作为变量传递。必需

合并任务将在分支任务之后运行。配置合并任务以完成分支/合并操作。

对于合并任务:

参数描述必需/可选
joinOn合并任务在继续下一个任务之前等待完成的任务引用名称列表。必需
expression如果指定,控制合并任务如何完成的合并脚本。可选

任务配置

这是动态分支任务的任务配置。

运行不同的任务

// 动态分支任务的 JSON 架构
{
  "name": "fork_join_dynamic",
  "taskReferenceName": "fork_join_dynamic_ref",
  "inputParameters": {
    "dynamicTasks": [ // 要执行的任务名称
      {
        "name": "http",
        "taskReferenceName": "http_ref",
        "type": "HTTP",
        "inputParameters": {
          "uri": "https://www.taskflow.cn/api"
        }
      },
      { // 另一个任务定义 }

    ],
    "dynamicTasksInput": { // 任务的输入
      "taskReferenceName" : {
        "key": "value",
        "key": "value"
      },
      "anotherTaskReferenceName" : {
        "key": "value",
        "key": "value"
      }
    }
  },
  "type": "FORK_JOIN_DYNAMIC",
  "dynamicForkTasksParam": "dynamicTasks", // 将保存要执行的任务名称的输入参数键
  "dynamicForkTasksInputParamName": "dynamicTasksInput" // 将保存每个任务的输入参数的输入参数键
}


// 合并任务的 JSON 架构
{
  "name": "join",
  "taskReferenceName": "join_ref",
  "inputParameters": {},
  "type": "JOIN",
  "joinOn": []
}

运行相同的任务(任何任务类型)

// 动态分支任务的 JSON 架构
{
  "name": "fork_join_dynamic",
  "taskReferenceName": "fork_join_dynamic_ref",
  "inputParameters": {
    "forkTaskName": "",
    "forkTaskInputs": []
  },
  "type": "FORK_JOIN_DYNAMIC"
}

// 合并任务的 JSON 架构
{
  "name": "join",
  "taskReferenceName": "join_ref",
  "inputParameters": {},
  "type": "JOIN",
  "joinOn": []
}

运行相同的任务(子工作流)

// 动态分支任务的 JSON 架构
{
  "name": "fork_join_dynamic",
  "taskReferenceName": "fork_join_dynamic_ref",
  "inputParameters": {
    "forkTaskWorkflow": "",
    "forkTaskWorkflowVersion": "",
    "forkTaskInputs": []
  },
  "type": "FORK_JOIN_DYNAMIC"
}

// 合并任务的 JSON 架构
{
  "name": "join",
  "taskReferenceName": "join_ref",
  "inputParameters": {},
  "type": "JOIN",
  "joinOn": []
}

即使任务定义中存在 dynamicForkTasksParam 和 dynamicForkTasksInputParamName,forkTaskName 和 forkTaskInputs 也会优先。

在 UI 中添加动态分支任务

添加动态分支任务的步骤:

运行不同的任务

  1. 在工作流中,选择  (+)  图标并添加  动态分支  任务。
  2. 在  输入参数  中,将 dynamicTasks 和 dynamicTasksInput 的参数类型设置为  对象/数组
  3. 将 dynamicTask 参数配置为任务定义数组。
  4. 将 dynamicTasksInput 参数配置为每个任务的输入参数映射。
  5. 选择合并任务并配置其设置以完成分支/合并操作。

运行相同的任务

  1. 在工作流中,选择  (+)  图标并添加  动态分支  任务。

  2. 在  输入参数  中,删除所有当前参数并添加以下参数及其值:

    • 对于子工作流任务,添加  forkTaskWorkflowforkTaskWorkflowVersion  和  forkTaskInputs
    • 对于所有其他任务类型,添加  forkTaskName  和  forkTaskInputs
  3. 选择合并任务并配置其设置以完成分支/合并操作。

示例

以下是使用动态分支任务的一些示例。

运行不同的任务

要在动态分支中每个分支运行不同的任务,必须使用  dynamicForkTasksParam  和  dynamicForkTasksInputParamName。以下是运行不同任务的动态分支任务的工作流示例。

// 工作流定义

{
  "name": "DynamicForkExample",
  "description": "此工作流在动态分支中运行不同的任务。",
  "version": 1,
  "tasks": [
    {
      "name": "fork_join_dynamic",
      "taskReferenceName": "fork_join_dynamic_ref",
      "inputParameters": {
        "dynamicTasks": [
          {
            "name": "inline",
            "taskReferenceName": "task1",
            "type": "INLINE",
            "inputParameters": {
              "expression": "(function () {\n  return $.input;\n})();",
              "evaluatorType": "graaljs"
            }
          },
          {
            "name": "http",
            "taskReferenceName": "task2",
            "type": "HTTP",
            "inputParameters": {
              "method": "GET",
              "connectionTimeOut": 3000,
              "readTimeOut": "3000",
              "accept": "application/json",
              "contentType": "application/json",
              "encode": true,
              "uri": "https://www.taskflow.cn/api"
            }
          },
          {
            "name": "x_test_worker_0",
            "taskReferenceName": "simple_ref",
            "type": "SIMPLE"
          }
        ],
        "dynamicTasksInput": {
          "task1": {
            "input": "one"
          },
          "task2": {
            "uri": "https://www.taskflow.cn/api"
          },
          "task3": {
            "input": {
              "someKey": "someValue"
            }
          }
        }
      },
      "type": "FORK_JOIN_DYNAMIC",
      "dynamicForkTasksParam": "dynamicTasks",
      "dynamicForkTasksInputParamName": "dynamicTasksInput"
    },
    {
      "name": "join",
      "taskReferenceName": "join_ref",
      "inputParameters": {},
      "type": "JOIN",
      "joinOn": []
    }
  ],
  "inputParameters": [],
  "outputParameters": {},
  "schemaVersion": 2
}

运行相同的任务 — 简单任务

在此示例工作流中,动态分支任务并行运行名为  update_fruit_list_task  的工作者任务。任务输入从工作流输入中获取,其中包含新水果的数量。

// 工作流定义

{
  "name": "dynamic_workflow_array_simple",
  "description": "更新水果列表",
  "version": 1,
  "tasks": [
    {
      "name": "fork_join_dynamic",
      "taskReferenceName": "fork_join_dynamic_ref",
      "inputParameters": {
        "forkTaskName": "update_fruit_list_task",
        "forkTaskInputs": "${workflow.input.fruits}"
      },
      "type": "FORK_JOIN_DYNAMIC"
    },
    {
      "name": "join",
      "taskReferenceName": "join_ref",
      "inputParameters": {},
      "type": "JOIN",
      "joinOn": []
    }
  ],
  "inputParameters": ["fruits"],
  "outputParameters": {},
  "schemaVersion": 2
}

在这里,forkTaskInputs  是一个变量数组输入,它决定了分支的数量。在运行时,如果 "fruits" 的输入负载包含三个 JSON 对象,将创建三个分支:

// 工作流输入负载

{
  "fruits": [
    {
      "inventoryNo": 5,
      "fruit": "apple"
    },
    {
      "inventoryNo": 20,
      "fruit": "orange"
    },
    {
      "inventoryNo": 3,
      "fruit": "kiwi"
    }
  ]
}

在执行期间,Conductor 将在每个 JSON 对象中插入一个名为  index  的附加参数,用于表示其数组索引。

// 运行时动态分支任务的一个输入实例

{
  "fruit": "kiwi",
  "inventoryNo": 3,
  "__index": 2 // 源数组中元素的索引
}

如果在  forkTaskInputs  中使用简单值,例如  "fruits" = ["apple", "orange", "kiwi"],Conductor 将在名为  input  的参数中设置每个数组元素,如下所示:

// 运行时动态分支任务的一个输入实例

{
  "input": "apple", // 输入值
  "__index": 0 // 源数组中元素的索引
}

运行相同的任务 — HTTP 任务

在此示例中,动态分支并行运行 HTTP 任务。forkTaskInputs  中提供的输入包含 HTTP 任务中预期的典型负载。

// 工作流定义

{
  "name": "dynamic_workflow_array_http",
  "description": "动态工作流数组 - 运行 HTTP 任务",
  "tasks": [
    {
      "name": "dynamic_workflow_array_http",
      "taskReferenceName": "dynamic_workflow_array_http_ref",
      "inputParameters": {
        "forkTaskName": "HTTP",
        "forkTaskInputs": [
          {
            "uri": "https://www.taskflow.cn/api"
          },
          {
            "uri": "https://www.taskflow.cn/api",
            "method": "GET"
          }
        ]
      },
      "type": "FORK_JOIN_DYNAMIC"
    },
    {
      "name": "dynamic_workflow_array_http_join",
      "taskReferenceName": "dynamic_workflow_array_http_join_ref",
      "type": "JOIN"
    }
  ]
}

:::tip 如果 HTTP 调用是 GET,则参数  method  的默认值为 GET,无需指定。 :::

运行相同的任务 — 子工作流任务

在此示例中,动态分支并行运行子工作流任务。

    // 工作流定义

    {
      "name": "dynamic_workflow_array_sub_workflow",
      "description": "动态工作流数组 - 运行子工作流任务",
      "tasks": [
        {
          "name": "dynamic_workflow_array_sub_workflow",
          "taskReferenceName": "dynamic_workflow_array_sub_workflow_ref",
          "inputParameters": {
            "forkTaskWorkflow": "extract_user",
            "forkTaskWorkflowVersion": "1",
            "forkTaskInputs": [
              {
                "input": "value1"
              },
              {
                "input": "value2"
              }
            ]
          },
          "type": "FORK_JOIN_DYNAMIC"
        },
        {
          "name": "dynamic_workflow_array_sub_workflow_join",
          "taskReferenceName": "dynamic_workflow_array_sub_workflow_join_ref",
          "type": "JOIN"
        }
      ]
    }

飞流云