动态分支
动态分支任务用于并行运行任务,分支数量在运行时确定。
动态分支任务用于并行运行任务,分支行为(如分支数量)在运行时确定。这与分支/合并任务不同,后者的分支行为是在工作流创建时定义的。与分支/合并任务一样,动态分支任务后面跟着一个合并操作,该操作等待分支任务完成后再移动到下一个任务。此合并任务收集每个分支任务的输出。
与分支/合并任务不同,动态分支任务每个分支只能运行一个任务。如果每个分支需要多个任务,可以使用子工作流。
动态分支任务有两种运行方式:
- 每个分支运行不同的任务—使用
dynamicForkTasksParam
和dynamicForkTasksInputParamName
。 - 所有分支运行相同的任务—对于任何任务类型,使用
forkTaskName
和forkTaskInputs
;对于子工作流任务,使用forkTaskWorkflow
和forkTaskInputs
。
任务参数
为动态分支任务配置以下参数。分支任务的输入负载应与其预期输入相对应。例如,如果分支任务是 HTTP 任务,其输入应包括方法和 URI。
对于分支任务:
运行不同的任务
动态分支执行 dynamicForkTasksParam
指定的数组中的每个任务,并使用 dynamicForkTasksInputParamName
指定的相应输入。在运行时创建的分支数量取决于指定的任务数组。
参数 | 描述 | 必需/可选 |
---|---|---|
dynamicForkTasksParam | 其值将用于在不同分支上调度每个任务的输入参数键。例如,"dynamicTasks",然后将其指定为动态分支任务中的输入参数。 | 必需 |
inputParameters.dynamicTasks | 将并行运行的任务数组。每个数组元素都是对应于单独分支分支的任务定义。它可以作为变量传递。 | 必需 |
dynamicForkTasksInputParamName | 其值将用于传递每个分支任务所需输入参数的输入参数键。例如,"dynamicTasksInput",然后将其指定为动态分支任务中的输入参数。 | 必需 |
inputParameters.dynamicTasksInput | 一个映射,其中键是每个分支的任务引用名称,值是将传递到其匹配任务的输入参数。它可以作为变量传递。 | 必需 |
运行相同的任务(任何任务类型)
动态分支为 forkTaskInputs
的每个元素执行 forkTaskName
指定的任务。在运行时创建的分支数量取决于发送的 forkTaskInputs
数量。配置这些参数以执行除子工作流任务之外的任何任务类型。
参数 | 描述 | 必需/可选 |
---|---|---|
inputParameters.forkTaskName | 将在每个分支中执行的任务名称。它可以作为变量传递。 | 必需 |
inputParameters.forkTaskInputs | 每个分支分支的 JSON 输入数组。数组元素的数量决定了动态分支中的分支数量。它可以作为变量传递。 | 必需 |
运行相同的任务(子工作流)
动态分支为 forkTaskInputs
的每个元素执行 forkTaskWorkflow
和 forkTaskWorkflowVersion
指定的工作流。在运行时创建的分支数量取决于发送的 forkTaskInputs
数量。配置这些参数以执行子工作流任务。
参数 | 描述 | 必需/可选 |
---|---|---|
inputParameters.forkTaskWorkflow | 将在每个分支中执行的工作流名称。它可以作为变量传递。 | 必需 |
inputParameters.forkTaskWorkflowVersion | 要执行的工作流版本。如果未指定,将使用最新版本。 | 必需 |
inputParameters.forkTaskInputs | 每个分支分支的 JSON 输入数组。数组元素的数量决定了动态分支中的分支数量。它可以作为变量传递。 | 必需 |
合并任务将在分支任务之后运行。配置合并任务以完成分支/合并操作。
对于合并任务:
参数 | 描述 | 必需/可选 |
---|---|---|
joinOn | 合并任务在继续下一个任务之前等待完成的任务引用名称列表。 | 必需 |
expression | 如果指定,控制合并任务如何完成的合并脚本。 | 可选 |
任务配置
这是动态分支任务的任务配置。
运行不同的任务
// 动态分支任务的 JSON 架构
{
"name": "fork_join_dynamic",
"taskReferenceName": "fork_join_dynamic_ref",
"inputParameters": {
"dynamicTasks": [ // 要执行的任务名称
{
"name": "http",
"taskReferenceName": "http_ref",
"type": "HTTP",
"inputParameters": {
"uri": "https://www.taskflow.cn/api"
}
},
{ // 另一个任务定义 }
],
"dynamicTasksInput": { // 任务的输入
"taskReferenceName" : {
"key": "value",
"key": "value"
},
"anotherTaskReferenceName" : {
"key": "value",
"key": "value"
}
}
},
"type": "FORK_JOIN_DYNAMIC",
"dynamicForkTasksParam": "dynamicTasks", // 将保存要执行的任务名称的输入参数键
"dynamicForkTasksInputParamName": "dynamicTasksInput" // 将保存每个任务的输入参数的输入参数键
}
// 合并任务的 JSON 架构
{
"name": "join",
"taskReferenceName": "join_ref",
"inputParameters": {},
"type": "JOIN",
"joinOn": []
}
运行相同的任务(任何任务类型)
// 动态分支任务的 JSON 架构
{
"name": "fork_join_dynamic",
"taskReferenceName": "fork_join_dynamic_ref",
"inputParameters": {
"forkTaskName": "",
"forkTaskInputs": []
},
"type": "FORK_JOIN_DYNAMIC"
}
// 合并任务的 JSON 架构
{
"name": "join",
"taskReferenceName": "join_ref",
"inputParameters": {},
"type": "JOIN",
"joinOn": []
}
运行相同的任务(子工作流)
// 动态分支任务的 JSON 架构
{
"name": "fork_join_dynamic",
"taskReferenceName": "fork_join_dynamic_ref",
"inputParameters": {
"forkTaskWorkflow": "",
"forkTaskWorkflowVersion": "",
"forkTaskInputs": []
},
"type": "FORK_JOIN_DYNAMIC"
}
// 合并任务的 JSON 架构
{
"name": "join",
"taskReferenceName": "join_ref",
"inputParameters": {},
"type": "JOIN",
"joinOn": []
}
即使任务定义中存在 dynamicForkTasksParam 和 dynamicForkTasksInputParamName,forkTaskName 和 forkTaskInputs 也会优先。
在 UI 中添加动态分支任务
添加动态分支任务的步骤:
运行不同的任务
- 在工作流中,选择 (+) 图标并添加 动态分支 任务。
- 在 输入参数 中,将 dynamicTasks 和 dynamicTasksInput 的参数类型设置为 对象/数组。
- 将 dynamicTask 参数配置为任务定义数组。
- 将 dynamicTasksInput 参数配置为每个任务的输入参数映射。
- 选择合并任务并配置其设置以完成分支/合并操作。
运行相同的任务
在工作流中,选择 (+) 图标并添加 动态分支 任务。
在 输入参数 中,删除所有当前参数并添加以下参数及其值:
- 对于子工作流任务,添加
forkTaskWorkflow
、forkTaskWorkflowVersion
和forkTaskInputs
。 - 对于所有其他任务类型,添加
forkTaskName
和forkTaskInputs
。
- 对于子工作流任务,添加
选择合并任务并配置其设置以完成分支/合并操作。
示例
以下是使用动态分支任务的一些示例。
运行不同的任务
要在动态分支中每个分支运行不同的任务,必须使用 dynamicForkTasksParam
和 dynamicForkTasksInputParamName
。以下是运行不同任务的动态分支任务的工作流示例。
// 工作流定义
{
"name": "DynamicForkExample",
"description": "此工作流在动态分支中运行不同的任务。",
"version": 1,
"tasks": [
{
"name": "fork_join_dynamic",
"taskReferenceName": "fork_join_dynamic_ref",
"inputParameters": {
"dynamicTasks": [
{
"name": "inline",
"taskReferenceName": "task1",
"type": "INLINE",
"inputParameters": {
"expression": "(function () {\n return $.input;\n})();",
"evaluatorType": "graaljs"
}
},
{
"name": "http",
"taskReferenceName": "task2",
"type": "HTTP",
"inputParameters": {
"method": "GET",
"connectionTimeOut": 3000,
"readTimeOut": "3000",
"accept": "application/json",
"contentType": "application/json",
"encode": true,
"uri": "https://www.taskflow.cn/api"
}
},
{
"name": "x_test_worker_0",
"taskReferenceName": "simple_ref",
"type": "SIMPLE"
}
],
"dynamicTasksInput": {
"task1": {
"input": "one"
},
"task2": {
"uri": "https://www.taskflow.cn/api"
},
"task3": {
"input": {
"someKey": "someValue"
}
}
}
},
"type": "FORK_JOIN_DYNAMIC",
"dynamicForkTasksParam": "dynamicTasks",
"dynamicForkTasksInputParamName": "dynamicTasksInput"
},
{
"name": "join",
"taskReferenceName": "join_ref",
"inputParameters": {},
"type": "JOIN",
"joinOn": []
}
],
"inputParameters": [],
"outputParameters": {},
"schemaVersion": 2
}
运行相同的任务 — 简单任务
在此示例工作流中,动态分支任务并行运行名为 update_fruit_list_task
的工作者任务。任务输入从工作流输入中获取,其中包含新水果的数量。
// 工作流定义
{
"name": "dynamic_workflow_array_simple",
"description": "更新水果列表",
"version": 1,
"tasks": [
{
"name": "fork_join_dynamic",
"taskReferenceName": "fork_join_dynamic_ref",
"inputParameters": {
"forkTaskName": "update_fruit_list_task",
"forkTaskInputs": "${workflow.input.fruits}"
},
"type": "FORK_JOIN_DYNAMIC"
},
{
"name": "join",
"taskReferenceName": "join_ref",
"inputParameters": {},
"type": "JOIN",
"joinOn": []
}
],
"inputParameters": ["fruits"],
"outputParameters": {},
"schemaVersion": 2
}
在这里,forkTaskInputs
是一个变量数组输入,它决定了分支的数量。在运行时,如果 "fruits" 的输入负载包含三个 JSON 对象,将创建三个分支:
// 工作流输入负载
{
"fruits": [
{
"inventoryNo": 5,
"fruit": "apple"
},
{
"inventoryNo": 20,
"fruit": "orange"
},
{
"inventoryNo": 3,
"fruit": "kiwi"
}
]
}
在执行期间,Conductor 将在每个 JSON 对象中插入一个名为 index
的附加参数,用于表示其数组索引。
// 运行时动态分支任务的一个输入实例
{
"fruit": "kiwi",
"inventoryNo": 3,
"__index": 2 // 源数组中元素的索引
}
如果在 forkTaskInputs
中使用简单值,例如 "fruits" = ["apple", "orange", "kiwi"]
,Conductor 将在名为 input
的参数中设置每个数组元素,如下所示:
// 运行时动态分支任务的一个输入实例
{
"input": "apple", // 输入值
"__index": 0 // 源数组中元素的索引
}
运行相同的任务 — HTTP 任务
在此示例中,动态分支并行运行 HTTP 任务。forkTaskInputs
中提供的输入包含 HTTP 任务中预期的典型负载。
// 工作流定义
{
"name": "dynamic_workflow_array_http",
"description": "动态工作流数组 - 运行 HTTP 任务",
"tasks": [
{
"name": "dynamic_workflow_array_http",
"taskReferenceName": "dynamic_workflow_array_http_ref",
"inputParameters": {
"forkTaskName": "HTTP",
"forkTaskInputs": [
{
"uri": "https://www.taskflow.cn/api"
},
{
"uri": "https://www.taskflow.cn/api",
"method": "GET"
}
]
},
"type": "FORK_JOIN_DYNAMIC"
},
{
"name": "dynamic_workflow_array_http_join",
"taskReferenceName": "dynamic_workflow_array_http_join_ref",
"type": "JOIN"
}
]
}
:::tip 如果 HTTP 调用是 GET,则参数 method
的默认值为 GET,无需指定。 :::
运行相同的任务 — 子工作流任务
在此示例中,动态分支并行运行子工作流任务。
// 工作流定义
{
"name": "dynamic_workflow_array_sub_workflow",
"description": "动态工作流数组 - 运行子工作流任务",
"tasks": [
{
"name": "dynamic_workflow_array_sub_workflow",
"taskReferenceName": "dynamic_workflow_array_sub_workflow_ref",
"inputParameters": {
"forkTaskWorkflow": "extract_user",
"forkTaskWorkflowVersion": "1",
"forkTaskInputs": [
{
"input": "value1"
},
{
"input": "value2"
}
]
},
"type": "FORK_JOIN_DYNAMIC"
},
{
"name": "dynamic_workflow_array_sub_workflow_join",
"taskReferenceName": "dynamic_workflow_array_sub_workflow_join_ref",
"type": "JOIN"
}
]
}