错误处理

学习如何在 Temporal 工作流和活动中正确处理错误和异常。

核心概念

Temporal 提供强大的错误处理机制,包括自动重试、错误传播和自定义异常处理。 理解不同类型的错误对于构建可靠的应用程序至关重要。

错误类型

Application Failure

应用程序逻辑错误,通常可重试

  • • 数据库连接超时
  • • API 调用失败
  • • 临时资源不可用
Activity Task Failure

活动执行失败,Worker 无法完成任务

  • • Worker 崩溃
  • • 活动超时
  • • 资源耗尽
Workflow Task Failure

工作流任务执行失败,通常是代码错误

  • • 工作流代码错误
  • • 违反确定性约束
  • • 历史不匹配
Terminated

工作流被外部终止,无法重试

  • • 手动终止
  • • 达到重试上限
  • • 执行超时

活动中的错误处理

活动中抛出的异常会触发重试机制。

活动错误处理Java
public class PaymentActivitiesImpl implements PaymentActivities {
    
    @Override
    public String chargePayment(PaymentRequest request) {
        try {
            // 调用支付网关
            PaymentResponse response = PaymentGateway.charge(request);
            
            if (!response.isSuccess()) {
                // 业务错误,抛出异常触发重试
                throw new ApplicationFailureException(
                    "Payment failed: " + response.getErrorMessage(),
                    "PaymentError",
                    true  // nonRetryable: true 表示不重试
                );
            }
            
            return response.getTransactionId();
            
        } catch (NetworkException e) {
            // 网络错误,自动重试
            throw new ApplicationFailureException(
                "Network error: " + e.getMessage(),
                "NetworkError",
                false  // nonRetryable: false 表示可重试
            );
        }
    }
}

工作流中的错误处理

在工作流中捕获和处理活动抛出的异常。

工作流错误处理Java
import io.temporal.failure.ApplicationFailure;

public class PaymentWorkflowImpl implements PaymentWorkflow {
    
    private final PaymentActivities activities = Workflow.newActivityStub(
        PaymentActivities.class,
        ActivityOptions.newBuilder()
            .setStartToCloseTimeout(Duration.ofSeconds(30))
            .setRetryOptions(RetryOptions.newBuilder()
                .setMaximumAttempts(3)
                .build())
            .build()
    );
    
    @Override
    public PaymentResult processPayment(PaymentRequest request) {
        try {
            // 尝试扣款
            String transactionId = activities.chargePayment(request);
            return PaymentResult.success(transactionId);
            
        } catch (ApplicationFailure e) {
            // 根据错误类型处理
            String errorType = e.getType();
            
            if ("PaymentError".equals(errorType)) {
                // 不可重试的支付错误,直接失败
                return PaymentResult.failed(e.getMessage());
                
            } else if ("NetworkError".equals(errorType)) {
                // 网络错误已自动重试,仍然失败
                return PaymentResult.failed("Payment failed after retries");
                
            } else {
                // 其他错误
                throw e;
            }
        }
    }
}

自定义异常

创建自定义异常类型以区分不同的业务错误。

自定义异常Java
// 定义自定义异常
public class InsufficientFundsException extends RuntimeException {
    private final double currentBalance;
    private final double requestedAmount;
    
    public InsufficientFundsException(double currentBalance, double requestedAmount) {
        super(String.format(
            "Insufficient funds. Current: %.2f, Requested: %.2f",
            currentBalance,
            requestedAmount
        ));
        this.currentBalance = currentBalance;
        this.requestedAmount = requestedAmount;
    }
    
    public double getCurrentBalance() {
        return currentBalance;
    }
    
    public double getRequestedAmount() {
        return requestedAmount;
    }
}

// 在活动中使用
public class AccountActivitiesImpl implements AccountActivities {
    
    @Override
    public void withdraw(String accountId, double amount) {
        Account account = database.getAccount(accountId);
        
        if (account.getBalance() < amount) {
            // 抛出业务异常
            throw new InsufficientFundsException(
                account.getBalance(),
                amount
            );
        }
        
        account.withdraw(amount);
        database.saveAccount(account);
    }
}

// 在工作流中捕获
public class TransferWorkflowImpl implements TransferWorkflow {
    
    @Override
    public void transfer(String from, String to, double amount) {
        try {
            activities.withdraw(from, amount);
            activities.deposit(to, amount);
            
        } catch (InsufficientFundsException e) {
            // 处理余额不足
            activities.notifyUser(from, 
                String.format("Transfer failed: %s", e.getMessage())
            );
        }
    }
}

重试策略

配置活动的重试策略,包括重试次数、间隔和回退系数。

重试策略配置Java
import io.temporal.common.RetryOptions;
import java.time.Duration;

// 配置重试策略
RetryOptions retryOptions = RetryOptions.newBuilder()
    // 初始重试间隔:1 秒
    .setInitialInterval(Duration.ofSeconds(1))
    
    // 回退系数:每次重试间隔乘以 2
    .setBackoffCoefficient(2.0)
    
    // 最大重试间隔:10 秒
    .setMaximumInterval(Duration.ofSeconds(10))
    
    // 最大重试次数:5 次
    .setMaximumAttempts(5)
    
    // 不重试的异常类型
    .setNonRetryableExceptions(
        InsufficientFundsException.class,
        InvalidAccountException.class
    )
    
    .build();

