工作流选项

通过 WorkflowOptions 配置工作流的执行行为、超时和重试策略。

核心概念

WorkflowOptions 允许您配置工作流的执行参数,包括任务队列、工作流 ID、超时设置等。 这些选项在启动工作流时指定,影响整个工作流的生命周期。

基本配置

创建 WorkflowOptions 对象并配置基本参数。

基本配置示例Java
import io.temporal.client.WorkflowOptions;
import java.time.Duration;

WorkflowOptions options = WorkflowOptions.newBuilder()
    // 指定任务队列(必需)
    .setTaskQueue("my-task-queue")
    
    // 设置工作流 ID(可选,默认自动生成)
    .setWorkflowId("workflow-123")
    
    // 执行超时(从开始到完成的总时间)
    .setExecutionTimeout(Duration.ofMinutes(10))
    
    // 工作流运行超时(单个执行的超时)
    .setWorkflowRunTimeout(Duration.ofMinutes(5))
    
    // 工作流任务超时(单个工作流任务的超时)
    .setWorkflowTaskTimeout(Duration.ofSeconds(10))
    
    .build();

任务队列

任务队列是连接客户端和 Worker 的通道。

配置任务队列Java
// 指定任务队列
WorkflowOptions options = WorkflowOptions.newBuilder()
    .setTaskQueue("payment-task-queue")
    .build();

// Worker 需要监听相同的任务队列
Worker worker = factory.newWorker("payment-task-queue");
任务队列的作用
  • • 连接客户端和 Worker
  • • 实现任务分发
  • • 支持多版本 Worker
  • • 实现任务优先级
最佳实践
  • • 按业务域区分任务队列
  • • 使用语义化的命名
  • • 支持 Worker 版本升级
  • • 实现灰度发布

工作流 ID

工作流 ID 是工作流的唯一标识符。

工作流 ID 配置Java
// 设置工作流 ID
WorkflowOptions options = WorkflowOptions.newBuilder()
    .setTaskQueue("order-task-queue")
    // 使用业务 ID 作为工作流 ID
    .setWorkflowId("order-" + orderId)
    .build();

// 使用业务 ID 的好处:
// 1. 避免重复执行相同业务逻辑
// 2. 便于查询和追踪
// 3. 支持幂等性

// 不指定工作流 ID 时,Temporal 自动生成
WorkflowOptions autoId = WorkflowOptions.newBuilder()
    .setTaskQueue("order-task-queue")
    // workflowId 未设置,自动生成
    .build();

超时配置

配置不同级别的超时,确保工作流不会无限期运行。

ExecutionTimeout

工作流从开始到完成的总超时时间,包括所有重试和重试

WorkflowRunTimeout

单个工作流运行的超时时间,不包括重试

WorkflowTaskTimeout

单个工作流任务的超时时间,Worker 处理任务的时间限制

超时配置示例Java
import java.time.Duration;

WorkflowOptions options = WorkflowOptions.newBuilder()
    .setTaskQueue("payment-task-queue")
    .setExecutionTimeout(Duration.ofHours(24))      // 总超时:24小时
    .setWorkflowRunTimeout(Duration.ofMinutes(10))  // 运行超时:10分钟
    .setWorkflowTaskTimeout(Duration.ofSeconds(10)) // 任务超时:10秒
    .build();

// 超时关系:
// ExecutionTimeout >= WorkflowRunTimeout
// WorkflowRunTimeout >= WorkflowTaskTimeout
// WorkflowTaskTimeout >= 活动超时

重试策略

配置工作流失败时的重试策略。

重试策略配置Java
import io.temporal.common.RetryOptions;
import java.time.Duration;

RetryOptions retryOptions = RetryOptions.newBuilder()
    // 初始重试间隔
    .setInitialInterval(Duration.ofSeconds(1))
    
    // 回退系数(每次重试间隔乘以该系数)
    .setBackoffCoefficient(2.0)
    
    // 最大重试间隔
    .setMaximumInterval(Duration.ofSeconds(10))
    
    // 最大重试次数
    .setMaximumAttempts(5)
    
    // 不重试的异常类型
    .setNonRetryableExceptions(
        IllegalArgumentException.class
    )
    
    .build();

