Skip to content

Wait For Webhook 任务

Wait For Webhook 是 Taskflow 中的一个重要系统任务,用于实现工作流与外部系统的异步集成。它允许工作流暂停执行,等待外部系统通过 webhook 回调来继续流程。

核心特性

  1. 异步集成

    • 支持长时间运行的操作
    • 无阻塞等待机制
    • 灵活的超时控制
  2. 回调处理

    • HTTP Webhook 支持
    • 自动状态管理
    • 数据传递机制
  3. 安全机制

    • 回调认证
    • 请求验证
    • 数据加密

任务参数

参数描述必需/可选
workflowId工作流实例 ID,用于生成唯一的回调 URL。默认为 ${workflow.workflowId}必需
taskId任务实例 ID,用于生成唯一的回调 URL。默认为 ${TASK_ID}必需
timeoutSeconds等待回调的超时时间(秒)。默认为 0(永不超时)。可选
headers期望在回调请求中包含的 HTTP 头。用于请求验证。可选

回调 URL 格式

http(s)://{taskflow-server}/api/tasks/webhook/{workflowId}/{taskRefName}/{taskId}

配置示例

1. 基础配置

{
  "name": "wait_for_callback",
  "taskReferenceName": "wait_callback_ref",
  "type": "WAIT_FOR_WEBHOOK",
  "inputParameters": {
    "timeoutSeconds": 3600,
    "headers": {
      "x-api-key": "${workflow.input.apiKey}"
    }
  }
}

2. 带验证的配置

{
  "name": "secure_webhook",
  "taskReferenceName": "secure_webhook_ref",
  "type": "WAIT_FOR_WEBHOOK",
  "inputParameters": {
    "timeoutSeconds": 7200,
    "headers": {
      "x-api-key": "${workflow.input.apiKey}",
      "x-signature": "${workflow.input.signature}",
      "x-timestamp": "${workflow.input.timestamp}"
    }
  }
}

3. 自定义回调处理

{
  "name": "custom_webhook",
  "taskReferenceName": "custom_webhook_ref",
  "type": "WAIT_FOR_WEBHOOK",
  "inputParameters": {
    "timeoutSeconds": 1800,
    "headers": {
      "content-type": "application/json",
      "x-callback-type": "CUSTOM",
      "x-correlation-id": "${workflow.correlationId}"
    }
  }
}

回调请求格式

成功回调

POST /api/tasks/webhook/{workflowId}/{taskRefName}/{taskId}
Content-Type: application/json

{
  "status": "COMPLETED",
  "output": {
    "result": "success",
    "data": {
      "key": "value"
    }
  }
}

失败回调

POST /api/tasks/webhook/{workflowId}/{taskRefName}/{taskId}
Content-Type: application/json

{
  "status": "FAILED",
  "output": {
    "error": "处理失败",
    "reason": "无效的输入数据"
  }
}

最佳实践

  1. 超时设置

    • 设置合理的超时时间
    • 实现超时处理逻辑
    • 考虑重试机制
  2. 安全性

    • 使用 HTTPS 端点
    • 实现请求签名
    • 验证回调来源
  3. 错误处理

    • 处理超时情况
    • 实现失败重试
    • 记录详细日志

使用场景

1. 外部审批流程

{
  "name": "approval_process",
  "taskReferenceName": "wait_approval",
  "type": "WAIT_FOR_WEBHOOK",
  "inputParameters": {
    "timeoutSeconds": 86400,
    "headers": {
      "x-approval-type": "manager_review",
      "x-request-id": "${workflow.workflowId}"
    }
  }
}

2. 异步处理结果

{
  "name": "async_process",
  "taskReferenceName": "wait_process",
  "type": "WAIT_FOR_WEBHOOK",
  "inputParameters": {
    "timeoutSeconds": 3600,
    "headers": {
      "x-process-id": "${workflow.input.processId}",
      "x-callback-token": "${workflow.input.callbackToken}"
    }
  }
}

3. 第三方集成

{
  "name": "third_party_integration",
  "taskReferenceName": "wait_integration",
  "type": "WAIT_FOR_WEBHOOK",
  "inputParameters": {
    "timeoutSeconds": 7200,
    "headers": {
      "x-integration-id": "${workflow.input.integrationId}",
      "x-api-version": "v1"
    }
  }
}

错误处理

常见错误

  1. 超时错误

    • 回调未在指定时间内到达
    • 外部系统处理时间过长
    • 网络延迟问题
  2. 验证错误

    • 无效的认证信息
    • 签名验证失败
    • 请求头缺失
  3. 数据错误

    • 无效的回调数据格式
    • 缺少必需字段
    • 数据类型不匹配

错误响应

{
  "status": "FAILED",
  "error": "WEBHOOK_TIMEOUT",
  "message": "Webhook callback not received within timeout period",
  "details": {
    "timeoutSeconds": 3600,
    "startTime": "2024-01-01T00:00:00Z",
    "endTime": "2024-01-01T01:00:00Z"
  }
}

监控和调试

  1. 日志记录

    • 记录回调请求
    • 追踪处理流程
    • 记录错误信息
  2. 指标收集

    • 回调响应时间
    • 成功/失败率
    • 超时统计
  3. 告警设置

    • 超时告警
    • 错误率告警
    • 系统状态监控

UI 配置指南

  1. 基本配置

    • 设置任务名称和引用名
    • 配置超时时间
    • 添加请求头
  2. 回调设置

    • 查看回调 URL
    • 配置验证规则
    • 设置重试策略
  3. 监控视图

    • 查看任务状态
    • 监控回调情况
    • 查看错误日志

安全建议

  1. 认证机制

    • 使用强认证方式
    • 实现请求签名
    • 验证调用方身份
  2. 数据保护

    • 使用 HTTPS
    • 加密敏感数据
    • 实施访问控制
  3. 审计日志

    • 记录所有操作
    • 保存详细信息
    • 支持追踪分析

飞流云