工作流版本控制

安全地更新工作流代码,确保现有工作流执行的兼容性。

核心概念

由于工作流代码必须是确定性的,修改工作流代码可能会导致历史不匹配。 工作流版本控制机制允许您安全地更新工作流代码,同时保持向后兼容性。

为什么需要版本控制?

工作流代码修改可能导致以下问题:

历史不匹配

修改后的代码与历史事件不匹配,导致工作流无法继续执行

非确定性结果

相同的历史可能产生不同的结果,违反确定性约束

版本控制机制

Temporal 使用 Workflow.getVersion() 方法来区分不同版本的工作流代码。

版本控制示例Java
import io.temporal.workflow.Workflow;

public class MyWorkflowImpl implements MyWorkflow {
    
    private final MyActivities activities = Workflow.newActivityStub(
        MyActivities.class,
        ActivityOptions.newBuilder()
            .setStartToCloseTimeout(Duration.ofSeconds(30))
            .build()
    );
    
    @Override
    public String execute(String input) {
        // 获取工作流版本
        String version = Workflow.getVersion(
            "activity-change",    // 变更 ID
            Workflow.DEFAULT_VERSION,   // 默认版本
            1                         // 新版本
        );
        
        if (version == Workflow.DEFAULT_VERSION) {
            // 旧版本:调用旧的活动
            return activities.processLegacy(input);
            
        } else if (version.equals("1")) {
            // 新版本:调用新的活动
            return activities.processNew(input);
            
        } else {
            // 未来版本
            throw new IllegalStateException("Unknown version: " + version);
        }
    }
}

版本控制使用场景

场景 1: 添加新的活动

在工作流中添加新的活动调用,但不影响现有执行。

Java
String version = Workflow.getVersion(
    "add-validation",
    Workflow.DEFAULT_VERSION,
    1
);

if (version == Workflow.DEFAULT_VERSION) {
    // 旧版本:没有验证步骤
    String result = activities.process(input);
    
} else {
    // 新版本:添加验证步骤
    activities.validate(input);
    String result = activities.process(input);
}
场景 2: 修改活动参数

修改活动的输入参数,需要确保新旧版本都能正常工作。

Java
String version = Workflow.getVersion(
    "change-activity-signature",
    Workflow.DEFAULT_VERSION,
    1
);

if (version == Workflow.DEFAULT_VERSION) {
    // 旧版本:使用旧的参数
    activities.processLegacy(input, timeout);
    
} else {
    // 新版本:使用新的参数对象
    ProcessRequest request = new ProcessRequest(input, timeout);
    activities.processNew(request);
}
场景 3: 更新活动实现

更新活动的实现逻辑,但保持接口不变。

注意: 更新活动实现不需要版本控制,因为活动代码是独立于工作流历史的。 只需重新部署 Worker 即可。

场景 4: 修改活动选项

修改活动的超时或重试配置。

Java
String version = Workflow.getVersion(
    "change-timeout",
    Workflow.DEFAULT_VERSION,
    1
);

ActivityOptions options;
if (version == Workflow.DEFAULT_VERSION) {
    // 旧版本:较短的超时
    options = ActivityOptions.newBuilder()
        .setStartToCloseTimeout(Duration.ofSeconds(10))
        .build();
        
} else {
    // 新版本:更长的超时
    options = ActivityOptions.newBuilder()
        .setStartToCloseTimeout(Duration.ofSeconds(30))
        .build();
}

MyActivities activities = Workflow.newActivityStub(
    MyActivities.class,
    options
);

最佳实践

使用有意义的变更 ID

使用描述性的变更 ID,如 "add-validation" 或 "change-timeout",便于理解代码意图

保持版本号递增

使用递增的整数作为版本号,便于追踪演进历史

保留旧版本代码

不要删除旧版本的代码,确保运行中的工作流能够继续执行

渐进式迁移

先部署新版本代码,等所有旧工作流完成后,再考虑删除旧版本代码

测试兼容性

在生产环境部署前,在测试环境验证新旧版本的兼容性

常见错误

❌ 错误做法
1

直接修改活动调用参数

会导致历史不匹配,运行中的工作流失败

2

删除旧版本代码

运行中的工作流需要旧版本代码才能继续执行

3

修改工作流接口定义

工作流接口定义变更会影响历史兼容性

✅ 正确做法
1

使用 Workflow.getVersion()

在代码变更前检查版本号,区分新旧逻辑

2

保留所有版本代码

确保运行中的工作流能够继续执行到完成

3

变更活动而非工作流

如果可能,只修改活动代码,不需要版本控制

完整示例

完整版本控制示例Java
import io.temporal.workflow.Workflow;
import java.time.Duration;

public class OrderWorkflowImpl implements OrderWorkflow {
    
    private final OrderActivities activities = Workflow.newActivityStub(
        OrderActivities.class,
        getActivityOptions()
    );
    
    private ActivityOptions getActivityOptions() {
        // 超时版本控制
        String timeoutVersion = Workflow.getVersion(
            "timeout-v2",
            Workflow.DEFAULT_VERSION,
            2
        );
        
        if (timeoutVersion == Workflow.DEFAULT_VERSION) {
            return ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(10))
                .build();
        } else {
            return ActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(30))
                .build();
        }
    }
    
    @Override
    public OrderResult processOrder(OrderRequest request) {
        // 验证版本控制
        String validationVersion = Workflow.getVersion(
            "add-validation",
            Workflow.DEFAULT_VERSION,
            1
        );
        
        if (validationVersion.equals("1")) {
            // 新版本:添加验证步骤
            activities.validateOrder(request);
        }
        
        // 处理版本控制
        String processVersion = Workflow.getVersion(
            "process-change",
            Workflow.DEFAULT_VERSION,
            1
        );
        
        if (processVersion == Workflow.DEFAULT_VERSION) {
            // 旧版本:直接处理
            return activities.processOrderLegacy(request);
            
        } else {
            // 新版本:使用新的处理逻辑
            activities.reserveInventory(request);
            String paymentId = activities.processPayment(request);
            activities.shipOrder(request);
            
            return new OrderResult(paymentId, "COMPLETED");
        }
    }
}

下一步