活动
活动表示工作流中的单个操作或任务,通常是调用外部系统或执行计算。
核心概念
活动是工作流中执行非确定性操作的地方。与工作流不同,活动可以访问外部系统、 使用系统时间、执行随机数操作等。活动失败时会自动重试,确保可靠性。
活动特性
非确定性操作
可以调用外部 API、数据库、文件系统等非确定性操作
自动重试
活动失败时自动重试,可配置重试策略和最大重试次数
超时控制
支持多种超时设置,确保活动不会无限期运行
状态隔离
活动之间状态隔离,每次执行都是独立的
定义活动接口
使用 @ActivityInterface 注解定义活动接口。
PaymentActivities.javaJava
package io.temporal.sample;
import io.temporal.activity.ActivityInterface;
@ActivityInterface
public interface PaymentActivities {
void validatePayment(PaymentRequest request);
String chargePayment(PaymentRequest request);
void refundPayment(String transactionId);
}实现活动
创建类实现活动接口,实现具体的业务逻辑。
PaymentActivitiesImpl.javaJava
package io.temporal.sample;
import java.util.UUID;
public class PaymentActivitiesImpl implements PaymentActivities {
@Override
public void validatePayment(PaymentRequest request) {
// 调用外部支付服务验证
if (request.getCardNumber() == null ||
request.getCardNumber().length() < 16) {
throw new IllegalArgumentException("Invalid card number");
}
if (request.getAmount() <= 0) {
throw new IllegalArgumentException("Invalid amount");
}
// 可以调用外部 API 进行验证
// PaymentGatewayApi.validate(request);
}
@Override
public String chargePayment(PaymentRequest request) {
// 调用支付网关执行扣款
// Transaction transaction = PaymentGatewayApi.charge(request);
// 模拟返回交易 ID
String transactionId = "TXN-" + UUID.randomUUID().toString();
return transactionId;
}
@Override
public void refundPayment(String transactionId) {
// 调用支付网关执行退款
// PaymentGatewayApi.refund(transactionId);
System.out.println("Refunded transaction: " + transactionId);
}
}配置活动选项
在工作流中创建活动存根时,可以配置活动的超时和重试策略。
ActivityOptions 示例Java
import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import java.time.Duration;
// 创建活动选项
ActivityOptions options = ActivityOptions.newBuilder()
// 设置活动超时(从开始到完成的总时间)
.setStartToCloseTimeout(Duration.ofSeconds(30))
// 设置重试策略
.setRetryOptions(RetryOptions.newBuilder()
.setInitialInterval(Duration.ofSeconds(1))
.setBackoffCoefficient(2.0)
.setMaximumInterval(Duration.ofSeconds(10))
.setMaximumAttempts(3)
.build())
// 设置心跳超时(用于长时间运行的活动)
.setHeartbeatTimeout(Duration.ofSeconds(5))
.build();
// 创建活动存根
PaymentActivities activities = Workflow.newActivityStub(
PaymentActivities.class,
options
);超时类型
StartToCloseTimeout
活动从开始到完成的总超时时间,包括重试时间
ScheduleToCloseTimeout
活动从调度到完成的总超时时间,包括在任务队列中等待的时间
ScheduleToStartTimeout
活动在任务队列中等待的最大时间
HeartbeatTimeout
长时间运行的活动必须定期发送心跳,防止超时
活动心跳
对于长时间运行的活动,需要定期发送心跳来保持活动活跃。
带心跳的活动Java
package io.temporal.sample;
import io.temporal.activity.Activity;
public class LongRunningActivitiesImpl implements LongRunningActivities {
@Override
public void processLargeData(DataRequest request) {
int totalItems = request.getItems().size();
int processedItems = 0;
for (DataItem item : request.getItems()) {
// 处理数据
processItem(item);
processedItems++;
// 每 10 个项目发送一次心跳
if (processedItems % 10 == 0) {
Activity.heartbeat(processedItems);
// 可以携带进度信息
Activity.heartbeat(new Progress(processedItems, totalItems));
}
}
}
private void processItem(DataItem item) {
// 实际处理逻辑
}
}本地活动
本地活动直接在 Worker 进程中执行,不通过网络调用,适用于快速、轻量级的操作。
适用场景
- • 计算密集型但执行时间短(<1秒)
- • 不需要调用外部系统
- • 不需要持久化状态
本地活动配置Java
import io.temporal.activity.LocalActivityOptions;
LocalActivityOptions localOptions = LocalActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(1))
.build();
ValidationActivities validation = Workflow.newLocalActivityStub(
ValidationActivities.class,
localOptions
);注册活动
在 Worker 中注册活动实现类。
Worker 注册活动Java
import io.temporal.worker.WorkerFactory;
WorkerFactory factory = WorkerFactory.newInstance(client);
Worker worker = factory.newWorker("payment-task-queue");
// 注册活动实现类
worker.registerActivitiesImplementations(
new PaymentActivitiesImpl(),
new LongRunningActivitiesImpl()
);
factory.start();最佳实践
幂等性
活动应该是幂等的,因为重试可能导致多次执行
合理的超时设置
根据活动实际执行时间设置超时,避免过长或过短
避免硬编码
使用配置文件或参数传递配置信息
错误处理
抛出特定异常以区分业务错误和系统错误