Skip to content

Join 任务

Join 任务是 Taskflow 中的一个同步控制任务,用于汇合多个并行执行的分支。它通常与 Fork 任务配合使用,等待指定的任务完成后再继续执行工作流。

核心特性

  1. 分支同步

    • 多分支汇合
    • 条件等待机制
    • 完成状态同步
  2. 数据聚合

    • 结果合并处理
    • 输出数据整合
    • 状态信息汇总
  3. 控制流管理

    • 执行流程控制
    • 超时处理
    • 错误处理策略

任务参数

参数描述必需/可选
joinOn需要等待完成的任务引用名称列表必需
aggregateOutput是否聚合所有分支的输出数据可选,默认 true
timeout等待分支完成的超时时间(秒)可选

配置示例

1. 基础 Join 配置

{
  "name": "basic_join",
  "taskReferenceName": "basic_join_ref",
  "type": "JOIN",
  "joinOn": ["task1_ref", "task2_ref"],
  "inputParameters": {
    "timeout": 3600
  }
}

2. 带数据聚合的 Join

{
  "name": "aggregate_join",
  "taskReferenceName": "aggregate_join_ref",
  "type": "JOIN",
  "joinOn": ["process_ref", "validate_ref", "analyze_ref"],
  "inputParameters": {
    "aggregateOutput": true,
    "timeout": 7200
  }
}

3. 条件 Join

{
  "name": "conditional_join",
  "taskReferenceName": "conditional_join_ref",
  "type": "JOIN",
  "joinOn": ["${workflow.input.requiredTasks}"],
  "inputParameters": {
    "waitPolicy": "ANY",
    "minimumComplete": 2
  }
}

使用场景

1. 数据处理流程同步

{
  "name": "data_processing_join",
  "taskReferenceName": "data_proc_join_ref",
  "type": "JOIN",
  "joinOn": ["extract_ref", "transform_ref", "validate_ref"],
  "inputParameters": {
    "aggregateOutput": true,
    "requireAllComplete": true
  }
}

2. 并行验证汇总

{
  "name": "validation_join",
  "taskReferenceName": "validation_join_ref",
  "type": "JOIN",
  "joinOn": ["format_check_ref", "content_check_ref", "security_check_ref"],
  "inputParameters": {
    "aggregateResults": true,
    "failOnAnyError": true
  }
}

最佳实践

  1. 超时设置

    • 合理设置等待时间
    • 考虑任务复杂度
    • 预留缓冲时间
  2. 错误处理

    • 定义失败策略
    • 设置重试机制
    • 提供回滚方案
  3. 数据管理

    • 控制数据量大小
    • 优化聚合逻辑
    • 清理临时数据

性能优化

  1. 等待策略优化

    • 最小化等待时间
    • 避免不必要等待
    • 优化完成条件
  2. 资源管理

    • 控制内存使用
    • 优化数据结构
    • 及时释放资源

调试技巧

  1. 状态监控

    {
      "inputParameters": {
     "debug": {
       "joinTaskId": "${TASK_ID}",
       "startTime": "${system.currentTimeMillis}",
       "pendingTasks": "${workflow.input.joinOn}"
     }
      }
    }
  2. 完成追踪

    {
      "inputParameters": {
     "metrics": {
       "completedTasks": [],
       "pendingTasks": "${workflow.input.joinOn}",
       "elapsedTime": 0
     }
      }
    }

错误处理

常见错误类型

  1. 超时错误

    • 等待超时
    • 分支执行超时
    • 系统响应超时
  2. 数据错误

    • 数据不一致
    • 格式错误
    • 聚合失败

错误响应示例

{
  "status": "FAILED",
  "error": "JOIN_TASK_ERROR",
  "message": "Join task execution failed",
  "details": {
    "failedBranches": ["task1_ref"],
    "timeoutTasks": ["task2_ref"],
    "errorMessage": "Task execution timeout"
  }
}

监控指标

  1. 性能指标

    • 等待时间
    • 完成率
    • 资源使用
  2. 状态指标

    • 分支完成数
    • 错误率
    • 超时次数

安全考虑

  1. 数据安全

    • 敏感信息处理
    • 数据访问控制
    • 日志脱敏
  2. 资源保护

    • 限制等待时间
    • 控制资源使用
    • 防止死锁

UI 配置指南

  1. 基本配置

    • 选择等待任务
    • 设置超时时间
    • 配置聚合选项
  2. 高级设置

    • 错误处理策略
    • 资源限制
    • 监控选项
  3. 监控视图

    • 任务状态展示
    • 完成进度跟踪
    • 性能指标查看

飞流云