Java SDK

The Temporal Java SDK provides a complete API for developing reliable workflows and activities.

Installation

Add the Temporal SDK dependency to your project.

Maven (pom.xml)
<dependency>
    <groupId>io.temporal</groupId>
    <artifactId>temporal-sdk</artifactId>
    <version>1.23.1</version>
</dependency>
Gradle (build.gradle)
implementation 'io.temporal:temporal-sdk:1.23.1'

Client configuration

Create a WorkflowClient that connects to the Temporal Server.

Creating the client (Java)
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowClientOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.WorkflowServiceStubsOptions;

// Create the service connection
WorkflowServiceStubs service = WorkflowServiceStubs.newInstance(
    WorkflowServiceStubsOptions.newBuilder()
        .setTarget("localhost:7233")
        .build()
);

// Create the workflow client
WorkflowClient client = WorkflowClient.newInstance(
    service,
    WorkflowClientOptions.newBuilder()
        .setNamespace("default")
        .build()
);
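
The target address and namespace above are hard-coded, which suits local development. As a minimal sketch, both values could instead be resolved from environment variables, with the local values as fallbacks. The class name `TemporalSettings` and the variable names `TEMPORAL_ADDRESS` and `TEMPORAL_NAMESPACE` are assumptions chosen for this illustration, and the block uses only the JDK.

```java
import java.util.Map;

/** Hypothetical holder for the two connection values used in the client setup. */
final class TemporalSettings {
    final String target;
    final String namespace;

    private TemporalSettings(String target, String namespace) {
        this.target = target;
        this.namespace = namespace;
    }

    /** Reads both settings from the given map, falling back to the local defaults. */
    static TemporalSettings from(Map<String, String> env) {
        return new TemporalSettings(
            valueOrDefault(env.get("TEMPORAL_ADDRESS"), "localhost:7233"),
            valueOrDefault(env.get("TEMPORAL_NAMESPACE"), "default"));
    }

    /** Treats null and blank values as "not set". */
    private static String valueOrDefault(String value, String fallback) {
        if (value == null || value.trim().isEmpty()) {
            return fallback;
        }
        return value.trim();
    }
}
```

With this in place, `TemporalSettings.from(System.getenv())` would supply the arguments for `setTarget(...)` and `setNamespace(...)`.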

Workflow definition

Define workflows using an interface and annotations.

Workflow interface (Java)
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.QueryMethod;

@WorkflowInterface
public interface MyWorkflow {
    
    @WorkflowMethod
    String execute(String input);
    
    @SignalMethod
    void updateValue(String value);
    
    @QueryMethod
    String getValue();
}
Workflow implementation (Java)
import io.temporal.workflow.Workflow;

public class MyWorkflowImpl implements MyWorkflow {
    
    private String value = "";
    
    @Override
    public String execute(String input) {
        value = input;
        // Block until a signal sets value to "done" (returns at once if the input is already "done")
        Workflow.await(() -> value.equals("done"));
        return "Completed: " + value;
    }
    
    @Override
    public void updateValue(String value) {
        this.value = value;
    }
    
    @Override
    public String getValue() {
        return value;
    }
}

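Activity definition

Define activities with an interface; the implementation class handles the business logic.

Activity interface and implementation (Java)
import io.temporal.activity.ActivityInterface;

// Activity interface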
@ActivityInterface
public interface MyActivities {
    String process(String data);
    void saveResult(String result);
}

// Activity implementation
public class MyActivitiesImpl implements MyActivities {
    
    @Override
    public String process(String data) {
        // Process the data
        return data.toUpperCase();
    }
    
    @Override
    public void saveResult(String result) {
        // Placeholder: a real implementation would save the result to a database
        System.out.println("Saving: " + result);
    }
}
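
Because an activity implementation is an ordinary Java class, its business logic can be exercised directly, without a Worker or a running server. The sketch below repeats the interface without `@ActivityInterface` only so that it compiles with the JDK alone; a real project would keep the annotation. `ActivityLogicCheck` is a hypothetical helper name.

```java
// Same shape as the activity interface; the @ActivityInterface annotation is
// omitted here only so the sketch compiles without the Temporal SDK.
interface MyActivities {
    String process(String data);
    void saveResult(String result);
}

class MyActivitiesImpl implements MyActivities {
    @Override
    public String process(String data) {
        return data.toUpperCase();
    }

    @Override
    public void saveResult(String result) {
        System.out.println("Saving: " + result);
    }
}

/** Hypothetical check that calls the activity like any other Java method. */
class ActivityLogicCheck {
    static boolean processUppercasesInput() {
        MyActivities activities = new MyActivitiesImpl();
        return "HELLO".equals(activities.process("hello"));
    }
}
```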

Worker configuration

Create and start a Worker to process workflow and activity tasks.

Worker main program (Java)
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;

// Create the Worker factory
WorkerFactory factory = WorkerFactory.newInstance(client);

// Create a Worker that polls the task queue
Worker worker = factory.newWorker("my-task-queue");

// Register the workflow and activity implementations
worker.registerWorkflowImplementationTypes(MyWorkflowImpl.class);
worker.registerActivitiesImplementations(new MyActivitiesImpl());

// Start the Worker
factory.start();

// Keep the process running
System.out.println("Worker started...");
Thread.currentThread().join();

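Starting a workflow

Use the client to start a workflow execution.

Starting a workflow (Java)
import java.time.Duration;

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;

// Configure the workflow options
WorkflowOptions options = WorkflowOptions.newBuilder()
    .setTaskQueue("my-task-queue")
    .setWorkflowId("workflow-" + System.currentTimeMillis())
    .setWorkflowExecutionTimeout(Duration.ofMinutes(10))
    .build();

// Get a workflow stub
MyWorkflow workflow = client.newWorkflowStub(MyWorkflow.class, options);

// Option 1: run synchronously (blocks until the workflow returns its result)
String result = workflow.execute("input-data");

// Option 2: start asynchronously instead and return as soon as the execution has started.
// Use one option per stub; a stub instance cannot start a second execution.
WorkflowExecution execution = WorkflowClient.start(
    workflow::execute,
    "input-data"
);

System.out.println("Workflow started: " + execution.getWorkflowId());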
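
A timestamp-based workflow ID is different on every call, so a client that retries the same request ends up starting another execution. One alternative is to derive the ID from a business key, so that the same request always maps to the same ID. The helper below is a JDK-only sketch; the class name `WorkflowIds` and the `prefix-key` format are assumptions for illustration.

```java
import java.util.Objects;

/** Hypothetical helper that derives a stable workflow ID from a business key. */
final class WorkflowIds {
    private WorkflowIds() {}

    /** Builds an ID such as "order-123" from a type prefix and a business key. */
    static String forBusinessKey(String prefix, String businessKey) {
        Objects.requireNonNull(prefix, "prefix");
        Objects.requireNonNull(businessKey, "businessKey");
        String key = businessKey.trim();
        if (key.isEmpty()) {
            throw new IllegalArgumentException("businessKey must not be blank");
        }
        return prefix + "-" + key;
    }
}
```

The result would be passed to `setWorkflowId(...)` in place of the timestamp expression.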

Sending signals

Send a signal to a running workflow.

Sending a signal (Java)
// Get a workflow stub by workflow ID
MyWorkflow workflow = client.newWorkflowStub(
    MyWorkflow.class,
    "workflow-123456"
);

// Send the signal
workflow.updateValue("done");

Querying workflows

Query the current state of a workflow.

Querying state (Java)
// Get a workflow stub by workflow ID
MyWorkflow workflow = client.newWorkflowStub(
    MyWorkflow.class,
    "workflow-123456"
);

// Run the query
String value = workflow.getValue();
System.out.println("Current value: " + value);

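Common APIs

Workflow API
Workflow.await(condition): blocks the workflow until the condition becomes true
Workflow.sleep(duration): pauses the workflow for the given duration
Workflow.newActivityStub(): creates an activity stub

Activity API
Activity.getExecutionContext().heartbeat(details): sends an activity heartbeat
Activity.getExecutionContext(): returns the activity execution context

Client API
client.newWorkflowStub(): returns a typed workflow stub
client.newUntypedWorkflowStub(): returns an untyped workflow stub

Options API
ActivityOptions: activity configuration
WorkflowOptions: workflow configuration
RetryOptions: retry policy configuration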
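
A retry policy of this kind is usually described by an initial interval, a backoff coefficient, and a maximum interval. The JDK-only sketch below shows the arithmetic such an exponential backoff implies: the wait before retry n is the initial interval multiplied by the coefficient n-1 times, capped at the maximum. `BackoffSchedule` is a hypothetical class that does not call the SDK; the RetryOptions documentation remains the authority on actual defaults and behavior.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical calculator for capped exponential backoff delays. */
final class BackoffSchedule {
    private BackoffSchedule() {}

    /** Returns the wait before each of the first {@code retries} retries. */
    static List<Duration> delays(Duration initial, double coefficient, Duration max, int retries) {
        List<Duration> result = new ArrayList<>();
        double millis = initial.toMillis();
        for (int i = 0; i < retries; i++) {
            // Cap the uncapped exponential value at the maximum interval.
            long capped = (long) Math.min(millis, max.toMillis());
            result.add(Duration.ofMillis(capped));
            millis *= coefficient;
        }
        return result;
    }
}
```

For example, an initial interval of 1 second, a coefficient of 2.0, and a maximum of 10 seconds give waits of 1, 2, 4, 8, and 10 seconds for the first five retries.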

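Complete example

Complete workflow example (Java)
import java.time.Duration;

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityOptions;
import io.temporal.client.WorkflowOptions;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

// OrderRequest and OrderResult are application-defined data classes.
// 1. Define the interfaces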
@WorkflowInterface
public interface OrderWorkflow {
    @WorkflowMethod
    OrderResult processOrder(OrderRequest request);
}

@ActivityInterface
public interface OrderActivities {
    void validateOrder(OrderRequest request);
    String processPayment(OrderRequest request);
    void shipOrder(String orderId);
}

// 2. Implement the workflow
public class OrderWorkflowImpl implements OrderWorkflow {
    private final OrderActivities activities = Workflow.newActivityStub(
        OrderActivities.class,
        ActivityOptions.newBuilder()
            .setStartToCloseTimeout(Duration.ofSeconds(30))
            .build()
    );
    
    @Override
    public OrderResult processOrder(OrderRequest request) {
        activities.validateOrder(request);
        String transactionId = activities.processPayment(request);
        activities.shipOrder(request.getOrderId());
        
        return new OrderResult(transactionId, "COMPLETED");
    }
}

// 3. Implement the activities
public class OrderActivitiesImpl implements OrderActivities {
    @Override
    public void validateOrder(OrderRequest request) {
        // Validation logic
    }
    
    @Override
    public String processPayment(OrderRequest request) {
        // Payment processing
        return "TXN-" + System.currentTimeMillis();
    }
    
    @Override
    public void shipOrder(String orderId) {
        // Shipping logic
    }
}

// 4. Start the Worker (client is the WorkflowClient created in the client configuration section)
WorkerFactory factory = WorkerFactory.newInstance(client);
Worker worker = factory.newWorker("order-task-queue");
worker.registerWorkflowImplementationTypes(OrderWorkflowImpl.class);
worker.registerActivitiesImplementations(new OrderActivitiesImpl());
factory.start();

// 5. Execute the workflow
OrderWorkflow workflow = client.newWorkflowStub(
    OrderWorkflow.class,
    WorkflowOptions.newBuilder()
        .setTaskQueue("order-task-queue")
        .build()
);
OrderResult result = workflow.processOrder(new OrderRequest("order-123"));
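
The example uses `OrderRequest` and `OrderResult` without defining them. A minimal sketch of what they might look like follows, assuming workflow arguments and results are serialized as JSON, which typically requires a no-argument constructor plus getters and setters. The field names are assumptions inferred from the calls in the example (`getOrderId()`, the two-argument `OrderResult` constructor); in a real project each class would normally be public in its own file.

```java
/** Hypothetical request type carrying the order identifier. */
class OrderRequest {
    private String orderId;

    // No-argument constructor so a JSON converter can rebuild the object.
    public OrderRequest() {}

    public OrderRequest(String orderId) {
        this.orderId = orderId;
    }

    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }
}

/** Hypothetical result type returned by the workflow. */
class OrderResult {
    private String transactionId;
    private String status;

    public OrderResult() {}

    public OrderResult(String transactionId, String status) {
        this.transactionId = transactionId;
        this.status = status;
    }

    public String getTransactionId() { return transactionId; }
    public void setTransactionId(String transactionId) { this.transactionId = transactionId; }

    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }
}
```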

More resources