简单、可靠、高效的分布式go异步任务队列Asynq

什么是Asynq

Asynq是一个go语言实现的分布式任务队列和异步处理库,基于redis,类似sidekiqcelery,他具有以下特点:

  • 保证至少执行一次任务
  • 持久化
  • 失败重试
  • worker崩溃自动恢复
  • 优先队列
  • 暂停队列
  • 支持中间件
  • 允许唯一任务
  • 支持Redis Cluster实现自动分片
  • 支持Redis Sentinels实现高可用
  • 提供web ui管理
  • 提供cli管理

安装

go get -u github.com/hibiken/asynq
// 命令行工具:
go get -u github.com/hibiken/asynq/tools/asynq

使用

前提需要保证redis可用

main.go Asynq服务端 worker.go 处理程序 asynq_test.go 模拟客户端使用

Asynq Server

//go main.go
package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "time"

    "github.com/hibiken/asynq"
    "golang.org/x/sys/unix"
)

func main() {
    // asynq server
    srv := asynq.NewServer(
        asynq.RedisClientOpt{
            Addr:     ":6379",
            Password: "Your password",
            DB:       0,
        },
        asynq.Config{Concurrency: 20},
    )

    mux := asynq.NewServeMux()

    // some middlewares
    mux.Use(func(next asynq.Handler) asynq.Handler {
        return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
            // just record a log
            fmt.Println(fmt.Printf("[%s] log - %+v", time.Now().Format("2006-01-02 15:04:05"), t.Payload))

            return next.ProcessTask(ctx, t)
        })
    })

    // some workers
    mux.HandleFunc("msg", HandleMsg)

    // start server
    if err := srv.Start(mux); err != nil {
        log.Fatalf("could not start server: %v", err)
    }

    // Wait for termination signal.
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
    for {
        s := <-sigs
        if s == unix.SIGTSTP {
            srv.Quiet() // Stop processing new tasks
            continue
        }
        break
    }

    // Stop worker server.
    srv.Stop()
}

Asynq Workers

//go worker.go
package main

import (
    "context"
    "fmt"

    "github.com/hibiken/asynq"
)

// HandleMsg 处理msg
func HandleMsg(ctx context.Context, t *asynq.Task) (err error) {
    fmt.Println("------HandleMsg start------")

    message, _ := t.Payload.GetString("message")
    userid, _ := t.Payload.GetInt("user_id")

    fmt.Println(fmt.Printf("{message: \"%s\", user_id: %d}", message, userid))
    return
}

模拟使用

//go asynq_test.go
package main

import (
    "fmt"
    "os"
    "testing"
    "time"

    "github.com/hibiken/asynq"
)

var c *asynq.Client

func TestMain(m *testing.M) {
    r := asynq.RedisClientOpt{
        Addr:     ":6379",
        Password: "Your password",
        DB:       0,
    }
    c = asynq.NewClient(r)
    ret := m.Run()
    c.Close()
    os.Exit(ret)
}

// 即时消费
func Test_Enqueue(t *testing.T) {
    payload := map[string]interface{}{"user_id": 1, "message": "i'm immediately message"}
    task := asynq.NewTask("msg", payload)
    res, err := c.Enqueue(task)
    if err != nil {
        t.Errorf("could not enqueue task: %v", err)
        t.FailNow()
    }
    fmt.Printf("Enqueued Result: %+v\n", res)
}

// 延时消费
func Test_EnqueueDelay(t *testing.T) {
    payload := map[string]interface{}{"user_id": 1, "message": "i'm delay 5 seconds message"}
    task := asynq.NewTask("msg", payload)
    res, err := c.Enqueue(task, asynq.ProcessIn(5*time.Second))
    // res, err := c.Enqueue(task, asynq.ProcessAt(time.Now().Add(5*time.Second)))
    if err != nil {
        t.Errorf("could not enqueue task: %v", err)
        t.FailNow()
    }
    fmt.Printf("Enqueued Result: %+v\n", res)
}

// 超时、重试、过期
func Test_EnqueueOther(t *testing.T) {
    payload := map[string]interface{}{"user_id": 1, "message": "i'm delay 5 seconds message"}
    task := asynq.NewTask("msg", payload)
    // 10秒超时,最多重试3次,20秒后过期
    res, err := c.Enqueue(task, asynq.MaxRetry(3), asynq.Timeout(10*time.Second), asynq.Deadline(time.Now().Add(20*time.Second)))
    if err != nil {
        t.Errorf("could not enqueue task: %v", err)
        t.FailNow()
    }
    fmt.Printf("Enqueued Result: %+v\n", res)
}

如何测试

先将服务运行起来

##bash
$ go version
go version go1.16 darwin/amd64
$ go run .

运行指定测试

##bash
$ go test -timeout 30s -run ^Test_Enqueue$ asynq_test -v -count=1

=== RUN   Test_Enqueue
Enqueued Result: &{ID:683d8f36-f8c5-49c0-88b4-f1aefa7686de EnqueuedAt:2021-06-11 10:41:49.018475 +0000 UTC ProcessAt:2021-06-11 18:41:49.017778 +0800 CST m=+0.000892619 Retry:25 Queue:default Timeout:30m0s Deadline:1970-01-01 08:00:00 +0800 CST}
--- PASS: Test_Enqueue (0.00s)
PASS
ok      asynq_test  0.009s

队列管理

Asynq提供了webui 和 命令行工具asynq

webui

Asynqmon webui在这个仓库里

##bash
$ ./asynqmon --port=3000 --redis-addr=localhost:6380

img

asynq命令行

##bash
$ asynq -p Yourpassword stats
Task Count by State
active      pending   scheduled  retry  archived
----------  --------  ---------  -----  ----
0           0         0          0      0

Task Count by Queue
default
-------
0

Daily Stats 2021-06-11 UTC
processed  failed  error rate
---------  ------  ----------
4          0       0.00%

Redis Info
version  uptime  connections  memory usage  peak memory usage
-------  ------  -----------  ------------  -----------------
6.2.0    0 days  5            16.04MB       16.14MB

更多阅读

完整代码 官方文档

京ICP备16046576号-1