Skip to content

Sub Workflow 任务

Sub Workflow 任务用于在当前工作流中嵌入和执行另一个工作流。与 Start Workflow 任务不同,Sub Workflow 任务会等待子工作流完成后才继续执行主工作流。

核心特性

  1. 同步执行

    • 等待子工作流完成
    • 状态同步
    • 结果传递
  2. 数据流管理

    • 参数传递
    • 结果获取
    • 上下文共享
  3. 错误处理

    • 子工作流失败处理
    • 超时管理
    • 重试机制

任务参数

参数描述必需/可选
subWorkflowParam子工作流参数对象必需
subWorkflowParam.name子工作流的名称必需
subWorkflowParam.version子工作流的版本可选,默认为最新版本
subWorkflowParam.taskToDomain任务到域的映射可选
waitForCompletion是否等待子工作流完成可选,默认为 true

配置示例

1. 基础子工作流

{
  "name": "sub_workflow",
  "taskReferenceName": "sub_workflow_ref",
  "type": "SUB_WORKFLOW",
  "inputParameters": {
    "subWorkflowParam": {
      "name": "process_data",
      "version": 1
    },
    "processData": "${workflow.input.data}"
  }
}

2. 带条件的子工作流

{
  "name": "conditional_sub_workflow",
  "taskReferenceName": "conditional_sub_workflow_ref",
  "type": "SUB_WORKFLOW",
  "inputParameters": {
    "subWorkflowParam": {
      "name": "${workflow.input.amount > 1000 ? 'premium_process' : 'standard_process'}",
      "version": 1
    },
    "input": {
      "amount": "${workflow.input.amount}",
      "customerId": "${workflow.input.customerId}"
    }
  }
}

3. 复杂子工作流

{
  "name": "complex_sub_workflow",
  "taskReferenceName": "complex_sub_workflow_ref",
  "type": "SUB_WORKFLOW",
  "inputParameters": {
    "subWorkflowParam": {
      "name": "data_processing",
      "version": 2,
      "taskToDomain": {
        "task1": "domain1",
        "task2": "domain2"
      }
    },
    "input": {
      "sourceData": "${workflow.input.data}",
      "config": {
        "processingType": "advanced",
        "validation": true,
        "enrichment": true
      }
    }
  }
}

使用场景

1. 数据处理流程

{
  "name": "data_processing_workflow",
  "taskReferenceName": "data_processing_ref",
  "type": "SUB_WORKFLOW",
  "inputParameters": {
    "subWorkflowParam": {
      "name": "data_transformation",
      "version": 1
    },
    "input": {
      "sourceData": "${workflow.input.data}",
      "transformationRules": [
        "validate",
        "transform",
        "enrich"
      ]
    }
  }
}

2. 审批流程

{
  "name": "approval_workflow",
  "taskReferenceName": "approval_workflow_ref",
  "type": "SUB_WORKFLOW",
  "inputParameters": {
    "subWorkflowParam": {
      "name": "multi_level_approval",
      "version": 1
    },
    "input": {
      "requestId": "${workflow.input.requestId}",
      "amount": "${workflow.input.amount}",
      "approvers": "${workflow.input.approverList}"
    }
  }
}

最佳实践

  1. 版本管理

    • 明确指定版本
    • 版本兼容性检查
    • 升级策略规划
  2. 参数传递

    • 验证必需参数
    • 类型检查
    • 默认值处理
  3. 错误处理

    • 失败重试策略
    • 超时处理
    • 回滚机制

性能优化

  1. 资源管理

    • 控制嵌套深度
    • 优化数据传输
    • 监控资源使用
  2. 执行优化

    • 减少等待时间
    • 并行处理
    • 缓存利用

调试技巧

  1. 工作流追踪

    {
      "inputParameters": {
     "debug": {
       "parentWorkflowId": "${workflow.workflowId}",
       "subWorkflowName": "${workflow.input.subWorkflowParam.name}",
       "startTime": "${system.currentTimeMillis}"
     }
      }
    }
  2. 状态监控

    {
      "inputParameters": {
     "monitor": {
       "subWorkflowId": "",
       "status": "RUNNING",
       "startTime": "${system.currentTimeMillis}",
       "timeout": 3600
     }
      }
    }

