子工作流

子工作流允许您在一个工作流中编排其他工作流,实现复杂业务逻辑的模块化。

核心概念

子工作流是从父工作流启动的独立工作流实例。每个子工作流有自己的工作流 ID、 执行历史和状态,但与父工作流在同一个任务队列中执行。

为什么使用子工作流?

模块化

将复杂的业务逻辑拆分为多个可重用的工作流

层次化

构建工作流的层次结构,便于管理和追踪

并发执行

并发执行多个子工作流,提高效率

独立重试

每个子工作流可以独立重试,不影响其他子工作流

定义子工作流

子工作流是普通的工作流,可以被父工作流启动。

子工作流定义Java
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

// 子工作流接口
@WorkflowInterface
public interface PaymentWorkflow {
    @WorkflowMethod
    PaymentResult processPayment(PaymentRequest request);
}

// 子工作流实现
public class PaymentWorkflowImpl implements PaymentWorkflow {
    
    private final PaymentActivities activities = Workflow.newActivityStub(
        PaymentActivities.class,
        ActivityOptions.newBuilder()
            .setStartToCloseTimeout(Duration.ofSeconds(30))
            .build()
    );
    
    @Override
    public PaymentResult processPayment(PaymentRequest request) {
        String transactionId = activities.chargePayment(request);
        return PaymentResult.success(transactionId);
    }
}

启动子工作流

在父工作流中使用 Workflow.newChildWorkflowStub() 创建子工作流存根。

启动子工作流Java
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;

public class OrderWorkflowImpl implements OrderWorkflow {
    
    private final OrderActivities activities = Workflow.newActivityStub(
        OrderActivities.class,
        ActivityOptions.newBuilder()
            .setStartToCloseTimeout(Duration.ofSeconds(30))
            .build()
    );
    
    @Override
    public OrderResult processOrder(OrderRequest request) {
        // 步骤 1: 验证订单
        activities.validateOrder(request);
        
        // 步骤 2: 启动支付子工作流
        ChildWorkflowOptions paymentOptions = ChildWorkflowOptions.newBuilder()
            .setWorkflowId("payment-" + request.getOrderId())
            .build();
        
        PaymentWorkflow paymentWorkflow = Workflow.newChildWorkflowStub(
            PaymentWorkflow.class,
            paymentOptions
        );
        
        // 同步执行子工作流(等待结果)
        PaymentResult paymentResult = paymentWorkflow.processPayment(
            new PaymentRequest(request.getAmount(), request.getCardNumber())
        );
        
        if (!paymentResult.isSuccess()) {
            return OrderResult.failed("Payment failed");
        }
        
        // 步骤 3: 备货
        activities.prepareOrder(request.getOrderId());
        
        // 步骤 4: 启动发货子工作流
        ChildWorkflowOptions shippingOptions = ChildWorkflowOptions.newBuilder()
            .setWorkflowId("shipping-" + request.getOrderId())
            .build();
        
        ShippingWorkflow shippingWorkflow = Workflow.newChildWorkflowStub(
            ShippingWorkflow.class,
            shippingOptions
        );
        
        // 同步执行子工作流
        ShippingResult shippingResult = shippingWorkflow.shipOrder(
            new ShippingRequest(request.getOrderId(), request.getAddress())
        );
        
        return OrderResult.success(
            paymentResult.getTransactionId(),
            shippingResult.getTrackingNumber()
        );
    }
}

异步启动子工作流

使用 Workflow.newChildWorkflowStub() 加上 Async.function() 异步启动子工作流。

异步启动子工作流Java
import io.temporal.workflow.Async;
import io.temporal.workflow.Promise;
import java.util.ArrayList;
import java.util.List;

public class BatchProcessingWorkflowImpl implements BatchProcessingWorkflow {
    
