工作流
工作流定义了整个业务流程,是 Temporal 应用的核心构建块。
核心概念
工作流是定义业务流程的类,它使用 Temporal SDK 提供的 API 来协调活动的执行、 处理信号和查询,并维护状态。工作流代码必须是确定性的,以确保在不同时间和位置执行结果一致。
工作流特性
确定性执行
工作流代码必须是确定性的,相同的输入总是产生相同的输出
状态持久化
工作流状态自动保存到 Temporal Server,支持长时间运行的流程
可观测性
完整的执行历史记录,每个操作都可追踪和调试
自动重试
工作流失败时自动重试,确保业务流程最终完成
定义工作流接口
在 Java SDK 中,工作流通过接口定义。使用 @WorkflowInterface 注解标记接口, 并使用 @WorkflowMethod 注解标记工作流方法。
PaymentWorkflow.javaJava
package io.temporal.sample;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.QueryMethod;
@WorkflowInterface
public interface PaymentWorkflow {
@WorkflowMethod
PaymentResult processPayment(PaymentRequest request);
@SignalMethod
void cancelPayment(String reason);
@QueryMethod
PaymentStatus getStatus();
}实现工作流
创建类实现工作流接口。工作流实现类必须提供无参构造函数。
PaymentWorkflowImpl.javaJava
package io.temporal.sample;
import io.temporal.workflow.Workflow;
import io.temporal.activity.ActivityOptions;
import java.time.Duration;
public class PaymentWorkflowImpl implements PaymentWorkflow {
private PaymentStatus status = PaymentStatus.PENDING;
private String cancellationReason = null;
private final PaymentActivities activities = Workflow.newActivityStub(
PaymentActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
.build()
);
@Override
public PaymentResult processPayment(PaymentRequest request) {
// 检查是否被取消
if (cancellationReason != null) {
return PaymentResult.canceled(cancellationReason);
}
status = PaymentStatus.PROCESSING;
try {
// 验证支付信息
activities.validatePayment(request);
// 检查是否被取消
if (cancellationReason != null) {
return PaymentResult.canceled(cancellationReason);
}
status = PaymentStatus.CHARGING;
// 执行扣款
String transactionId = activities.chargePayment(request);
status = PaymentStatus.COMPLETED;
return PaymentResult.success(transactionId);
} catch (Exception e) {
status = PaymentStatus.FAILED;
throw e;
}
}
@Override
public void cancelPayment(String reason) {
if (status == PaymentStatus.PENDING ||
status == PaymentStatus.PROCESSING) {
cancellationReason = reason;
status = PaymentStatus.CANCELED;
}
}
@Override
public PaymentStatus getStatus() {
return status;
}
}工作流功能
调用活动
使用 Workflow.newActivityStub() 创建活动存根,然后调用活动方法
等待定时器
使用 Workflow.await() 或 Workflow.sleep() 延迟执行
处理信号
使用 @SignalMethod 注解定义信号方法
响应查询
使用 @QueryMethod 注解定义查询方法
确定性约束
重要:必须遵守的规则
- • 不要在工作流中直接调用外部系统(HTTP、数据库等)
- • 不要使用
System.currentTimeMillis()或Date(),使用Workflow.currentTimeMillis() - • 不要使用
Math.random(),使用确定性的随机数生成器 - • 不要记录敏感信息到工作流历史
- • 不要修改工作流接口定义(会导致历史不兼容)
注册工作流
在 Worker 中注册工作流实现类,使其能够接收和执行工作流任务。
WorkerMain.javaJava
import io.temporal.worker.WorkerFactory;
// 创建 Worker 工厂
WorkerFactory factory = WorkerFactory.newInstance(client);
// 创建 Worker 并指定任务队列
Worker worker = factory.newWorker("payment-task-queue");
// 注册工作流实现类
worker.registerWorkflowImplementationTypes(
PaymentWorkflowImpl.class
);
// 启动 Worker
factory.start();执行工作流
使用客户端创建工作流存根并调用工作流方法。
PaymentClient.javaJava
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.workflow.WorkflowExecution;
// 配置工作流选项
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue("payment-task-queue")
.setWorkflowId("payment-" + requestId)
.build();
// 获取工作流存根
PaymentWorkflow workflow = client.newWorkflowStub(
PaymentWorkflow.class,
options
);
// 同步执行(等待结果)
PaymentResult result = workflow.processPayment(request);
// 或异步执行(获取 WorkflowExecution)
WorkflowExecution execution = WorkflowClient.start(
workflow::processPayment,
request
);