工作流选项
通过 WorkflowOptions 配置工作流的执行行为、超时和重试策略。
核心概念
WorkflowOptions 允许您配置工作流的执行参数,包括任务队列、工作流 ID、超时设置等。 这些选项在启动工作流时指定,影响整个工作流的生命周期。
基本配置
创建 WorkflowOptions 对象并配置基本参数。
基本配置示例Java
import io.temporal.client.WorkflowOptions;
import java.time.Duration;
WorkflowOptions options = WorkflowOptions.newBuilder()
// 指定任务队列(必需)
.setTaskQueue("my-task-queue")
// 设置工作流 ID(可选,默认自动生成)
.setWorkflowId("workflow-123")
// 执行超时(从开始到完成的总时间)
.setExecutionTimeout(Duration.ofMinutes(10))
// 工作流运行超时(单个执行的超时)
.setWorkflowRunTimeout(Duration.ofMinutes(5))
// 工作流任务超时(单个工作流任务的超时)
.setWorkflowTaskTimeout(Duration.ofSeconds(10))
.build();任务队列
任务队列是连接客户端和 Worker 的通道。
配置任务队列Java
// 指定任务队列
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue("payment-task-queue")
.build();
// Worker 需要监听相同的任务队列
Worker worker = factory.newWorker("payment-task-queue");任务队列的作用
- • 连接客户端和 Worker
- • 实现任务分发
- • 支持多版本 Worker
- • 实现任务优先级
最佳实践
- • 按业务域区分任务队列
- • 使用语义化的命名
- • 支持 Worker 版本升级
- • 实现灰度发布
工作流 ID
工作流 ID 是工作流的唯一标识符。
工作流 ID 配置Java
// 设置工作流 ID
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue("order-task-queue")
// 使用业务 ID 作为工作流 ID
.setWorkflowId("order-" + orderId)
.build();
// 使用业务 ID 的好处:
// 1. 避免重复执行相同业务逻辑
// 2. 便于查询和追踪
// 3. 支持幂等性
// 不指定工作流 ID 时,Temporal 自动生成
WorkflowOptions autoId = WorkflowOptions.newBuilder()
.setTaskQueue("order-task-queue")
// workflowId 未设置,自动生成
.build();超时配置
配置不同级别的超时,确保工作流不会无限期运行。
ExecutionTimeout
工作流从开始到完成的总超时时间,包括所有重试和重试
WorkflowRunTimeout
单个工作流运行的超时时间,不包括重试
WorkflowTaskTimeout
单个工作流任务的超时时间,Worker 处理任务的时间限制
超时配置示例Java
import java.time.Duration;
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue("payment-task-queue")
.setExecutionTimeout(Duration.ofHours(24)) // 总超时:24小时
.setWorkflowRunTimeout(Duration.ofMinutes(10)) // 运行超时:10分钟
.setWorkflowTaskTimeout(Duration.ofSeconds(10)) // 任务超时:10秒
.build();
// 超时关系:
// ExecutionTimeout >= WorkflowRunTimeout
// WorkflowRunTimeout >= WorkflowTaskTimeout
// WorkflowTaskTimeout >= 活动超时重试策略
配置工作流失败时的重试策略。
重试策略配置Java
import io.temporal.common.RetryOptions;
import java.time.Duration;
RetryOptions retryOptions = RetryOptions.newBuilder()
// 初始重试间隔
.setInitialInterval(Duration.ofSeconds(1))
// 回退系数(每次重试间隔乘以该系数)
.setBackoffCoefficient(2.0)
// 最大重试间隔
.setMaximumInterval(Duration.ofSeconds(10))
// 最大重试次数
.setMaximumAttempts(5)
// 不重试的异常类型
.setNonRetryableExceptions(
IllegalArgumentException.class
)
.build();
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue("payment-task-queue")
.setRetryOptions(retryOptions)
.build();Memo 和 Search Attributes
在启动工作流时设置元数据和可搜索属性。
Memo 配置Java
import java.util.HashMap;
import java.util.Map;
// Memo:保存工作流的元数据(不可搜索)
Map<String, Object> memo = new HashMap<>();
memo.put("customerId", "cust-123");
memo.put("orderValue", 99.99);
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue("order-task-queue")
.setMemo(memo)
.build();Search Attributes 配置Java
import io.temporal.common.SearchAttributes;
import java.util.HashMap;
import java.util.Map;
// Search Attributes:可搜索的属性
Map<String, Object> searchAttrs = new HashMap<>();
searchAttrs.put("CustomKeywordField", "premium");
searchAttrs.put("CustomIntField", 100);
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue("order-task-queue")
.setSearchAttributes(searchAttrs)
.build();
// 之后可以使用 Search Attributes 查询工作流
List<WorkflowExecution> executions = client.listExecutions()
.setQuery("CustomKeywordField = 'premium'")
.getExecutions();Cron 工作流
配置定期执行的工作流。
Cron 配置Java
// Cron 表达式:每 5 分钟执行一次
String cronExpression = "*/5 * * * *";
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue("report-task-queue")
.setCronSchedule(cronExpression)
.build();
// Cron 选项说明:
// * * * * *
// | | | | |
// | | | | +-- 星期几 (0-6, 0=周日)
// | | | +---- 月份 (1-12)
// | | +------ 日期 (1-31)
// | +-------- 小时 (0-23)
// +---------- 分钟 (0-59)
// 其他示例:
// "0 0 * * *" // 每天午夜
// "0 9 * * 1-5" // 工作日上午 9 点
// "0 0 1 * *" // 每月第一天
// "*/30 * * * *" // 每 30 分钟完整示例
完整配置示例Java
import io.temporal.client.WorkflowOptions;
import io.temporal.common.RetryOptions;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
// 配置重试策略
RetryOptions retryOptions = RetryOptions.newBuilder()
.setInitialInterval(Duration.ofSeconds(1))
.setBackoffCoefficient(2.0)
.setMaximumInterval(Duration.ofSeconds(10))
.setMaximumAttempts(3)
.build();
// 配置 Memo
Map<String, Object> memo = new HashMap<>();
memo.put("customerId", customerId);
memo.put("customerType", "premium");
// 配置 Search Attributes
Map<String, Object> searchAttrs = new HashMap<>();
searchAttrs.put("CustomKeywordField", "payment");
// 创建工作流选项
WorkflowOptions options = WorkflowOptions.newBuilder()
// 必需配置
.setTaskQueue("payment-task-queue")
.setWorkflowId("payment-" + transactionId)
// 超时配置
.setExecutionTimeout(Duration.ofHours(1))
.setWorkflowRunTimeout(Duration.ofMinutes(10))
.setWorkflowTaskTimeout(Duration.ofSeconds(10))
// 重试策略
.setRetryOptions(retryOptions)
// Memo 和 Search Attributes
.setMemo(memo)
.setSearchAttributes(searchAttrs)
.build();
// 启动工作流
PaymentWorkflow workflow = client.newWorkflowStub(
PaymentWorkflow.class,
options
);
PaymentResult result = workflow.processPayment(request);最佳实践
合理设置超时
根据实际业务需求设置超时,避免过长导致资源浪费,过短导致误判失败
使用业务 ID
使用业务相关的 ID 作为工作流 ID,便于追踪和避免重复执行
区分可重试和不可重试错误
通过 NonRetryableExceptions 避免重试业务逻辑错误
按域划分任务队列
不同的业务域使用不同的任务队列,便于隔离和管理