Skip to content

Fork Join 任务

Fork Join 是 Taskflow 中的一个重要控制流任务,用于实现并行执行和任务编排。它允许工作流将执行流程分成多个并行分支,然后在所有分支完成后重新汇合。

核心特性

  1. 并行执行

    • 多分支并行处理
    • 独立执行上下文
    • 资源并行利用
  2. 同步控制

    • 分支同步管理
    • 完成条件控制
    • 超时处理机制
  3. 数据流管理

    • 分支数据隔离
    • 结果聚合处理
    • 上下文数据共享

任务参数

参数描述必需/可选
forkTasks并行执行的任务分支列表。每个分支可以包含一个或多个任务。必需
joinOn指定需要等待完成的任务引用名列表。必需

配置示例

1. 基础并行处理

{
  "name": "parallel_processing",
  "taskReferenceName": "parallel_ref",
  "type": "FORK_JOIN",
  "forkTasks": [
    [
      {
        "name": "task1",
        "taskReferenceName": "task1_ref",
        "type": "SIMPLE"
      }
    ],
    [
      {
        "name": "task2",
        "taskReferenceName": "task2_ref",
        "type": "SIMPLE"
      }
    ]
  ],
  "joinOn": ["task1_ref", "task2_ref"]
}

2. 复杂并行流程

{
  "name": "complex_parallel",
  "taskReferenceName": "complex_parallel_ref",
  "type": "FORK_JOIN",
  "forkTasks": [
    [
      {
        "name": "data_processing",
        "taskReferenceName": "data_proc_ref",
        "type": "SIMPLE",
        "inputParameters": {
          "data": "${workflow.input.data1}"
        }
      },
      {
        "name": "data_validation",
        "taskReferenceName": "data_valid_ref",
        "type": "SIMPLE"
      }
    ],
    [
      {
        "name": "notification",
        "taskReferenceName": "notify_ref",
        "type": "SIMPLE",
        "inputParameters": {
          "recipients": "${workflow.input.notifyList}"
        }
      }
    ]
  ],
  "joinOn": ["data_valid_ref", "notify_ref"]
}

3. 动态分支

{
  "name": "dynamic_parallel",
  "taskReferenceName": "dynamic_parallel_ref",
  "type": "FORK_JOIN_DYNAMIC",
  "inputParameters": {
    "dynamicTasks": "${workflow.input.parallelTasks}",
    "dynamicTasksInput": "${workflow.input.tasksInput}"
  },
  "dynamicForkTasksParam": "dynamicTasks",
  "dynamicForkTasksInputParamName": "dynamicTasksInput"
}

使用场景

1. 数据处理管道

{
  "name": "data_pipeline",
  "taskReferenceName": "pipeline_ref",
  "type": "FORK_JOIN",
  "forkTasks": [
    [
      {
        "name": "extract_data",
        "taskReferenceName": "extract_ref",
        "type": "SIMPLE"
      }
    ],
    [
      {
        "name": "prepare_storage",
        "taskReferenceName": "storage_ref",
        "type": "SIMPLE"
      }
    ],
    [
      {
        "name": "update_metadata",
        "taskReferenceName": "metadata_ref",
        "type": "SIMPLE"
      }
    ]
  ],
  "joinOn": ["extract_ref", "storage_ref", "metadata_ref"]
}

2. 并行验证

{
  "name": "parallel_validation",
  "taskReferenceName": "validation_ref",
  "type": "FORK_JOIN",
  "forkTasks": [
    [
      {
        "name": "validate_format",
        "taskReferenceName": "format_ref",
        "type": "SIMPLE"
      }
    ],
    [
      {
        "name": "validate_content",
        "taskReferenceName": "content_ref",
        "type": "SIMPLE"
      }
    ],
    [
      {
        "name": "validate_security",
        "taskReferenceName": "security_ref",
        "type": "SIMPLE"
      }
    ]
  ],
  "joinOn": ["format_ref", "content_ref", "security_ref"]
}

最佳实践

  1. 分支设计

    • 合理划分任务
    • 控制并行度
    • 避免分支依赖
  2. 资源管理

    • 评估资源需求
    • 控制并发数量
    • 监控资源使用
  3. 错误处理

    • 实现分支重试
    • 处理部分失败
    • 提供回滚机制

性能优化

  1. 并行度控制

    • 根据资源调整
    • 避免过度并行
    • 监控系统负载
  2. 数据流优化

    • 减少数据传输
    • 优化数据结构
    • 使用数据缓存

调试技巧

  1. 分支监控

    {
      "inputParameters": {
     "debug": {
       "branchId": "${BRANCH_ID}",
       "startTime": "${system.currentTimeMillis}",
       "context": "${workflow.input}"
     }
      }
    }
  2. 性能追踪

    {
      "inputParameters": {
     "metrics": {
       "executionTime": 0,
       "startTime": "${system.currentTimeMillis}",
       "branchCount": "${workflow.input.branches?size}"
     }
      }
    }

错误处理

常见错误

  1. 分支失败

    • 单个任务失败
    • 资源不足
    • 超时问题
  2. 同步问题

    • 分支未完成
    • 数据不一致
    • 死锁情况

错误响应

{
  "status": "FAILED",
  "error": "FORK_JOIN_ERROR",
  "message": "Fork join execution failed",
  "details": {
    "failedTasks": ["task1_ref"],
    "successfulTasks": ["task2_ref"],
    "errorMessage": "Task execution timeout"
  }
}

监控建议

  1. 执行指标

    • 分支完成率
    • 执行时间分布
    • 资源使用情况
  2. 性能指标

    • 并行度统计
    • 响应时间
    • 吞吐量

安全考虑

  1. 资源隔离

    • 分支资源限制
    • 数据访问控制
    • 执行环境隔离
  2. 数据安全

    • 分支间数据隔离
    • 敏感信息保护
    • 访问权限控制

UI 配置指南

  1. 基本配置

    • 设置分支任务
    • 配置同步条件
    • 设置超时时间
  2. 高级选项

    • 重试策略
    • 资源限制
    • 监控设置
  3. 可视化视图

    • 分支执行状态
    • 性能指标图表
    • 资源使用监控

飞流云