错误处理

常见错误

  1. 启动错误

    • 工作流不存在
    • 版本不兼容
    • 参数无效
  2. 执行错误

    • 子工作流失败
    • 超时
    • 资源不足

错误响应

{
  "status": "FAILED",
  "error": "SUB_WORKFLOW_ERROR",
  "message": "Sub workflow execution failed",
  "details": {
    "subWorkflowId": "abc123",
    "errorCode": "TASK_ERROR",
    "errorMessage": "Task execution failed in sub workflow"
  }
}

监控建议

  1. 执行指标

    • 完成时间
    • 成功率
    • 资源使用
  2. 性能指标

    • 响应时间
    • 等待时间
    • 吞吐量

安全考虑

  1. 访问控制

    • 权限验证
    • 资源限制
    • 数据隔离
  2. 数据安全

    • 敏感数据处理
    • 传输加密
    • 审计日志

UI 配置指南

  1. 基本配置

    • 选择子工作流
    • 设置版本
    • 配置参数
  2. 高级选项

    • 域配置
    • 超时设置
    • 重试策略
  3. 监控面板

    • 执行状态
    • 性能指标
    • 日志查看

任务配置

这是 Sub Workflow 任务的任务配置。

{
  "name": "sub_workflow",
  "taskReferenceName": "sub_workflow_ref",
  "inputParameters": {
    // 子工作流的输入参数},
    "type": "SUB_WORKFLOW",
    "subWorkflowParam": {
      "name": "subworkflowName",
      "version": 3,
      "priority": 5,
      "idempotencyKey": "someKey",
      "idempotencyStrategy": "RETURN_EXISTING",
      "taskToDomain": {
        "someTask": "someDomain"
      }
    }
  }
}

任务输出

Sub Workflow 任务将返回以下参数。

参数描述
subWorkflowId运行子工作流时生成的子工作流执行 ID。

除了执行 ID 外,子工作流的工作流输出也将作为 Sub Workflow 任务的输出提供。

在 UI 中添加 Sub Workflow 任务

添加 Sub Workflow 任务的步骤:

  1. 在工作流中,选择  (+)  图标并添加  Sub Workflow  任务。
  2. 输入工作流名称版本。 一旦选择,如果有任何预定义的输入参数,子工作流的输入参数将自动出现。
  3. (可选)输入幂等性键并选择幂等性策略
  4. (可选)为子工作流添加任何其他输入参数。
  5. (可选)为子工作流任务添加任务到域的映射。

要查看父工作流中的子工作流任务,您可以勾选展开以在可视化图表编辑器中显示它们。

示例

以下是使用 Sub Workflow 任务的一些示例。

在工作流中使用 Sub Workflow 任务

假设您有一个非常长的工作流 "payment_for_subscription",它处理订阅的付款,如下所示:

要将此 "payment_for_subscription" 工作流添加到更大的订阅工作流中,可以复制并粘贴工作流 JSON 定义。但是,每当 "payment_for_subscription" 工作流更新时,它不会反映在您添加它的工作流中。更好的处理方式是调用 "payment_for_subscription" 工作流作为更广泛的订阅工作流中的子工作流,这样对该工作流的任何更新都会反映在其所有父工作流中。

每当需要实现付款流程时,您可以在所需的工作流中添加此子工作流:

这是一个订阅工作流,其中包含多个需要实现付款流程的实例。在这里,之前创建的付款工作流被添加为子工作流。

常见问题

如何从特定任务重试子工作流?

您可以使用以下 API 从特定任务重试子工作流。从 Conductor UI 中,检查任务级别开始时间以验证前面的任务没有重新运行。此外,您可以在所需的重新运行起点之前插入一个 WAIT 任务,并确认它不会转换到该 WAIT 任务的状态。

     curl -X POST 'https://<conductor_server_dns>/api/workflow/<workflow_id>/rerun' \
       -H 'Content-Type: application/json' \
       -d '{
        "reRunFromTaskId": "<task_execution_id>"
      }' \
       -H 'x-authorization: <auth_token>'

飞流云