信号
信号允许您向运行中的工作流发送消息,实现外部与工作流的交互。
核心概念
信号是一种异步通信机制,允许客户端向运行中的工作流发送数据。 与查询不同,信号可以修改工作流状态,但不返回结果。
信号特性
异步通信
信号是异步发送的,不会阻塞客户端
修改状态
信号可以修改工作流的内部状态
可靠性保证
信号保证至少被处理一次
定义信号
在工作流接口中使用 @SignalMethod 注解定义信号方法。
信号定义Java
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.QueryMethod;
@WorkflowInterface
public interface OrderWorkflow {
@WorkflowMethod
OrderResult processOrder(OrderRequest request);
// 定义信号方法
@SignalMethod(name = "cancel-order")
void cancelOrder(String reason);
@SignalMethod(name = "update-shipping")
void updateShippingAddress(ShippingAddress address);
@QueryMethod
OrderStatus getStatus();
}实现信号
在工作流实现类中实现信号方法,处理接收到的信号。
信号实现Java
import io.temporal.workflow.Workflow;
import java.util.HashMap;
import java.util.Map;
public class OrderWorkflowImpl implements OrderWorkflow {
private OrderStatus status = OrderStatus.PENDING;
private String orderId;
private String cancellationReason = null;
private ShippingAddress shippingAddress;
private final OrderActivities activities = Workflow.newActivityStub(
OrderActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
.build()
);
@Override
public OrderResult processOrder(OrderRequest request) {
this.orderId = request.getOrderId();
try {
// 处理订单
status = OrderStatus.PROCESSING;
String paymentId = activities.processPayment(request);
// 检查是否被取消
if (cancellationReason != null) {
activities.refundPayment(paymentId);
return OrderResult.canceled(cancellationReason);
}
// 更新发货地址
if (shippingAddress != null) {
activities.updateShippingAddress(orderId, shippingAddress);
}
status = OrderStatus.COMPLETED;
return OrderResult.success(orderId);
} catch (Exception e) {
status = OrderStatus.FAILED;
throw e;
}
}
@Override
public void cancelOrder(String reason) {
// 只能在特定状态下取消
if (status == OrderStatus.PENDING ||
status == OrderStatus.PROCESSING) {
cancellationReason = reason;
status = OrderStatus.CANCELED;
}
}
@Override
public void updateShippingAddress(ShippingAddress address) {
// 可以在任何时候更新地址
this.shippingAddress = address;
}
@Override
public OrderStatus getStatus() {
return status;
}
}发送信号
使用客户端向运行中的工作流发送信号。
发送信号Java
import io.temporal.client.WorkflowClient;
// 方法 1: 通过工作流存根发送信号
OrderWorkflow workflow = client.newWorkflowStub(
OrderWorkflow.class,
"order-12345"
);
// 同步发送信号(等待确认)
workflow.cancelOrder("Customer requested cancellation");
// 方法 2: 通过 WorkflowSignalExternal 发送信号
OrderWorkflow workflow = client.newWorkflowStub(
OrderWorkflow.class,
"order-12345"
);
// 异步发送信号(不等待确认)
WorkflowExecution execution = WorkflowClient.signalWithStart(
workflow::processOrder,
orderRequest,
workflow::cancelOrder,
"Customer requested cancellation"
);
// 方法 3: 使用 SignalWithStart
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue("order-task-queue")
.setWorkflowId("order-12345")
.build();
OrderWorkflow workflow = client.newWorkflowStub(
OrderWorkflow.class,
options
);
// 如果工作流不存在,先启动工作流,然后发送信号
WorkflowExecution execution = WorkflowClient.signalWithStart(
workflow::processOrder,
orderRequest,
workflow::cancelOrder,
"Cancel immediately"
);信号和工作流状态
信号可以修改工作流状态,工作流可以使用 Workflow.await() 等待信号。
等待信号Java
import io.temporal.workflow.Workflow;
public class ApprovalWorkflowImpl implements ApprovalWorkflow {
private boolean approved = false;
private boolean rejected = false;
private String approver = null;
@Override
public ApprovalResult requestApproval(ApprovalRequest request) {
// 发送通知
activities.sendApprovalRequest(request);
// 等待批准或拒绝
Workflow.await(() -> approved || rejected);
if (approved) {
return ApprovalResult.approved(approver);
} else {
return ApprovalResult.rejected("Request was rejected");
}
}
@Override
public void approve(String approver) {
this.approved = true;
this.approver = approver;
}
@Override
public void reject(String reason) {
this.rejected = true;
}
}信号保证
至少一次传递
信号保证至少被工作流处理一次,但可能被多次处理
顺序保证
来自同一个发送者的信号保持发送顺序
动态信号
使用动态信号处理未定义的信号类型。
动态信号处理Java
import io.temporal.workflow.DynamicSignalHandler;
public class DynamicWorkflowImpl implements DynamicWorkflow {
private final Map<String, Object> signalData = new HashMap<>();
@Override
public Object execute(Object[] args) {
// 等待信号
Workflow.await(() -> !signalData.isEmpty());
return signalData;
}
@Override
public void handleSignal(String signalName, Object[] args) {
// 处理所有未定义的信号
if ("update-status".equals(signalName)) {
signalData.put("status", args[0]);
} else if ("set-metadata".equals(signalName)) {
signalData.put("metadata", args[0]);
} else {
// 记录未知信号
Workflow.getLogger(getClass())
.warn("Unknown signal: " + signalName);
}
}
}最佳实践
幂等性处理
信号方法应该是幂等的,因为可能被多次调用
状态检查
在信号方法中检查当前状态,决定是否接受信号
记录信号历史
在工作流中记录接收到的信号,便于追踪和调试
使用有意义的名称
信号名称应该清晰表达其用途