// 应用重试策略
ActivityOptions options = ActivityOptions.newBuilder()
    .setStartToCloseTimeout(Duration.ofMinutes(1))
    .setRetryOptions(retryOptions)
    .build();

PaymentActivities activities = Workflow.newActivityStub(
    PaymentActivities.class,
    options
);

// 重试时间线:
// 尝试 1: 立即执行
// 失败后等待: 1 秒
// 尝试 2: 1 秒后执行
// 失败后等待: 2 秒
// 尝试 3: 3 秒后执行
// 失败后等待: 4 秒
// 尝试 4: 7 秒后执行
// 失败后等待: 8 秒
// 尝试 5: 15 秒后执行

Compensation(补偿)

当工作流失败时,执行补偿操作撤销已完成的操作。

补偿模式Java
public class BookingWorkflowImpl implements BookingWorkflow {
    
    private final BookingActivities activities = Workflow.newActivityStub(
        BookingActivities.class,
        ActivityOptions.newBuilder()
            .setStartToCloseTimeout(Duration.ofSeconds(30))
            .build()
    );
    
    @Override
    public BookingResult bookTrip(TripRequest request) {
        List<String> bookedItems = new ArrayList<>();
        
        try {
            // 步骤 1: 预订机票
            String flightId = activities.bookFlight(request.getFlight());
            bookedItems.add(flightId);
            
            // 步骤 2: 预订酒店
            String hotelId = activities.bookHotel(request.getHotel());
            bookedItems.add(hotelId);
            
            // 步骤 3: 预订租车
            String carId = activities.bookCar(request.getCar());
            bookedItems.add(carId);
            
            return BookingResult.success(bookedItems);
            
        } catch (Exception e) {
            // 发生错误,执行补偿
            compensate(bookedItems);
            
            return BookingResult.failed(
                "Booking failed: " + e.getMessage()
            );
        }
    }
    
    private void compensate(List<String> bookedItems) {
        // 按相反顺序取消预订
        for (int i = bookedItems.size() - 1; i >= 0; i--) {
            try {
                activities.cancelBooking(bookedItems.get(i));
            } catch (Exception e) {
                // 补偿失败,记录日志但继续
                Workflow.getLogger(this.getClass())
                    .warn("Failed to cancel booking: " + bookedItems.get(i), e);
            }
        }
    }
}

Saga 模式

使用 Saga 模式管理跨多个服务的分布式事务。

Saga 模式说明

Saga 模式将分布式事务拆分为一系列本地事务,每个本地事务都有对应的补偿事务。 如果任何步骤失败,执行之前所有步骤的补偿事务,确保系统最终一致。

Saga 实现Java
public class SagaOrchestrator {
    
    private List<SagaStep> steps = new ArrayList<>();
    
    public <T> SagaOrchestrator step(
        String name,
        Supplier<T> action,
        Consumer<T> compensation
    ) {
        steps.add(new SagaStep<>(name, action, compensation));
        return this;
    }
    
    public void execute() {
        List<CompensableAction> completedActions = new ArrayList<>();
        
        try {
            for (SagaStep step : steps) {
                // 执行操作
                Object result = step.getAction().get();
                completedActions.add(new CompensableAction(step, result));
            }
            
        } catch (Exception e) {
            // 发生错误,执行补偿
            compensate(completedActions);
            throw new SagaExecutionException(
                "Saga failed", e
            );
        }
    }
    
    private void compensate(List<CompensableAction> actions) {
        // 按相反顺序执行补偿
        Collections.reverse(actions);
        
        for (CompensableAction action : actions) {
            try {
                action.getCompensation().accept(action.getResult());
            } catch (Exception e) {
                Workflow.getLogger(SagaOrchestrator.class)
                    .error("Compensation failed for: " + 
                           action.getStep().getName(), e);
            }
        }
    }
}

// 在工作流中使用
public class OrderWorkflowImpl implements OrderWorkflow {
    
    @Override
    public void processOrder(Order order) {
        new SagaOrchestrator()
            .step("reserve-inventory",
                () -> activities.reserveInventory(order),
                (result) -> activities.releaseInventory(order)
            )
            .step("process-payment",
                () -> activities.processPayment(order),
                (result) -> activities.refundPayment(result)
            )
            .step("confirm-order",
                () -> activities.confirmOrder(order),
                (result) -> activities.cancelOrder(order)
            )
            .execute();
    }
}

错误可观测性

使用 Temporal Web UI 查看错误详情和重试历史。

查看错误堆栈

在 Web UI 中查看完整的错误堆栈和失败原因

重试历史

查看每次重试的详细信息和时间线

输入输出

查看失败的输入参数和期望的输出

事件历史

追踪工作流的完整执行历史和事件

最佳实践

区分可重试和不可重试错误

业务逻辑错误(如余额不足)不应重试,临时错误(如网络超时)应重试

实现补偿机制

对于多步骤操作,实现补偿机制确保最终一致性

记录详细错误信息

在异常中包含足够的上下文信息,便于调试和监控

使用自定义异常

创建自定义异常类型以区分不同的业务错误场景

下一步