Join 任务
Join 任务是 Taskflow 中的一个同步控制任务,用于汇合多个并行执行的分支。它通常与 Fork 任务配合使用,等待指定的任务完成后再继续执行工作流。
核心特性
分支同步
- 多分支汇合
- 条件等待机制
- 完成状态同步
数据聚合
- 结果合并处理
- 输出数据整合
- 状态信息汇总
控制流管理
- 执行流程控制
- 超时处理
- 错误处理策略
任务参数
参数 | 描述 | 必需/可选 |
---|---|---|
joinOn | 需要等待完成的任务引用名称列表 | 必需 |
aggregateOutput | 是否聚合所有分支的输出数据 | 可选,默认 true |
timeout | 等待分支完成的超时时间(秒) | 可选 |
配置示例
1. 基础 Join 配置
{
"name": "basic_join",
"taskReferenceName": "basic_join_ref",
"type": "JOIN",
"joinOn": ["task1_ref", "task2_ref"],
"inputParameters": {
"timeout": 3600
}
}
2. 带数据聚合的 Join
{
"name": "aggregate_join",
"taskReferenceName": "aggregate_join_ref",
"type": "JOIN",
"joinOn": ["process_ref", "validate_ref", "analyze_ref"],
"inputParameters": {
"aggregateOutput": true,
"timeout": 7200
}
}
3. 条件 Join
{
"name": "conditional_join",
"taskReferenceName": "conditional_join_ref",
"type": "JOIN",
"joinOn": ["${workflow.input.requiredTasks}"],
"inputParameters": {
"waitPolicy": "ANY",
"minimumComplete": 2
}
}
使用场景
1. 数据处理流程同步
{
"name": "data_processing_join",
"taskReferenceName": "data_proc_join_ref",
"type": "JOIN",
"joinOn": ["extract_ref", "transform_ref", "validate_ref"],
"inputParameters": {
"aggregateOutput": true,
"requireAllComplete": true
}
}
2. 并行验证汇总
{
"name": "validation_join",
"taskReferenceName": "validation_join_ref",
"type": "JOIN",
"joinOn": ["format_check_ref", "content_check_ref", "security_check_ref"],
"inputParameters": {
"aggregateResults": true,
"failOnAnyError": true
}
}
最佳实践
超时设置
- 合理设置等待时间
- 考虑任务复杂度
- 预留缓冲时间
错误处理
- 定义失败策略
- 设置重试机制
- 提供回滚方案
数据管理
- 控制数据量大小
- 优化聚合逻辑
- 清理临时数据
性能优化
等待策略优化
- 最小化等待时间
- 避免不必要等待
- 优化完成条件
资源管理
- 控制内存使用
- 优化数据结构
- 及时释放资源
调试技巧
状态监控
{ "inputParameters": { "debug": { "joinTaskId": "${TASK_ID}", "startTime": "${system.currentTimeMillis}", "pendingTasks": "${workflow.input.joinOn}" } } }
完成追踪
{ "inputParameters": { "metrics": { "completedTasks": [], "pendingTasks": "${workflow.input.joinOn}", "elapsedTime": 0 } } }
错误处理
常见错误类型
超时错误
- 等待超时
- 分支执行超时
- 系统响应超时
数据错误
- 数据不一致
- 格式错误
- 聚合失败
错误响应示例
{
"status": "FAILED",
"error": "JOIN_TASK_ERROR",
"message": "Join task execution failed",
"details": {
"failedBranches": ["task1_ref"],
"timeoutTasks": ["task2_ref"],
"errorMessage": "Task execution timeout"
}
}
监控指标
性能指标
- 等待时间
- 完成率
- 资源使用
状态指标
- 分支完成数
- 错误率
- 超时次数
安全考虑
数据安全
- 敏感信息处理
- 数据访问控制
- 日志脱敏
资源保护
- 限制等待时间
- 控制资源使用
- 防止死锁
UI 配置指南
基本配置
- 选择等待任务
- 设置超时时间
- 配置聚合选项
高级设置
- 错误处理策略
- 资源限制
- 监控选项
监控视图
- 任务状态展示
- 完成进度跟踪
- 性能指标查看