WorkflowOptions options = WorkflowOptions.newBuilder()
    .setTaskQueue("payment-task-queue")
    .setRetryOptions(retryOptions)
    .build();

Memo 和 Search Attributes

在启动工作流时设置元数据和可搜索属性。

Memo 配置Java
import java.util.HashMap;
import java.util.Map;

// Memo:保存工作流的元数据(不可搜索)
Map<String, Object> memo = new HashMap<>();
memo.put("customerId", "cust-123");
memo.put("orderValue", 99.99);

WorkflowOptions options = WorkflowOptions.newBuilder()
    .setTaskQueue("order-task-queue")
    .setMemo(memo)
    .build();
Search Attributes 配置Java
import io.temporal.common.SearchAttributes;
import java.util.HashMap;
import java.util.Map;

// Search Attributes:可搜索的属性
Map<String, Object> searchAttrs = new HashMap<>();
searchAttrs.put("CustomKeywordField", "premium");
searchAttrs.put("CustomIntField", 100);

WorkflowOptions options = WorkflowOptions.newBuilder()
    .setTaskQueue("order-task-queue")
    .setSearchAttributes(searchAttrs)
    .build();

// 之后可以使用 Search Attributes 查询工作流
List<WorkflowExecution> executions = client.listExecutions()
    .setQuery("CustomKeywordField = 'premium'")
    .getExecutions();

Cron 工作流

配置定期执行的工作流。

Cron 配置Java
// Cron 表达式:每 5 分钟执行一次
String cronExpression = "*/5 * * * *";

WorkflowOptions options = WorkflowOptions.newBuilder()
    .setTaskQueue("report-task-queue")
    .setCronSchedule(cronExpression)
    .build();

// Cron 选项说明:
// * * * * *
// | | | | |
// | | | | +-- 星期几 (0-6, 0=周日)
// | | | +---- 月份 (1-12)
// | | +------ 日期 (1-31)
// | +-------- 小时 (0-23)
// +---------- 分钟 (0-59)

// 其他示例:
// "0 0 * * *"      // 每天午夜
// "0 9 * * 1-5"    // 工作日上午 9 点
// "0 0 1 * *"      // 每月第一天
// "*/30 * * * *"   // 每 30 分钟

完整示例

完整配置示例Java
import io.temporal.client.WorkflowOptions;
import io.temporal.common.RetryOptions;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// 配置重试策略
RetryOptions retryOptions = RetryOptions.newBuilder()
    .setInitialInterval(Duration.ofSeconds(1))
    .setBackoffCoefficient(2.0)
    .setMaximumInterval(Duration.ofSeconds(10))
    .setMaximumAttempts(3)
    .build();

// 配置 Memo
Map<String, Object> memo = new HashMap<>();
memo.put("customerId", customerId);
memo.put("customerType", "premium");

// 配置 Search Attributes
Map<String, Object> searchAttrs = new HashMap<>();
searchAttrs.put("CustomKeywordField", "payment");

// 创建工作流选项
WorkflowOptions options = WorkflowOptions.newBuilder()
    // 必需配置
    .setTaskQueue("payment-task-queue")
    .setWorkflowId("payment-" + transactionId)
    
    // 超时配置
    .setExecutionTimeout(Duration.ofHours(1))
    .setWorkflowRunTimeout(Duration.ofMinutes(10))
    .setWorkflowTaskTimeout(Duration.ofSeconds(10))
    
    // 重试策略
    .setRetryOptions(retryOptions)
    
    // Memo 和 Search Attributes
    .setMemo(memo)
    .setSearchAttributes(searchAttrs)
    
    .build();

// 启动工作流
PaymentWorkflow workflow = client.newWorkflowStub(
    PaymentWorkflow.class,
    options
);
PaymentResult result = workflow.processPayment(request);

最佳实践

合理设置超时

根据实际业务需求设置超时,避免过长导致资源浪费,过短导致误判失败

使用业务 ID

使用业务相关的 ID 作为工作流 ID,便于追踪和避免重复执行

区分可重试和不可重试错误

通过 NonRetryableExceptions 避免重试业务逻辑错误

按域划分任务队列

不同的业务域使用不同的任务队列,便于隔离和管理

下一步