    @Override
    public BatchResult processBatch(BatchRequest batch) {
        List<Promise<ItemResult>> promises = new ArrayList<>();
        
        // 并发启动多个子工作流
        for (Item item : batch.getItems()) {
            ChildWorkflowOptions options = ChildWorkflowOptions.newBuilder()
                .setWorkflowId("item-" + item.getId())
                .build();
            
            ItemWorkflow itemWorkflow = Workflow.newChildWorkflowStub(
                ItemWorkflow.class,
                options
            );
            
            // 异步启动子工作流
            Promise<ItemResult> promise = Async.function(
                itemWorkflow::processItem,
                item
            );
            promises.add(promise);
        }
        
        // 等待所有子工作流完成
        List<ItemResult> results = new ArrayList<>();
        for (Promise<ItemResult> promise : promises) {
            results.add(promise.get());
        }
        
        return new BatchResult(results);
    }
}

子工作流选项

使用 ChildWorkflowOptions 配置子工作流的执行参数。

子工作流选项Java
import io.temporal.workflow.ChildWorkflowOptions;
import java.time.Duration;

ChildWorkflowOptions options = ChildWorkflowOptions.newBuilder()
    // 设置工作流 ID
    .setWorkflowId("child-workflow-123")
    
    // 设置任务队列(默认与父工作流相同)
    .setTaskQueue("custom-task-queue")
    
    // 设置工作流运行超时
    .setWorkflowRunTimeout(Duration.ofMinutes(5))
    
    // 设置工作流任务超时
    .setWorkflowTaskTimeout(Duration.ofSeconds(10))
    
    // 设置搜索属性
    .setSearchAttributes(searchAttributes)
    
    // 设置父关闭策略
    .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
    
    .build();

// 父关闭策略说明:
// ABANDON: 子工作流继续运行(默认)
// REQUEST_CANCEL: 取消子工作流
// TERMINATE: 终止子工作流

信号和查询子工作流

父工作流可以向子工作流发送信号或查询状态。

信号和查询子工作流Java
// 启动子工作流并获取执行信息
ChildWorkflowExecution execution = Workflow.executeChildWorkflow(
    PaymentWorkflow.class,
    paymentRequest,
    paymentOptions
);

// 获取子工作流存根(用于发送信号)
PaymentWorkflow childWorkflow = Workflow.newChildWorkflowStub(
    PaymentWorkflow.class,
    execution.getWorkflowId()
);

// 发送信号
childWorkflow.cancelPayment("Customer requested cancellation");

// 获取子工作流存根(用于查询)
PaymentWorkflow queryStub = Workflow.newChildWorkflowStub(
    PaymentWorkflow.class,
    execution.getWorkflowId()
);

// 查询状态
PaymentStatus status = queryStub.getStatus();

外部工作流

外部工作流是不直接作为子工作流启动,但可以引用和交互的工作流。

外部工作流Java
import io.temporal.workflow.ExternalWorkflowStub;
import io.temporal.workflow.WorkflowExecution;

// 通过工作流 ID 获取外部工作流存根
ExternalWorkflowStub externalWorkflow = Workflow.newExternalWorkflowStub(
    PaymentWorkflow.class,
    new WorkflowExecution("payment-12345")
);

// 发送信号
externalWorkflow.signal("cancelPayment", "Reason");

// 检查工作流是否存在
boolean exists = externalWorkflow.exists();

// 等待工作流完成
externalWorkflow.result(String.class);

子工作流 vs 活动

特性子工作流活动
复杂性复杂业务逻辑简单任务
状态持久化支持不支持
可观测性独立历史无独立历史
超时工作流运行超时活动超时
适用场景多步骤、长时间运行的流程单次调用、快速执行

最佳实践

合理的粒度

子工作流应该代表一个完整的业务单元,避免过度拆分

层级控制

避免过深的嵌套层级,通常 2-3 层是合理的

并发控制

注意并发启动的子工作流数量,避免资源耗尽

错误处理

正确处理子工作流的失败,避免影响整个父工作流

下一步