go 项目中类似 sidekiq 的消息队列 Asynq

温馨提示:仅为个人笔记以免遗忘,不保证代码完整

之前在 ruby 项目中做异步任务,一直使用 sidekiq。sidekiq 依赖 Redis 实现队列任务的增加、重试以及调度,可以算是穷人版的消息队列吧,毕竟不需要付费购买生产版本,就已经够用了。

换用 golang 之后一直想找个替代品,但像 sidekiq 一样简单高效的并不是那么容易找到,其他方案附加的学习和使用成本也很高。
最近发现了一个简单高效的 Go 异步任务处理库:Asynq,由一位谷歌员工开发。将其引入到项目内之后,已经基本能满足我的需求了。

asynq 的特性(完整列表见官方 README):任务调度、失败任务重试、加权优先级队列、任务超时与截止时间、命令行工具以及 Web UI 等。

安装 asynq

首先需要安装 redis,并正常运行

brew install redis
brew services  start redis

## 以下在项目目录下执行
go get -u github.com/hibiken/asynq
go get -u github.com/hibiken/asynq/tools/asynq #命令行工具

文件说明:main.go —— Asynq 服务端;worker.go —— 处理程序;asynq_test.go —— 模拟客户端使用。

asynq.yml 配置

# Settings decoded by NewConfig from the "asynq" key.
asynq:
  host: 127.0.0.1                # Redis host
  port: 6379                     # Redis port
  db: 0                          # Redis database index
  password: ""                   # Redis password
  poolSize: 30                   # Redis connection pool size
  concurrency: 10                # concurrency, passed to asynq.Config.Concurrency
  queues:
    - default: 5                  # default queue
    - pm_mailers: 6               # mail-sending tasks
    - low: 1                      # test queue

asynq server

package gasynq

import (
	"fmt"
	"goapp/internal/pkg/log"
	"sync"
	"time"

	"github.com/gookit/goutil/mathutil"
	"github.com/hibiken/asynq"
	"github.com/pkg/errors"
	"github.com/spf13/viper"
	"go.uber.org/zap"
)

// Config holds the values decoded from the "asynq" configuration key: the
// Redis connection parameters and the worker settings used by NewServer and
// NewClient.
type Config struct {
	// Host and Port are joined as "host:port" to form the Redis address.
	Host        string
	Port        int
	// Db is passed as the DB field of asynq.RedisClientOpt.
	Db          int
	// Password is passed as the Password field of asynq.RedisClientOpt.
	Password    string
	// PoolSize is declared as a string here; NewServer and NewClient convert
	// it to an int with mathutil.MustInt before handing it to asynq.
	PoolSize    string
	// Concurrency is passed to asynq.Config.Concurrency by NewServer.
	Concurrency int
	// Queues maps a queue name to the integer given in the configuration and
	// is passed unchanged to asynq.Config.Queues by NewServer.
	Queues      map[string]int
}

// NewConfig decodes the "asynq" key of v into a freshly allocated Config.
//
// It returns the decoded Config and a nil error on success; if
// v.UnmarshalKey fails, it returns nil and that error unchanged.
func NewConfig(v *viper.Viper) (*Config, error) {
	cfg := &Config{}
	if decodeErr := v.UnmarshalKey("asynq", cfg); decodeErr != nil {
		return nil, decodeErr
	}
	return cfg, nil
}

// Entry pairs a pattern with the handler registered for it. SeverRun passes
// each entry's Pattern and Handler to the asynq ServeMux's Handle method.
type Entry struct {
	Pattern string
	Handler asynq.Handler
}

// AsynqServer embeds *asynq.Server and carries the logger and the handler
// entries that SeverRun registers before running the server.
//
// NewServer constructs this type with an unkeyed (positional) literal, so the
// field order below must stay in sync with that literal.
type AsynqServer struct {
	*asynq.Server
	logger  log.ILogger
	entries []Entry
}

var (
	// asynqServer is the package-level server instance. It stays nil until
	// NewServer assigns it.
	asynqServer *AsynqServer
	// serverOnce guards the assignment in NewServer so it happens at most once.
	serverOnce  sync.Once
)

