查询
查询允许您读取运行中或已完成的工作流的当前状态,而不影响工作流执行。
核心概念
查询是一种只读操作,不会修改工作流状态。与信号不同,查询会立即返回结果, 不会等待工作流执行到特定位置。
查询特性
只读操作
查询不会修改工作流状态
即时返回
查询立即返回结果,不等待工作流执行
无副作用
查询不会记录到工作流历史中
定义查询
在工作流接口中使用 @QueryMethod 注解定义查询方法。
查询定义Java
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.QueryMethod;
@WorkflowInterface
public interface OrderWorkflow {
@WorkflowMethod
OrderResult processOrder(OrderRequest request);
@SignalMethod
void cancelOrder(String reason);
// 定义查询方法
@QueryMethod(name = "get-status")
OrderStatus getStatus();
@QueryMethod(name = "get-details")
OrderDetails getDetails();
@QueryMethod(name = "get-progress")
ProgressInfo getProgress();
}实现查询
在工作流实现类中实现查询方法,返回当前状态信息。
查询实现Java
public class OrderWorkflowImpl implements OrderWorkflow {
private OrderStatus status = OrderStatus.PENDING;
private String orderId;
private String paymentId;
private String shippingId;
private int totalSteps = 5;
private int completedSteps = 0;
private OrderRequest request;
@Override
public OrderResult processOrder(OrderRequest request) {
this.request = request;
this.orderId = request.getOrderId();
try {
// 步骤 1
status = OrderStatus.VALIDATING;
activities.validateOrder(request);
completedSteps++;
// 步骤 2
status = OrderStatus.PROCESSING_PAYMENT;
paymentId = activities.processPayment(request);
completedSteps++;
// 步骤 3
status = OrderStatus.PREPARING;
activities.prepareOrder(orderId);
completedSteps++;
// 步骤 4
status = OrderStatus.SHIPPING;
shippingId = activities.shipOrder(orderId);
completedSteps++;
// 步骤 5
status = OrderStatus.COMPLETED;
completedSteps++;
return OrderResult.success(orderId);
} catch (Exception e) {
status = OrderStatus.FAILED;
throw e;
}
}
@Override
public OrderStatus getStatus() {
// 返回当前状态
return status;
}
@Override
public OrderDetails getDetails() {
// 返回订单详细信息
return new OrderDetails(
orderId,
paymentId,
shippingId,
status,
request
);
}
@Override
public ProgressInfo getProgress() {
// 返回进度信息
return new ProgressInfo(
completedSteps,
totalSteps,
(completedSteps * 100.0) / totalSteps
);
}
@Override
public void cancelOrder(String reason) {
if (status != OrderStatus.COMPLETED) {
status = OrderStatus.CANCELED;
}
}
}执行查询
使用客户端查询运行中或已完成的工作流状态。
执行查询Java
import io.temporal.client.WorkflowClient;
// 通过工作流 ID 获取工作流存根
OrderWorkflow workflow = client.newWorkflowStub(
OrderWorkflow.class,
"order-12345"
);
// 查询状态
OrderStatus status = workflow.getStatus();
System.out.println("Status: " + status);
// 查询详细信息
OrderDetails details = workflow.getDetails();
System.out.println("Order ID: " + details.getOrderId());
System.out.println("Payment ID: " + details.getPaymentId());
System.out.println("Shipping ID: " + details.getShippingId());
// 查询进度
ProgressInfo progress = workflow.getProgress();
System.out.println("Progress: " + progress.getPercentage() + "%");
System.out.println("Completed: " + progress.getCompletedSteps() +
"/" + progress.getTotalSteps());查询条件
查询方法可以根据参数返回不同的结果。
带参数的查询Java
@QueryMethod(name = "get-history")
List<ActivityInfo> getActivityHistory(String type);
@QueryMethod(name = "get-history")
List<ActivityInfo> getActivityHistory(String type, int limit);
// 实现
@Override
public List<ActivityInfo> getActivityHistory(String type) {
return getActivityHistory(type, Integer.MAX_VALUE);
}
@Override
public List<ActivityInfo> getActivityHistory(String type, int limit) {
return activityHistory.stream()
.filter(info -> info.getType().equals(type))
.limit(limit)
.collect(Collectors.toList());
}
// 使用
List<ActivityInfo> allActivities = workflow.getActivityHistory("all");
List<ActivityInfo> recentActivities = workflow.getActivityHistory("payment", 5);查询约束
重要约束
- • 查询方法不能修改工作流状态
- • 查询方法不能调用活动
- • 查询方法不能使用
Workflow.await() - • 查询方法不能抛出非确定性异常
- • 查询方法必须快速返回(通常 < 1 秒)
动态查询
使用动态查询处理未定义的查询类型。
动态查询处理Java
import io.temporal.workflow.DynamicQueryHandler;
import io.temporal.workflow.DynamicWorkflow;
public class DynamicWorkflowImpl implements DynamicWorkflow {
private final Map<String, Object> state = new HashMap<>();
@Override
public Object execute(Object[] args) {
// 工作流逻辑
return "result";
}
@Override
public Object handleQuery(String queryName, Object[] args) {
// 处理所有未定义的查询
switch (queryName) {
case "get-state":
return state;
case "get-value":
if (args.length > 0) {
return state.get(args[0]);
}
return null;
case "get-all-keys":
return new ArrayList<>(state.keySet());
default:
throw new IllegalArgumentException("Unknown query: " + queryName);
}
}
}查询 vs 信号
| 特性 | 查询 | 信号 |
|---|---|---|
| 修改状态 | 否 | 是 |
| 返回值 | 是 | 否 |
| 记录历史 | 否 | 是 |
| 异步执行 | 否 | 是 |
| 执行时间 | 立即返回 | 异步处理 |
最佳实践
保持查询简单
查询方法应该快速返回,避免复杂计算
只读操作
查询方法不应该修改任何工作流状态
缓存状态
在工作流中缓存状态,避免每次查询都重新计算
使用有意义的名称
查询名称应该清晰表达返回的数据类型