工作流版本控制
安全地更新工作流代码,确保现有工作流执行的兼容性。
核心概念
由于工作流代码必须是确定性的,修改工作流代码可能会导致历史不匹配。 工作流版本控制机制允许您安全地更新工作流代码,同时保持向后兼容性。
为什么需要版本控制?
工作流代码修改可能导致以下问题:
历史不匹配
修改后的代码与历史事件不匹配,导致工作流无法继续执行
非确定性结果
相同的历史可能产生不同的结果,违反确定性约束
版本控制机制
Temporal 使用 Workflow.getVersion() 方法来区分不同版本的工作流代码。
版本控制示例Java
import io.temporal.workflow.Workflow;
public class MyWorkflowImpl implements MyWorkflow {
private final MyActivities activities = Workflow.newActivityStub(
MyActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
.build()
);
@Override
public String execute(String input) {
// 获取工作流版本
String version = Workflow.getVersion(
"activity-change", // 变更 ID
Workflow.DEFAULT_VERSION, // 默认版本
1 // 新版本
);
if (version == Workflow.DEFAULT_VERSION) {
// 旧版本:调用旧的活动
return activities.processLegacy(input);
} else if (version.equals("1")) {
// 新版本:调用新的活动
return activities.processNew(input);
} else {
// 未来版本
throw new IllegalStateException("Unknown version: " + version);
}
}
}版本控制使用场景
场景 1: 添加新的活动
在工作流中添加新的活动调用,但不影响现有执行。
Java
String version = Workflow.getVersion(
"add-validation",
Workflow.DEFAULT_VERSION,
1
);
if (version == Workflow.DEFAULT_VERSION) {
// 旧版本:没有验证步骤
String result = activities.process(input);
} else {
// 新版本:添加验证步骤
activities.validate(input);
String result = activities.process(input);
}场景 2: 修改活动参数
修改活动的输入参数,需要确保新旧版本都能正常工作。
Java
String version = Workflow.getVersion(
"change-activity-signature",
Workflow.DEFAULT_VERSION,
1
);
if (version == Workflow.DEFAULT_VERSION) {
// 旧版本:使用旧的参数
activities.processLegacy(input, timeout);
} else {
// 新版本:使用新的参数对象
ProcessRequest request = new ProcessRequest(input, timeout);
activities.processNew(request);
}场景 3: 更新活动实现
更新活动的实现逻辑,但保持接口不变。
注意: 更新活动实现不需要版本控制,因为活动代码是独立于工作流历史的。 只需重新部署 Worker 即可。
场景 4: 修改活动选项
修改活动的超时或重试配置。
Java
String version = Workflow.getVersion(
"change-timeout",
Workflow.DEFAULT_VERSION,
1
);
ActivityOptions options;
if (version == Workflow.DEFAULT_VERSION) {
// 旧版本:较短的超时
options = ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(10))
.build();
} else {
// 新版本:更长的超时
options = ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
.build();
}
MyActivities activities = Workflow.newActivityStub(
MyActivities.class,
options
);最佳实践
使用有意义的变更 ID
使用描述性的变更 ID,如 "add-validation" 或 "change-timeout",便于理解代码意图
保持版本号递增
使用递增的整数作为版本号,便于追踪演进历史
保留旧版本代码
不要删除旧版本的代码,确保运行中的工作流能够继续执行
渐进式迁移
先部署新版本代码,等所有旧工作流完成后,再考虑删除旧版本代码
测试兼容性
在生产环境部署前,在测试环境验证新旧版本的兼容性
常见错误
❌ 错误做法
1
直接修改活动调用参数
会导致历史不匹配,运行中的工作流失败
2
删除旧版本代码
运行中的工作流需要旧版本代码才能继续执行
3
修改工作流接口定义
工作流接口定义变更会影响历史兼容性
✅ 正确做法
1
使用 Workflow.getVersion()
在代码变更前检查版本号,区分新旧逻辑
2
保留所有版本代码
确保运行中的工作流能够继续执行到完成
3
变更活动而非工作流
如果可能,只修改活动代码,不需要版本控制
完整示例
完整版本控制示例Java
import io.temporal.workflow.Workflow;
import java.time.Duration;
public class OrderWorkflowImpl implements OrderWorkflow {
private final OrderActivities activities = Workflow.newActivityStub(
OrderActivities.class,
getActivityOptions()
);
private ActivityOptions getActivityOptions() {
// 超时版本控制
String timeoutVersion = Workflow.getVersion(
"timeout-v2",
Workflow.DEFAULT_VERSION,
2
);
if (timeoutVersion == Workflow.DEFAULT_VERSION) {
return ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(10))
.build();
} else {
return ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
.build();
}
}
@Override
public OrderResult processOrder(OrderRequest request) {
// 验证版本控制
String validationVersion = Workflow.getVersion(
"add-validation",
Workflow.DEFAULT_VERSION,
1
);
if (validationVersion.equals("1")) {
// 新版本:添加验证步骤
activities.validateOrder(request);
}
// 处理版本控制
String processVersion = Workflow.getVersion(
"process-change",
Workflow.DEFAULT_VERSION,
1
);
if (processVersion == Workflow.DEFAULT_VERSION) {
// 旧版本:直接处理
return activities.processOrderLegacy(request);
} else {
// 新版本:使用新的处理逻辑
activities.reserveInventory(request);
String paymentId = activities.processPayment(request);
activities.shipOrder(request);
return new OrderResult(paymentId, "COMPLETED");
}
}
}