活动

活动表示工作流中的单个操作或任务,通常是调用外部系统或执行计算。

核心概念

活动是工作流中执行非确定性操作的地方。与工作流不同,活动可以访问外部系统、 使用系统时间、执行随机数操作等。活动失败时会自动重试,确保可靠性。

活动特性

非确定性操作
可以调用外部 API、数据库、文件系统等非确定性操作
自动重试
活动失败时自动重试,可配置重试策略和最大重试次数
超时控制

支持多种超时设置,确保活动不会无限期运行

状态隔离
活动之间状态隔离,每次执行都是独立的

定义活动接口

使用 @ActivityInterface 注解定义活动接口。

PaymentActivities.javaJava
package io.temporal.sample;

import io.temporal.activity.ActivityInterface;

@ActivityInterface
public interface PaymentActivities {
    
    void validatePayment(PaymentRequest request);
    
    String chargePayment(PaymentRequest request);
    
    void refundPayment(String transactionId);
}

实现活动

创建类实现活动接口,实现具体的业务逻辑。

PaymentActivitiesImpl.javaJava
package io.temporal.sample;

import java.util.UUID;

public class PaymentActivitiesImpl implements PaymentActivities {
    
    @Override
    public void validatePayment(PaymentRequest request) {
        // 调用外部支付服务验证
        if (request.getCardNumber() == null || 
            request.getCardNumber().length() < 16) {
            throw new IllegalArgumentException("Invalid card number");
        }
        
        if (request.getAmount() <= 0) {
            throw new IllegalArgumentException("Invalid amount");
        }
        
        // 可以调用外部 API 进行验证
        // PaymentGatewayApi.validate(request);
    }
    
    @Override
    public String chargePayment(PaymentRequest request) {
        // 调用支付网关执行扣款
        // Transaction transaction = PaymentGatewayApi.charge(request);
        
        // 模拟返回交易 ID
        String transactionId = "TXN-" + UUID.randomUUID().toString();
        
        return transactionId;
    }
    
    @Override
    public void refundPayment(String transactionId) {
        // 调用支付网关执行退款
        // PaymentGatewayApi.refund(transactionId);
        
        System.out.println("Refunded transaction: " + transactionId);
    }
}

配置活动选项

在工作流中创建活动存根时,可以配置活动的超时和重试策略。

ActivityOptions 示例Java
import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import java.time.Duration;

// 创建活动选项
ActivityOptions options = ActivityOptions.newBuilder()
    // 设置活动超时(从开始到完成的总时间)
    .setStartToCloseTimeout(Duration.ofSeconds(30))
    
    // 设置重试策略
    .setRetryOptions(RetryOptions.newBuilder()
        .setInitialInterval(Duration.ofSeconds(1))
        .setBackoffCoefficient(2.0)
        .setMaximumInterval(Duration.ofSeconds(10))
        .setMaximumAttempts(3)
        .build())
    
    // 设置心跳超时(用于长时间运行的活动)
    .setHeartbeatTimeout(Duration.ofSeconds(5))
    
    .build();

// 创建活动存根
PaymentActivities activities = Workflow.newActivityStub(
    PaymentActivities.class,
    options
);

超时类型

StartToCloseTimeout

活动从开始到完成的总超时时间,包括重试时间

ScheduleToCloseTimeout

活动从调度到完成的总超时时间,包括在任务队列中等待的时间

ScheduleToStartTimeout

活动在任务队列中等待的最大时间

HeartbeatTimeout

长时间运行的活动必须定期发送心跳,防止超时

活动心跳

对于长时间运行的活动,需要定期发送心跳来保持活动活跃。

带心跳的活动Java
package io.temporal.sample;

import io.temporal.activity.Activity;

public class LongRunningActivitiesImpl implements LongRunningActivities {
    
    @Override
    public void processLargeData(DataRequest request) {
        int totalItems = request.getItems().size();
        int processedItems = 0;
        
        for (DataItem item : request.getItems()) {
            // 处理数据
            processItem(item);
            
            processedItems++;
            
            // 每 10 个项目发送一次心跳
            if (processedItems % 10 == 0) {
                Activity.heartbeat(processedItems);
                
                // 可以携带进度信息
                Activity.heartbeat(new Progress(processedItems, totalItems));
            }
        }
    }
    
    private void processItem(DataItem item) {
        // 实际处理逻辑
    }
}

本地活动

本地活动直接在 Worker 进程中执行,不通过网络调用,适用于快速、轻量级的操作。

适用场景

  • • 计算密集型但执行时间短(<1秒)
  • • 不需要调用外部系统
  • • 不需要持久化状态
本地活动配置Java
import io.temporal.activity.LocalActivityOptions;

LocalActivityOptions localOptions = LocalActivityOptions.newBuilder()
    .setStartToCloseTimeout(Duration.ofSeconds(1))
    .build();

ValidationActivities validation = Workflow.newLocalActivityStub(
    ValidationActivities.class,
    localOptions
);

注册活动

在 Worker 中注册活动实现类。

Worker 注册活动Java
import io.temporal.worker.WorkerFactory;

WorkerFactory factory = WorkerFactory.newInstance(client);
Worker worker = factory.newWorker("payment-task-queue");

// 注册活动实现类
worker.registerActivitiesImplementations(
    new PaymentActivitiesImpl(),
    new LongRunningActivitiesImpl()
);

factory.start();

最佳实践

幂等性

活动应该是幂等的,因为重试可能导致多次执行

合理的超时设置

根据活动实际执行时间设置超时,避免过长或过短

避免硬编码

使用配置文件或参数传递配置信息

错误处理

抛出特定异常以区分业务错误和系统错误

下一步