// GetAsyncServer returns the package-level server instance. The result is nil
// if NewServer has not assigned it yet.
func GetAsyncServer() *AsynqServer {
	return asynqServer
}

// StopServer calls Stop on the package-level server.
//
// It does nothing when no server has been created yet (NewServer was never
// called or failed before assigning it). Without this guard the call would
// dereference a nil pointer and panic; SeverRun already treats a nil server
// as a recoverable condition, and this function now does the same.
//
// NOTE(review): in recent asynq releases Stop only stops pulling new tasks
// and Shutdown performs the actual shutdown — confirm which one is intended.
func StopServer() {
	if asynqServer == nil {
		return
	}
	asynqServer.Stop()
}

// NewServer loads the "asynq" configuration from v and returns the
// package-level AsynqServer, creating it on the first call.
//
// The configuration is decoded and logged on every call, but the server
// itself is built inside serverOnce, so later calls return the instance
// created by the first one regardless of their arguments.
//
// It returns an error only when the configuration cannot be decoded.
func NewServer(v *viper.Viper,
	logger log.ILogger, entries []Entry) (*AsynqServer, error) {
	cfg, err := NewConfig(v)
	if err != nil {
		return nil, errors.Wrap(err, "unmarshal redis option error")
	}
	logger.Infof("load asynq redis options success!: host:%s port:%d db:%d", cfg.Host, cfg.Port, cfg.Db)

	// Built outside the Once on purpose: the pool-size conversion runs on
	// every call, exactly as the address formatting does.
	redisOpt := asynq.RedisClientOpt{
		Addr:         fmt.Sprintf("%s:%d", cfg.Host, cfg.Port),
		Password:     cfg.Password,
		DB:           cfg.Db,
		PoolSize:     mathutil.MustInt(cfg.PoolSize),
		DialTimeout:  10 * time.Second,
		ReadTimeout:  30 * time.Second,
		WriteTimeout: 30 * time.Second,
	}

	serverOnce.Do(func() {
		workerCfg := asynq.Config{
			Concurrency: cfg.Concurrency,
			Queues:      cfg.Queues,
		}
		asynqServer = &AsynqServer{
			Server:  asynq.NewServer(redisOpt, workerCfg),
			logger:  logger.With(zap.String("type", "Asynq.Server")),
			entries: entries,
		}
	})

	return asynqServer, nil
}

// SeverRun registers every configured entry on a new asynq ServeMux and runs
// the package-level server with it.
//
// It returns an error when no server has been created. If Run fails, the
// error is logged through the server's logger with Fatalf and then returned.
//
// NOTE(review): if the logger's Fatalf terminates the process, the return
// after it is never reached — confirm against the ILogger implementation.
func SeverRun() error {
	srv := asynqServer
	if srv == nil {
		return errors.New("async server is nil")
	}

	mux := asynq.NewServeMux()
	for i := range srv.entries {
		e := srv.entries[i]
		mux.Handle(e.Pattern, e.Handler)
	}

	runErr := srv.Run(mux)
	if runErr == nil {
		return nil
	}
	srv.logger.Fatalf("could not run server: %v", runErr)
	return runErr
}

asynq client

package gasynq

import (
	"fmt"
	"goapp/internal/pkg/log"
	"sync"
	"time"

	"github.com/gookit/goutil/mathutil"
	"github.com/hibiken/asynq"
	"github.com/pkg/errors"
	"github.com/spf13/viper"
)

var (
	// asynqClient is the package-level client instance. It stays nil until
	// NewClient assigns it.
	asynqClient *asynq.Client
	// once guards the assignment in NewClient so it happens at most once.
	once        sync.Once
)

// GetAsynqClient returns the package-level client instance. The result is nil
// if NewClient has not assigned it yet.
func GetAsynqClient() *asynq.Client {
	return asynqClient
}

