查询

查询允许您读取运行中或已完成的工作流的当前状态,而不影响工作流执行。

核心概念

查询是一种只读操作,不会修改工作流状态。与信号不同,查询会立即返回结果, 不会等待工作流执行到特定位置。

查询特性

只读操作

查询不会修改工作流状态

即时返回

查询立即返回结果,不等待工作流执行

无副作用

查询不会记录到工作流历史中

定义查询

在工作流接口中使用 @QueryMethod 注解定义查询方法。

查询定义Java
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.QueryMethod;

@WorkflowInterface
public interface OrderWorkflow {
    
    @WorkflowMethod
    OrderResult processOrder(OrderRequest request);
    
    @SignalMethod
    void cancelOrder(String reason);
    
    // 定义查询方法
    @QueryMethod(name = "get-status")
    OrderStatus getStatus();
    
    @QueryMethod(name = "get-details")
    OrderDetails getDetails();
    
    @QueryMethod(name = "get-progress")
    ProgressInfo getProgress();
}

实现查询

在工作流实现类中实现查询方法,返回当前状态信息。

查询实现Java
public class OrderWorkflowImpl implements OrderWorkflow {
    
    private OrderStatus status = OrderStatus.PENDING;
    private String orderId;
    private String paymentId;
    private String shippingId;
    private int totalSteps = 5;
    private int completedSteps = 0;
    private OrderRequest request;
    
    @Override
    public OrderResult processOrder(OrderRequest request) {
        this.request = request;
        this.orderId = request.getOrderId();
        
        try {
            // 步骤 1
            status = OrderStatus.VALIDATING;
            activities.validateOrder(request);
            completedSteps++;
            
            // 步骤 2
            status = OrderStatus.PROCESSING_PAYMENT;
            paymentId = activities.processPayment(request);
            completedSteps++;
            
            // 步骤 3
            status = OrderStatus.PREPARING;
            activities.prepareOrder(orderId);
            completedSteps++;
            
            // 步骤 4
            status = OrderStatus.SHIPPING;
            shippingId = activities.shipOrder(orderId);
            completedSteps++;
            
            // 步骤 5
            status = OrderStatus.COMPLETED;
            completedSteps++;
            
            return OrderResult.success(orderId);
            
        } catch (Exception e) {
            status = OrderStatus.FAILED;
            throw e;
        }
    }
    
    @Override
    public OrderStatus getStatus() {
        // 返回当前状态
        return status;
    }
    
    @Override
    public OrderDetails getDetails() {
        // 返回订单详细信息
        return new OrderDetails(
            orderId,
            paymentId,
            shippingId,
            status,
            request
        );
    }
    
    @Override
    public ProgressInfo getProgress() {
        // 返回进度信息
        return new ProgressInfo(
            completedSteps,
            totalSteps,
            (completedSteps * 100.0) / totalSteps
        );
    }
    
    @Override
    public void cancelOrder(String reason) {
        if (status != OrderStatus.COMPLETED) {
            status = OrderStatus.CANCELED;
        }
    }
}

执行查询

使用客户端查询运行中或已完成的工作流状态。

执行查询Java
import io.temporal.client.WorkflowClient;

// 通过工作流 ID 获取工作流存根
OrderWorkflow workflow = client.newWorkflowStub(
    OrderWorkflow.class,
    "order-12345"
);

// 查询状态
OrderStatus status = workflow.getStatus();
System.out.println("Status: " + status);

// 查询详细信息
OrderDetails details = workflow.getDetails();
System.out.println("Order ID: " + details.getOrderId());
System.out.println("Payment ID: " + details.getPaymentId());
System.out.println("Shipping ID: " + details.getShippingId());

// 查询进度
ProgressInfo progress = workflow.getProgress();
System.out.println("Progress: " + progress.getPercentage() + "%");
System.out.println("Completed: " + progress.getCompletedSteps() + 
                   "/" + progress.getTotalSteps());

查询条件

查询方法可以根据参数返回不同的结果。

带参数的查询Java
@QueryMethod(name = "get-history")
List<ActivityInfo> getActivityHistory(String type);

@QueryMethod(name = "get-history")
List<ActivityInfo> getActivityHistory(String type, int limit);

// 实现
@Override
public List<ActivityInfo> getActivityHistory(String type) {
    return getActivityHistory(type, Integer.MAX_VALUE);
}

@Override
public List<ActivityInfo> getActivityHistory(String type, int limit) {
    return activityHistory.stream()
        .filter(info -> info.getType().equals(type))
        .limit(limit)
        .collect(Collectors.toList());
}

// 使用
List<ActivityInfo> allActivities = workflow.getActivityHistory("all");
List<ActivityInfo> recentActivities = workflow.getActivityHistory("payment", 5);

查询约束

重要约束

  • • 查询方法不能修改工作流状态
  • • 查询方法不能调用活动
  • • 查询方法不能使用 Workflow.await()
  • • 查询方法不能抛出非确定性异常
  • • 查询方法必须快速返回(通常 < 1 秒)

动态查询

使用动态查询处理未定义的查询类型。

动态查询处理Java
import io.temporal.workflow.DynamicQueryHandler;
import io.temporal.workflow.DynamicWorkflow;

public class DynamicWorkflowImpl implements DynamicWorkflow {
    
    private final Map<String, Object> state = new HashMap<>();
    
    @Override
    public Object execute(Object[] args) {
        // 工作流逻辑
        return "result";
    }
    
    @Override
    public Object handleQuery(String queryName, Object[] args) {
        // 处理所有未定义的查询
        switch (queryName) {
            case "get-state":
                return state;
                
            case "get-value":
                if (args.length > 0) {
                    return state.get(args[0]);
                }
                return null;
                
            case "get-all-keys":
                return new ArrayList<>(state.keySet());
                
            default:
                throw new IllegalArgumentException("Unknown query: " + queryName);
        }
    }
}

查询 vs 信号

特性查询信号
修改状态
返回值
记录历史
异步执行
执行时间立即返回异步处理

最佳实践

保持查询简单

查询方法应该快速返回,避免复杂计算

只读操作

查询方法不应该修改任何工作流状态

缓存状态

在工作流中缓存状态,避免每次查询都重新计算

使用有意义的名称

查询名称应该清晰表达返回的数据类型

下一步