子工作流
子工作流允许您在一个工作流中编排其他工作流,实现复杂业务逻辑的模块化。
核心概念
子工作流是从父工作流启动的独立工作流实例。每个子工作流有自己的工作流 ID、 执行历史和状态,但与父工作流在同一个任务队列中执行。
为什么使用子工作流?
模块化
将复杂的业务逻辑拆分为多个可重用的工作流
层次化
构建工作流的层次结构,便于管理和追踪
并发执行
并发执行多个子工作流,提高效率
独立重试
每个子工作流可以独立重试,不影响其他子工作流
定义子工作流
子工作流是普通的工作流,可以被父工作流启动。
子工作流定义Java
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
// 子工作流接口
@WorkflowInterface
public interface PaymentWorkflow {
@WorkflowMethod
PaymentResult processPayment(PaymentRequest request);
}
// 子工作流实现
public class PaymentWorkflowImpl implements PaymentWorkflow {
private final PaymentActivities activities = Workflow.newActivityStub(
PaymentActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
.build()
);
@Override
public PaymentResult processPayment(PaymentRequest request) {
String transactionId = activities.chargePayment(request);
return PaymentResult.success(transactionId);
}
}启动子工作流
在父工作流中使用 Workflow.newChildWorkflowStub() 创建子工作流存根。
启动子工作流Java
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
public class OrderWorkflowImpl implements OrderWorkflow {
private final OrderActivities activities = Workflow.newActivityStub(
OrderActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
.build()
);
@Override
public OrderResult processOrder(OrderRequest request) {
// 步骤 1: 验证订单
activities.validateOrder(request);
// 步骤 2: 启动支付子工作流
ChildWorkflowOptions paymentOptions = ChildWorkflowOptions.newBuilder()
.setWorkflowId("payment-" + request.getOrderId())
.build();
PaymentWorkflow paymentWorkflow = Workflow.newChildWorkflowStub(
PaymentWorkflow.class,
paymentOptions
);
// 同步执行子工作流(等待结果)
PaymentResult paymentResult = paymentWorkflow.processPayment(
new PaymentRequest(request.getAmount(), request.getCardNumber())
);
if (!paymentResult.isSuccess()) {
return OrderResult.failed("Payment failed");
}
// 步骤 3: 备货
activities.prepareOrder(request.getOrderId());
// 步骤 4: 启动发货子工作流
ChildWorkflowOptions shippingOptions = ChildWorkflowOptions.newBuilder()
.setWorkflowId("shipping-" + request.getOrderId())
.build();
ShippingWorkflow shippingWorkflow = Workflow.newChildWorkflowStub(
ShippingWorkflow.class,
shippingOptions
);
// 同步执行子工作流
ShippingResult shippingResult = shippingWorkflow.shipOrder(
new ShippingRequest(request.getOrderId(), request.getAddress())
);
return OrderResult.success(
paymentResult.getTransactionId(),
shippingResult.getTrackingNumber()
);
}
}异步启动子工作流
使用 Workflow.newChildWorkflowStub() 加上 Async.function() 异步启动子工作流。
异步启动子工作流Java
import io.temporal.workflow.Async;
import io.temporal.workflow.Promise;
import java.util.ArrayList;
import java.util.List;
public class BatchProcessingWorkflowImpl implements BatchProcessingWorkflow {
@Override
public BatchResult processBatch(BatchRequest batch) {
List<Promise<ItemResult>> promises = new ArrayList<>();
// 并发启动多个子工作流
for (Item item : batch.getItems()) {
ChildWorkflowOptions options = ChildWorkflowOptions.newBuilder()
.setWorkflowId("item-" + item.getId())
.build();
ItemWorkflow itemWorkflow = Workflow.newChildWorkflowStub(
ItemWorkflow.class,
options
);
// 异步启动子工作流
Promise<ItemResult> promise = Async.function(
itemWorkflow::processItem,
item
);
promises.add(promise);
}
// 等待所有子工作流完成
List<ItemResult> results = new ArrayList<>();
for (Promise<ItemResult> promise : promises) {
results.add(promise.get());
}
return new BatchResult(results);
}
}子工作流选项
使用 ChildWorkflowOptions 配置子工作流的执行参数。
子工作流选项Java
import io.temporal.workflow.ChildWorkflowOptions;
import java.time.Duration;
ChildWorkflowOptions options = ChildWorkflowOptions.newBuilder()
// 设置工作流 ID
.setWorkflowId("child-workflow-123")
// 设置任务队列(默认与父工作流相同)
.setTaskQueue("custom-task-queue")
// 设置工作流运行超时
.setWorkflowRunTimeout(Duration.ofMinutes(5))
// 设置工作流任务超时
.setWorkflowTaskTimeout(Duration.ofSeconds(10))
// 设置搜索属性
.setSearchAttributes(searchAttributes)
// 设置父关闭策略
.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
.build();
// 父关闭策略说明:
// ABANDON: 子工作流继续运行(默认)
// REQUEST_CANCEL: 取消子工作流
// TERMINATE: 终止子工作流信号和查询子工作流
父工作流可以向子工作流发送信号或查询状态。
信号和查询子工作流Java
// 启动子工作流并获取执行信息
ChildWorkflowExecution execution = Workflow.executeChildWorkflow(
PaymentWorkflow.class,
paymentRequest,
paymentOptions
);
// 获取子工作流存根(用于发送信号)
PaymentWorkflow childWorkflow = Workflow.newChildWorkflowStub(
PaymentWorkflow.class,
execution.getWorkflowId()
);
// 发送信号
childWorkflow.cancelPayment("Customer requested cancellation");
// 获取子工作流存根(用于查询)
PaymentWorkflow queryStub = Workflow.newChildWorkflowStub(
PaymentWorkflow.class,
execution.getWorkflowId()
);
// 查询状态
PaymentStatus status = queryStub.getStatus();外部工作流
外部工作流是不直接作为子工作流启动,但可以引用和交互的工作流。
外部工作流Java
import io.temporal.workflow.ExternalWorkflowStub;
import io.temporal.workflow.WorkflowExecution;
// 通过工作流 ID 获取外部工作流存根
ExternalWorkflowStub externalWorkflow = Workflow.newExternalWorkflowStub(
PaymentWorkflow.class,
new WorkflowExecution("payment-12345")
);
// 发送信号
externalWorkflow.signal("cancelPayment", "Reason");
// 检查工作流是否存在
boolean exists = externalWorkflow.exists();
// 等待工作流完成
externalWorkflow.result(String.class);子工作流 vs 活动
| 特性 | 子工作流 | 活动 |
|---|---|---|
| 复杂性 | 复杂业务逻辑 | 简单任务 |
| 状态持久化 | 支持 | 不支持 |
| 可观测性 | 独立历史 | 无独立历史 |
| 超时 | 工作流运行超时 | 活动超时 |
| 适用场景 | 多步骤、长时间运行的流程 | 单次调用、快速执行 |
最佳实践
合理的粒度
子工作流应该代表一个完整的业务单元,避免过度拆分
层级控制
避免过深的嵌套层级,通常 2-3 层是合理的
并发控制
注意并发启动的子工作流数量,避免资源耗尽
错误处理
正确处理子工作流的失败,避免影响整个父工作流