// ClostClient closes the package-level client and returns the error reported
// by its Close method.
//
// When no client has been created yet (NewClient was never called or failed
// before assigning it) there is nothing to close, so it returns nil instead
// of calling Close on a nil client.
func ClostClient() error {
	if asynqClient == nil {
		return nil
	}
	return asynqClient.Close()
}

// NewClient loads the "asynq" configuration from v and returns the
// package-level asynq client, creating it on the first call.
//
// The configuration is decoded and logged on every call, but the client
// itself is created inside once, so later calls return the instance created
// by the first one.
//
// It returns an error only when the configuration cannot be decoded.
func NewClient(v *viper.Viper,
	logger log.ILogger) (*asynq.Client, error) {
	cfg, err := NewConfig(v)
	if err != nil {
		return nil, errors.Wrap(err, "unmarshal redis option error")
	}
	logger.Infof("load asynq redis options success!: host:%s port:%d asynqDb:%d", cfg.Host, cfg.Port, cfg.Db)

	// Built outside the Once on purpose: the pool-size conversion runs on
	// every call, exactly as the address formatting does.
	addr := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
	redisOpt := asynq.RedisClientOpt{
		Addr:         addr,
		Password:     cfg.Password,
		DB:           cfg.Db,
		PoolSize:     mathutil.MustInt(cfg.PoolSize),
		DialTimeout:  10 * time.Second,
		ReadTimeout:  30 * time.Second,
		WriteTimeout: 30 * time.Second,
	}

	once.Do(func() { asynqClient = asynq.NewClient(redisOpt) })

	return asynqClient, nil
}

HelloWorld 任务示例

package tasks

import (
	"context"
	"encoding/json"
	"fmt"
	"goapp/internal/pkg/app"
	"time"

	"github.com/hibiken/asynq"
)

const (
	// TypeHelloword is the task type name that NewHelloWorldTask passes to
	// asynq.NewTask.
	TypeHelloword = "helloword"
)

// HelloWorldPayload is the JSON-encoded body of a hello-world task. It is
// marshalled by NewHelloWorldTask and unmarshalled by
// HelloWorldProcessor.ProcessTask.
type HelloWorldPayload struct {
	// Username has no struct tag, so it is encoded under the key "Username".
	Username string
}

// NewHelloWorldTask builds a task of type TypeHelloword whose payload is the
// JSON encoding of a HelloWorldPayload carrying username.
//
// The task is created with MaxRetry(5), a 20-minute Timeout and the
// "pm_mailers" queue as its options. It returns nil and the marshalling error
// if the payload cannot be encoded.
func NewHelloWorldTask(username string) (*asynq.Task, error) {
	body, err := json.Marshal(HelloWorldPayload{Username: username})
	if err != nil {
		return nil, err
	}

	// Options set here are the task's defaults; they can be overridden at
	// enqueue time.
	opts := []asynq.Option{
		asynq.MaxRetry(5),
		asynq.Timeout(20 * time.Minute),
		asynq.Queue("pm_mailers"),
	}
	return asynq.NewTask(TypeHelloword, body, opts...), nil
}

// HelloWorldProcessor implements the asynq.Handler interface through its
// ProcessTask method, which decodes a HelloWorldPayload and logs its
// Username. The type carries no state.
type HelloWorldProcessor struct {
}

// ProcessTask decodes the task payload as a HelloWorldPayload and logs the
// username it carries.
//
// If the payload cannot be unmarshalled, the returned error wraps
// asynq.SkipRetry; otherwise the method returns nil. ctx is currently unused.
func (job *HelloWorldProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
	payload := HelloWorldPayload{}
	err := json.Unmarshal(t.Payload(), &payload)
	if err != nil {
		return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
	}

	app.Logger().Infof("================HelloWorldProcessor: username=%s\n", payload.Username)

	// Add the real task logic here.
	return nil
}

// NewHelloWorldProcessor returns a pointer to a new, zero-valued
// HelloWorldProcessor.
func NewHelloWorldProcessor() *HelloWorldProcessor {
	return new(HelloWorldProcessor)
}

server 的启动和 client 的使用,以及 Asynqmon web UI 的使用请参考 https://github.com/hibiken/asynq,这里不详述