信号

信号允许您向运行中的工作流发送消息,实现外部与工作流的交互。

核心概念

信号是一种异步通信机制,允许客户端向运行中的工作流发送数据。 与查询不同,信号可以修改工作流状态,但不返回结果。

信号特性

异步通信

信号是异步发送的,不会阻塞客户端

修改状态

信号可以修改工作流的内部状态

可靠性保证

信号保证至少被处理一次

定义信号

在工作流接口中使用 @SignalMethod 注解定义信号方法。

信号定义Java
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.QueryMethod;

@WorkflowInterface
public interface OrderWorkflow {
    
    @WorkflowMethod
    OrderResult processOrder(OrderRequest request);
    
    // 定义信号方法
    @SignalMethod(name = "cancel-order")
    void cancelOrder(String reason);
    
    @SignalMethod(name = "update-shipping")
    void updateShippingAddress(ShippingAddress address);
    
    @QueryMethod
    OrderStatus getStatus();
}

实现信号

在工作流实现类中实现信号方法,处理接收到的信号。

信号实现Java
import io.temporal.workflow.Workflow;
import java.util.HashMap;
import java.util.Map;

public class OrderWorkflowImpl implements OrderWorkflow {
    
    private OrderStatus status = OrderStatus.PENDING;
    private String orderId;
    private String cancellationReason = null;
    private ShippingAddress shippingAddress;
    
    private final OrderActivities activities = Workflow.newActivityStub(
        OrderActivities.class,
        ActivityOptions.newBuilder()
            .setStartToCloseTimeout(Duration.ofSeconds(30))
            .build()
    );
    
    @Override
    public OrderResult processOrder(OrderRequest request) {
        this.orderId = request.getOrderId();
        
        try {
            // 处理订单
            status = OrderStatus.PROCESSING;
            String paymentId = activities.processPayment(request);
            
            // 检查是否被取消
            if (cancellationReason != null) {
                activities.refundPayment(paymentId);
                return OrderResult.canceled(cancellationReason);
            }
            
            // 更新发货地址
            if (shippingAddress != null) {
                activities.updateShippingAddress(orderId, shippingAddress);
            }
            
            status = OrderStatus.COMPLETED;
            return OrderResult.success(orderId);
            
        } catch (Exception e) {
            status = OrderStatus.FAILED;
            throw e;
        }
    }
    
    @Override
    public void cancelOrder(String reason) {
        // 只能在特定状态下取消
        if (status == OrderStatus.PENDING || 
            status == OrderStatus.PROCESSING) {
            cancellationReason = reason;
            status = OrderStatus.CANCELED;
        }
    }
    
    @Override
    public void updateShippingAddress(ShippingAddress address) {
        // 可以在任何时候更新地址
        this.shippingAddress = address;
    }
    
    @Override
    public OrderStatus getStatus() {
        return status;
    }
}

发送信号

使用客户端向运行中的工作流发送信号。

发送信号Java
import io.temporal.client.WorkflowClient;

// 方法 1: 通过工作流存根发送信号
OrderWorkflow workflow = client.newWorkflowStub(
    OrderWorkflow.class,
    "order-12345"
);

// 同步发送信号(等待确认)
workflow.cancelOrder("Customer requested cancellation");

// 方法 2: 通过 WorkflowSignalExternal 发送信号
OrderWorkflow workflow = client.newWorkflowStub(
    OrderWorkflow.class,
    "order-12345"
);

// 异步发送信号(不等待确认)
WorkflowExecution execution = WorkflowClient.signalWithStart(
    workflow::processOrder,
    orderRequest,
    workflow::cancelOrder,
    "Customer requested cancellation"
);

// 方法 3: 使用 SignalWithStart
WorkflowOptions options = WorkflowOptions.newBuilder()
    .setTaskQueue("order-task-queue")
    .setWorkflowId("order-12345")
    .build();

OrderWorkflow workflow = client.newWorkflowStub(
    OrderWorkflow.class,
    options
);

// 如果工作流不存在,先启动工作流,然后发送信号
WorkflowExecution execution = WorkflowClient.signalWithStart(
    workflow::processOrder,
    orderRequest,
    workflow::cancelOrder,
    "Cancel immediately"
);

信号和工作流状态

信号可以修改工作流状态,工作流可以使用 Workflow.await() 等待信号。

等待信号Java
import io.temporal.workflow.Workflow;

public class ApprovalWorkflowImpl implements ApprovalWorkflow {
    
    private boolean approved = false;
    private boolean rejected = false;
    private String approver = null;
    
    @Override
    public ApprovalResult requestApproval(ApprovalRequest request) {
        // 发送通知
        activities.sendApprovalRequest(request);
        
        // 等待批准或拒绝
        Workflow.await(() -> approved || rejected);
        
        if (approved) {
            return ApprovalResult.approved(approver);
        } else {
            return ApprovalResult.rejected("Request was rejected");
        }
    }
    
    @Override
    public void approve(String approver) {
        this.approved = true;
        this.approver = approver;
    }
    
    @Override
    public void reject(String reason) {
        this.rejected = true;
    }
}

信号保证

至少一次传递

信号保证至少被工作流处理一次,但可能被多次处理

顺序保证

来自同一个发送者的信号保持发送顺序

动态信号

使用动态信号处理未定义的信号类型。

动态信号处理Java
import io.temporal.workflow.DynamicSignalHandler;

public class DynamicWorkflowImpl implements DynamicWorkflow {
    
    private final Map<String, Object> signalData = new HashMap<>();
    
    @Override
    public Object execute(Object[] args) {
        // 等待信号
        Workflow.await(() -> !signalData.isEmpty());
        
        return signalData;
    }
    
    @Override
    public void handleSignal(String signalName, Object[] args) {
        // 处理所有未定义的信号
        if ("update-status".equals(signalName)) {
            signalData.put("status", args[0]);
        } else if ("set-metadata".equals(signalName)) {
            signalData.put("metadata", args[0]);
        } else {
            // 记录未知信号
            Workflow.getLogger(getClass())
                .warn("Unknown signal: " + signalName);
        }
    }
}

最佳实践

幂等性处理

信号方法应该是幂等的,因为可能被多次调用

状态检查

在信号方法中检查当前状态,决定是否接受信号

记录信号历史

在工作流中记录接收到的信号,便于追踪和调试

使用有意义的名称

信号名称应该清晰表达其用途

下一步