Go SDK

Temporal Go SDK 让您使用 Go 语言构建可靠的分布式应用程序。

快速开始

安装 Go SDK:

go get go.temporal.io/sdk

SDK 组成部分

Client
用于启动工作流、发送信号和查询的客户端库

go.temporal.io/sdk/client

Worker
用于运行工作流和活动的 Worker 库

go.temporal.io/sdk/worker

Workflow
定义工作流的 API 和上下文

go.temporal.io/sdk/workflow

创建客户端连接

client.go
package main

import (
    "log"
    "go.temporal.io/sdk/client"
)

func main() {
    // 创建客户端连接
    c, err := client.Dial(client.Options{
        HostPort: "localhost:7233",
    })
    if err != nil {
        log.Fatalf("Failed to create client: %v", err)
    }
    defer c.Close()
    
    // 使用客户端启动工作流...
}

创建 Worker

worker.go
package main

import (
    "log"
    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/worker"
)

func main() {
    // 创建客户端连接
    c, err := client.Dial(client.Options{
        HostPort: "localhost:7233",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()
    
    // 创建 Worker
    w := worker.New(c, "my-task-queue", worker.Options{})
    
    // 注册工作流和活动
    w.RegisterWorkflow(MyWorkflow)
    w.RegisterActivity(MyActivity)
    
    // 启动 Worker
    if err := w.Run(worker.InterruptCh()); err != nil {
        log.Fatal(err)
    }
}

定义工作流

workflow.go
package main

import (
    "time"
    "go.temporal.io/sdk/workflow"
    "go.temporal.io/sdk/temporal"
)

func MyWorkflow(ctx workflow.Context, name string) (string, error) {
    // 设置活动选项
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 10 * time.Second,
        RetryPolicy: &temporal.RetryPolicy{
            InitialInterval:    time.Second,
            BackoffCoefficient: 2.0,
            MaximumAttempts:    3,
        },
    }
    ctx = workflow.WithActivityOptions(ctx, ao)
    
    // 执行活动
    var result string
    err := workflow.ExecuteActivity(ctx, MyActivity, name).Get(ctx, &result)
    if err != nil {
        return "", err
    }
    
    return result, nil
}

定义活动

activity.go
package main

import (
    "context"
    "fmt"
)

func MyActivity(ctx context.Context, name string) (string, error) {
    // 执行实际的业务逻辑
    result := fmt.Sprintf("Hello, %s!", name)
    return result, nil
}

启动工作流

starter.go
package main

import (
    "context"
    "log"
    "go.temporal.io/sdk/client"
)

func main() {
    c, err := client.Dial(client.Options{
        HostPort: "localhost:7233",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()
    
    // 启动工作流
    options := client.StartWorkflowOptions{
        ID:        "my-workflow-id",
        TaskQueue: "my-task-queue",
    }
    
    we, err := c.ExecuteWorkflow(
        context.Background(),
        options,
        MyWorkflow,
        "World",
    )
    if err != nil {
        log.Fatal(err)
    }
    
    // 等待工作流完成
    var result string
    err = we.Get(context.Background(), &result)
    if err != nil {
        log.Fatal(err)
    }
    
    log.Println("Result:", result)
}

常用功能

发送信号

向运行中的工作流发送数据:

c.SignalWorkflow(ctx, workflowID, runID, "signal-name", data)
查询工作流

查询工作流状态:

c.QueryWorkflow(ctx, workflowID, runID, "query-name", result)
取消工作流

取消正在运行的工作流:

c.CancelWorkflow(ctx, workflowID, runID)

资源链接