工作流

工作流定义了整个业务流程,是 Temporal 应用的核心构建块。

核心概念

工作流是定义业务流程的类,它使用 Temporal SDK 提供的 API 来协调活动的执行、 处理信号和查询,并维护状态。工作流代码必须是确定性的,以确保在不同时间和位置执行结果一致。

工作流特性

确定性执行
工作流代码必须是确定性的,相同的输入总是产生相同的输出
状态持久化
工作流状态自动保存到 Temporal Server,支持长时间运行的流程
可观测性
完整的执行历史记录,每个操作都可追踪和调试
自动重试
工作流失败时自动重试,确保业务流程最终完成

定义工作流接口

在 Java SDK 中,工作流通过接口定义。使用 @WorkflowInterface 注解标记接口, 并使用 @WorkflowMethod 注解标记工作流方法。

PaymentWorkflow.javaJava
package io.temporal.sample;

import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.QueryMethod;

@WorkflowInterface
public interface PaymentWorkflow {
    
    @WorkflowMethod
    PaymentResult processPayment(PaymentRequest request);
    
    @SignalMethod
    void cancelPayment(String reason);
    
    @QueryMethod
    PaymentStatus getStatus();
}

实现工作流

创建类实现工作流接口。工作流实现类必须提供无参构造函数。

PaymentWorkflowImpl.javaJava
package io.temporal.sample;

import io.temporal.workflow.Workflow;
import io.temporal.activity.ActivityOptions;
import java.time.Duration;

public class PaymentWorkflowImpl implements PaymentWorkflow {
    
    private PaymentStatus status = PaymentStatus.PENDING;
    private String cancellationReason = null;
    
    private final PaymentActivities activities = Workflow.newActivityStub(
        PaymentActivities.class,
        ActivityOptions.newBuilder()
            .setStartToCloseTimeout(Duration.ofSeconds(30))
            .build()
    );
    
    @Override
    public PaymentResult processPayment(PaymentRequest request) {
        // 检查是否被取消
        if (cancellationReason != null) {
            return PaymentResult.canceled(cancellationReason);
        }
        
        status = PaymentStatus.PROCESSING;
        
        try {
            // 验证支付信息
            activities.validatePayment(request);
            
            // 检查是否被取消
            if (cancellationReason != null) {
                return PaymentResult.canceled(cancellationReason);
            }
            
            status = PaymentStatus.CHARGING;
            
            // 执行扣款
            String transactionId = activities.chargePayment(request);
            
            status = PaymentStatus.COMPLETED;
            return PaymentResult.success(transactionId);
            
        } catch (Exception e) {
            status = PaymentStatus.FAILED;
            throw e;
        }
    }
    
    @Override
    public void cancelPayment(String reason) {
        if (status == PaymentStatus.PENDING || 
            status == PaymentStatus.PROCESSING) {
            cancellationReason = reason;
            status = PaymentStatus.CANCELED;
        }
    }
    
    @Override
    public PaymentStatus getStatus() {
        return status;
    }
}

工作流功能

调用活动

使用 Workflow.newActivityStub() 创建活动存根,然后调用活动方法

等待定时器

使用 Workflow.await()Workflow.sleep() 延迟执行

处理信号

使用 @SignalMethod 注解定义信号方法

响应查询

使用 @QueryMethod 注解定义查询方法

确定性约束

重要:必须遵守的规则

  • • 不要在工作流中直接调用外部系统(HTTP、数据库等)
  • • 不要使用 System.currentTimeMillis()Date(),使用 Workflow.currentTimeMillis()
  • • 不要使用 Math.random(),使用确定性的随机数生成器
  • • 不要记录敏感信息到工作流历史
  • • 不要修改工作流接口定义(会导致历史不兼容)

注册工作流

在 Worker 中注册工作流实现类,使其能够接收和执行工作流任务。

WorkerMain.javaJava
import io.temporal.worker.WorkerFactory;

// 创建 Worker 工厂
WorkerFactory factory = WorkerFactory.newInstance(client);

// 创建 Worker 并指定任务队列
Worker worker = factory.newWorker("payment-task-queue");

// 注册工作流实现类
worker.registerWorkflowImplementationTypes(
    PaymentWorkflowImpl.class
);

// 启动 Worker
factory.start();

执行工作流

使用客户端创建工作流存根并调用工作流方法。

PaymentClient.javaJava
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.workflow.WorkflowExecution;

// 配置工作流选项
WorkflowOptions options = WorkflowOptions.newBuilder()
    .setTaskQueue("payment-task-queue")
    .setWorkflowId("payment-" + requestId)
    .build();

// 获取工作流存根
PaymentWorkflow workflow = client.newWorkflowStub(
    PaymentWorkflow.class,
    options
);

// 同步执行(等待结果)
PaymentResult result = workflow.processPayment(request);

// 或异步执行(获取 WorkflowExecution)
WorkflowExecution execution = WorkflowClient.start(
    workflow::processPayment, 
    request
);

下一步