Java SDK
Temporal Java SDK 提供完整的 API 来开发可靠的工作流和活动。
安装
添加 Temporal SDK 依赖到您的项目。
Maven (pom.xml)Maven
<dependency>
<groupId>io.temporal</groupId>
<artifactId>temporal-sdk</artifactId>
<version>1.23.1</version>
</dependency>Gradle (build.gradle)Gradle
implementation 'io.temporal:temporal-sdk:1.23.1'
客户端配置
创建 WorkflowClient 连接到 Temporal Server。
创建客户端Java
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowClientOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
// 创建服务连接
WorkflowServiceStubs service = WorkflowServiceStubs.newInstance(
WorkflowServiceStubsOptions.newBuilder()
.setTarget("localhost:7233")
.build()
);
// 创建工作流客户端
WorkflowClient client = WorkflowClient.newInstance(
service,
WorkflowClientOptions.newBuilder()
.setNamespace("default")
.build()
);工作流定义
使用接口和注解定义工作流。
工作流接口Java
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.QueryMethod;
@WorkflowInterface
public interface MyWorkflow {
@WorkflowMethod
String execute(String input);
@SignalMethod
void updateValue(String value);
@QueryMethod
String getValue();
}工作流实现Java
import io.temporal.workflow.Workflow;
import io.temporal.workflow.SignalMethod;
public class MyWorkflowImpl implements MyWorkflow {
private String value = "";
@Override
public String execute(String input) {
value = input;
Workflow.await(() -> value.equals("done"));
return "Completed: " + value;
}
@Override
public void updateValue(String value) {
this.value = value;
}
@Override
public String getValue() {
return value;
}
}活动定义
使用接口定义活动,实现类处理业务逻辑。
活动接口和实现Java
import io.temporal.activity.ActivityInterface;
// 活动接口
@ActivityInterface
public interface MyActivities {
String process(String data);
void saveResult(String result);
}
// 活动实现
public class MyActivitiesImpl implements MyActivities {
@Override
public String process(String data) {
// 处理数据
return data.toUpperCase();
}
@Override
public void saveResult(String result) {
// 保存结果到数据库
System.out.println("Saving: " + result);
}
}Worker 配置
创建并启动 Worker 来处理工作流和活动任务。
Worker 主程序Java
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
// 创建 Worker 工厂
WorkerFactory factory = WorkerFactory.newInstance(client);
// 创建 Worker
Worker worker = factory.newWorker("my-task-queue");
// 注册工作流和活动
worker.registerWorkflowImplementationTypes(MyWorkflowImpl.class);
worker.registerActivitiesImplementations(new MyActivitiesImpl());
// 启动 Worker
factory.start();
// 保持运行
System.out.println("Worker started...");
Thread.currentThread().join();启动工作流
使用客户端启动工作流执行。
启动工作流Java
import io.temporal.client.WorkflowOptions;
import io.temporal.workflow.WorkflowExecution;
// 配置工作流选项
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue("my-task-queue")
.setWorkflowId("workflow-" + System.currentTimeMillis())
.setExecutionTimeout(Duration.ofMinutes(10))
.build();
// 获取工作流存根
MyWorkflow workflow = client.newWorkflowStub(MyWorkflow.class, options);
// 同步执行(等待结果)
String result = workflow.execute("input-data");
// 或异步执行
WorkflowExecution execution = WorkflowClient.start(
workflow::execute,
"input-data"
);
System.out.println("Workflow started: " + execution.getWorkflowId());发送信号
向运行中的工作流发送信号。
发送信号Java
// 通过工作流 ID 获取工作流存根
MyWorkflow workflow = client.newWorkflowStub(
MyWorkflow.class,
"workflow-123456"
);
// 发送信号
workflow.updateValue("done");查询工作流
查询工作流的当前状态。
查询状态Java
// 通过工作流 ID 获取工作流存根
MyWorkflow workflow = client.newWorkflowStub(
MyWorkflow.class,
"workflow-123456"
);
// 执行查询
String value = workflow.getValue();
System.out.println("Current value: " + value);常用 API
Workflow API
Workflow.await(condition)等待条件满足
Workflow.sleep(duration)延迟执行
Workflow.newActivityStub()创建活动存根
Activity API
Activity.heartbeat()发送活动心跳
Activity.getExecutionContext()获取活动上下文
Client API
client.newWorkflowStub()获取工作流存根
client.newUntypedWorkflowStub()获取无类型存根
Options API
ActivityOptions活动选项配置
WorkflowOptions工作流选项配置
RetryOptions重试策略配置
完整示例
完整的工作流示例Java
// 1. 定义接口
@WorkflowInterface
public interface OrderWorkflow {
@WorkflowMethod
OrderResult processOrder(OrderRequest request);
}
@ActivityInterface
public interface OrderActivities {
void validateOrder(OrderRequest request);
String processPayment(OrderRequest request);
void shipOrder(String orderId);
}
// 2. 实现工作流
public class OrderWorkflowImpl implements OrderWorkflow {
private final OrderActivities activities = Workflow.newActivityStub(
OrderActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
.build()
);
@Override
public OrderResult processOrder(OrderRequest request) {
activities.validateOrder(request);
String transactionId = activities.processPayment(request);
activities.shipOrder(request.getOrderId());
return new OrderResult(transactionId, "COMPLETED");
}
}
// 3. 实现活动
public class OrderActivitiesImpl implements OrderActivities {
@Override
public void validateOrder(OrderRequest request) {
// 验证逻辑
}
@Override
public String processPayment(OrderRequest request) {
// 支付处理
return "TXN-" + System.currentTimeMillis();
}
@Override
public void shipOrder(String orderId) {
// 发货逻辑
}
}
// 4. 启动 Worker
WorkerFactory factory = WorkerFactory.newInstance(client);
Worker worker = factory.newWorker("order-task-queue");
worker.registerWorkflowImplementationTypes(OrderWorkflowImpl.class);
worker.registerActivitiesImplementations(new OrderActivitiesImpl());
factory.start();
// 5. 执行工作流
OrderWorkflow workflow = client.newWorkflowStub(
OrderWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue("order-task-queue")
.build()
);
OrderResult result = workflow.processOrder(new OrderRequest("order-123"));