Go异步任务解决方案之Asynq库详解

目录
  • 一、概述
  • 二、快速开始
    • 1. 准备工作
    • 2. 安装asynq软件包
    • 3. 创建项目asynq_task
    • 4. Task任务
    • 5. 编写程序
    • 6. 运行查看结果
  • 三、细节
    • 1. 关于asynq的优雅退出
    • 2. client中 client.Enqueue 的使用
  • 总结

今天为大家介绍一个Go处理异步任务的解决方案:Asynq,是一个 Go 库,用于排队任务并与 worker 异步处理它们。它由Redis提供支持,旨在实现可扩展且易于上手。

一、概述

Asynq 是一个 Go 库,用于对任务进行排队并与工作人员异步处理它们。

  • Asynq 工作原理的高级概述:

    • 客户端将任务放入队列
    • 服务器从队列中拉出任务并为每个任务启动一个工作 goroutine
    • 多个工作人员同时处理任务
  • git库地址:

https://github.com/hibiken/asynq

二、快速开始

1. 准备工作

确保已安装并运行了redis

redis 版本大于5.0

redis-server

目录结构

.
├── conf
│   └── redis.conf
└── docker-compose.yml

docker-compose.yml

version: '3.8'
services:
  myredis:
    container_name: myredis
    image: redis:6.2.5 #6.0.6
    restart: always
    ports:
      - 6379:6379
    privileged: true
    environment:
      # 时区上海
      TZ: Asia/Shanghai
    command: redis-server /etc/redis/redis.conf --appendonly yes
    volumes:
      - $PWD/data:/data
      - $PWD/conf/redis.conf:/etc/redis/redis.conf
   # networks:
    #  - myweb

#networks:
 # myweb:
  #  driver: bridge

conf/redis.conf

#开启保护
protected-mode yes
#开启远程连接
#bind 127.0.0.1
#自定义密码
requirepass 123456
port 6379
timeout 0
# 900s内至少一次写操作则执行bgsave进行RDB持久化
save 900 1
save 300 10
save 60 10000
# rdbcompression ;默认值是yes。对于存储到磁盘中的快照,可以设置是否进行压缩存储。如果是的话,redis会采用LZF算法进行压缩。如果你不想消耗CPU来进行压缩的话,可以设置为关闭此功能,但是存储在磁盘上的快照会比较大。
rdbcompression yes
# dbfilename :设置快照的文件名,默认是 dump.rdb
dbfilename dump.rdb
# dir:设置快照文件的存放路径,这个配置项一定是个目录,而不能是文件名。使用上面的 dbfilename 作为保存的文件名。
dir /data
# 默认redis使用的是rdb方式持久化,这种方式在许多应用中已经足够用了。但是redis如果中途宕机,会导致可能有几分钟的数据丢失,根据save来策略进行持久化,Append Only File是另一种持久化方式, 可以提供更好的持久化特性。Redis会把每次写入的数据在接收后都写入appendonly.aof文件,每次启动时Redis都会先把这个文件的数据读入内存里,先忽略RDB文件。默认值为no。
appendonly yes
# appendfilename :aof文件名,默认是"appendonly.aof"
# appendfsync:aof持久化策略的配置;no表示不执行fsync,由操作系统保证数据同步到磁盘,速度最快;always表示每次写入都执行fsync,以保证数据同步到磁盘;everysec表示每秒执行一次fsync,可能会导致丢失这1s数据
appendfsync everysec

启动 redis 服务

docker-compose up -d

2. 安装asynq软件包

go get -u github.com/hibiken/asynq

3. 创建项目asynq_task

目录结构:

.
|-- README.md
|-- cmd
|   `-- main.go  # 启动消费者监听
|-- go.mod
|-- go.sum
|-- test.go # 生产者 发送测试数据
`-- test_delivery
    |-- client 
    |   `-- client.go # 生产者 具体发送测试数据的逻辑
    `-- test_delivery.go  # 消费者,执行任务具体处理逻辑

Redis连接项

Asynq 使用 Redis 作为消息代理。

client.gomain.go 都需要连接到 Redis 进行写入和读取。

我们将使用 asynq.RedisClientOpt 指定如何连接到本地 Redis 实例。

asynq.RedisClientOpt{
	Addr:     "127.0.0.1:6379",
	Password: "",
	DB:       2,
}

4. Task任务

*asynq.Task

type Task struct {
	// 一个简单的字符串值,表示要执行的任务的类型.
	typename string

	// 有效载荷保存执行任务所需的数据,有效负载值必须是可序列化的.
	payload []byte

	// 保存任务的选项.
	opts []Option

	// 任务的结果编写器.
	w *ResultWriter
}

5. 编写程序

1)test_delivery.go 一个封装任务创建和任务处理的包

package test_delivery

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/hibiken/asynq"
	"log"
)

const (
	TypeEmailDelivery = "email:deliver"
)

// EmailDeliveryPayload 异步任务需要传递的数据结构
type EmailDeliveryPayload struct {
	UserID     int
	TemplateID string
	DataStr    string
}

// NewEmailDeliveryTask 异步任务需要传递的数据
func NewEmailDeliveryTask(userID int, tmplID, dataStr string) (*asynq.Task, error) {
	payload, err := json.Marshal(EmailDeliveryPayload{UserID: userID, TemplateID: tmplID, DataStr: dataStr})
	if err != nil {
		fmt.Println(err)
		return nil, err
	}
	return asynq.NewTask(TypeEmailDelivery, payload), nil
}

// HandleEmailDeliveryTask 发送email处理逻辑
func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
	//接收任务数据
	var p EmailDeliveryPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
	}
	//逻辑处理start...
	log.Printf("Sending Email to User: user_id=%d, template_id=%s data_str:%s", p.UserID, p.TemplateID, p.DataStr)
	return nil
}

client.go

在应用程序代码中,导入上述包并用于Client将任务放入队列中。

package client

import (
	"asynq_task/test_delivery"
	"fmt"
	"github.com/hibiken/asynq"
	"log"
	"time"
)

func EmailDeliveryTaskAdd(i int) {
	client := asynq.NewClient(asynq.RedisClientOpt{
		Addr:     "192.168.0.120:6379",
		Password: "123456",
		DB:       2,
	})
	defer client.Close()

	// 初使货需要传递的数据
	task, err := test_delivery.NewEmailDeliveryTask(42, fmt.Sprintf("some:template:id:%d", i), `{"name":"lisi"}`)
	if err != nil {
		log.Fatalf("could not create task: %v", err)
	}
	// 任务入队
	//info, err := client.Enqueue(task)

	//info, err := client.Enqueue(task, time.Now())
	// 延迟执行
	info, err := client.Enqueue(task, asynq.ProcessIn(3*time.Second))
	// MaxRetry 重度次数 Timeout超时时间
	//info, err = client.Enqueue(task, asynq.MaxRetry(10), asynq.Timeout(3*time.Second))
	if err != nil {
		log.Fatalf("could not enqueue task: %v", err)
	}
	log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
}

main.go 异步任务服务入口文件

接下来,启动一个工作服务器以在后台处理这些任务。要启动后台工作人员,使用Server并提供您Handler来处理任务。可以选择使用ServeMux来创建处理程序,就像使用net/httpHandler 一样。

package main

import (
	"asynq_task/test_delivery"
	"github.com/hibiken/asynq"
	"log"
)

func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{
			Addr:     "192.168.0.120:6379",
			Password: "123456",
			DB:       2,
		},
		asynq.Config{
			// 每个进程并发执行的worker数量
			Concurrency: 5,
			// Optionally specify multiple queues with different priority.
			Queues: map[string]int{
				"critical": 6,
				"default":  3,
				"low":      1,
			},
			// See the godoc for other configuration options
		},
	)

	mux := asynq.NewServeMux()
	mux.HandleFunc(test_delivery.TypeEmailDelivery, test_delivery.HandleEmailDeliveryTask)

	if err := srv.Run(mux); err != nil {
		log.Fatalf("could not run server: %v", err)
	}
}

4)test.go 用来分发异步任务

package main

import (
	"asynq_task/test_delivery/client"
	"time"
)

func main() {
	for i := 0; i < 3; i++ {
		client.EmailDeliveryTaskAdd(i)
		time.Sleep(time.Second * 3)
	}
}

6. 运行查看结果

首先,我们要先把异步任务启动起来准备好接收,也就是启动cmd/main.go

启动test.go文件向异步任务服务添加任务队列

结果如下:

消息者 go run main.go

$ go run main.go
asynq: pid=12092 2023/02/02 23:18:04.161872 INFO: Starting processing
asynq: pid=12092 2023/02/02 23:18:04.161872 INFO: Send signal TERM or INT to terminate the process
2023/02/03 07:18:14 Sending Email to User: user_id=42, template_id=some:template:id:0 data_str:{"name":"lisi"}
2023/02/03 07:18:19 Sending Email to User: user_id=42, template_id=some:template:id:1 data_str:{"name":"lisi"}
2023/02/03 07:18:19 Sending Email to User: user_id=42, template_id=some:template:id:2 data_str:{"name":"lisi"}

生产者 go run test.go

$ go run test.go
2023/02/03 07:18:09 enqueued task: id=5d998c6b-3978-4a25-a096-6e564e032359 queue=default
2023/02/03 07:18:12 enqueued task: id=74a5fea4-d4d4-465f-b310-31981e472f6a queue=default
2023/02/03 07:18:15 enqueued task: id=41c46b7b-ea78-4abc-878a-ea65e3859e28 queue=default

三、细节

1. 关于asynq的优雅退出

如果异步服务突然被暂停,正在执行的异步任务会push到队列中,下次启动的时候自动执行。

我们可以将一个异步任务中途sleep几秒,发送一个异步任务,任务没执行完中途停掉任务测试出结果:

再次启动异步任务服务,发现这个任务被重新执行。

2. client中 client.Enqueue 的使用

立即处理任务

client.Enqueue(t1, time.Now())

2)延时处理任务, 两小时后处理

client.Enqueue(t2, asynq.ProcessIn(time.Now().Add(2 * time.Hour)))

任务重试,最大重试次数为25次。

client.Enqueue(task, asynq.MaxRetry(5))

4)确保任务的唯一性

4-1:使用TaskID选项:自行生成唯一的任务 ID

_, err := client.Enqueue(task, asynq.TaskID("mytaskid"))

// Second task will fail, err is ErrTaskIDConflict (assuming that the first task didn't get processed yet)
_, err = client.Enqueue(task, asynq.TaskID("mytaskid"))

4-2:使用Unique选项:让 Asynq 为任务创建唯一性锁

err := c.Enqueue(t1, asynq.Unique(time.Hour))

另外,asynq 异步任务提供了命令行工具和 Asynqmon 用于监控和管理 Asynq 异步任务和队列。WebUI 可以通过传递两个标志来启用与 Prometheus 的集成。

  #asynqmon asynq延迟队列、定时队列的webui
  asynqmon:
    image: hibiken/asynqmon:latest
    container_name: asynqmon
    ports:
      - 8980:8080
    command:
      - '--redis-addr=192.168.0.120:6379'
      - '--redis-password=123456'
      - '--redis-db=2'
    restart: always
#    networks:
#      - looklook_net
#    depends_on:
#      - redis

启动服务:

docker-compose up

访问:

http://192.168.0.120:8980/

总结

到此这篇关于Go异步任务解决方案之Asynq库的文章就介绍到这了,更多相关Go异步任务Asynq库内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Go异步任务解决方案之Asynq库详解

    目录 一.概述 二.快速开始 1. 准备工作 2. 安装asynq软件包 3. 创建项目asynq_task 4. Task任务 5. 编写程序 6. 运行查看结果 三.细节 1. 关于asynq的优雅退出 2. client中 client.Enqueue 的使用 总结 今天为大家介绍一个Go处理异步任务的解决方案:Asynq,是一个 Go 库,用于排队任务并与 worker 异步处理它们.它由Redis提供支持,旨在实现可扩展且易于上手. 一.概述 Asynq 是一个 Go 库,用于对任务进

  • Python如何急速下载第三方库详解

    前言 pip 是一个现代的,通用的 Python 包管理工具 ,是一个安装第三方 库必备的工具,提供了对Python 包的查找.下载.安装.卸载的功能.但是在国内使用有很多因素的限制,一个3.4M的库安装都需要几分钟的时间,而且有时还安装失败.那有没有一个可以极速安装第三方库的方法呢!答案是有的,这是小编经常用来安装第三方库的一种方法,拒绝 pip install 库名 方法如下: 首先,用Windows+R打开运行,输入cmd 按'确定' 然后在cmd界面输入 pip install -i h

  • Python基础之time库详解

    一.前言 time库运行访问多种类型的时钟,这些时钟用于不同的场景.本篇,将详细讲解time库的应用知识. 二.获取各种时钟 既然time库提供了多种类型的时钟.下面我们直接来获取这些时钟,对比其具体的用途.具体代码如下: import time print(time.monotonic()) print(time.monotonic_ns()) print(time.perf_counter()) print(time.perf_counter_ns()) print(time.process

  • Python爬虫进阶之Beautiful Soup库详解

    一.Beautiful Soup库简介 BeautifulSoup4 是一个 HTML/XML 的解析器,主要的功能是解析和提取 HTML/XML 的数据.和 lxml 库一样. lxml 只会局部遍历,而 BeautifulSoup4 是基于 HTML DOM 的,会加载整个文档,解析 整个 DOM 树,因此内存开销比较大,性能比较低. BeautifulSoup4 用来解析 HTML 比较简单,API 使用非常人性化,支持 CSS 选择器,是 Python 标准库中的 HTML 解析器,也支

  • Python高级文件操作之shutil库详解

    前言 什么算是高层的文件操作呢? 普通的文件操作,我们一般只涉及创建文件,文件夹以及写入文件等等.假如我现在需要复制一个文件的内容到另一个文件之中,用pathlib等都只能先打开复制文件,然后进行将其读出来保存,然后再写入新的文件,这种普通的复制操作,无形之中增加了许多步骤. 而shutil库可以直接完成复制符间的操作,同时还支持归档.本篇,将详细介绍文件的高层次操作. 一.copyfile() copyfile()函数用于将一个文件的内容复制到另一个文件之中,准备的来说,它不是copy内容,而

  • python状态机transitions库详解

    一.简介  transitions库 pip install transitions 状态机 state:状态节点 transition:用于从一个状态节点移动到另一个状态节点 教程 https://pypi.org/project/transitions/ 二.逐步创建 创建对象 创建一个继承object的类Number的实体对象number,然后调用transitions.Machine()将状态机绑定到这个实体对象上. from transitions import Machine cla

  • Python爬虫之urllib库详解

    目录 一.说明: 二.urllib四个模块组成: 三.urllib.request 1.urlopen函数 2.response 响应类型 3.Request对象 4.高级请求方式 四.urllib.error 五.URL解析urllib.parse 六.urllib.robotparser 总结 一.说明: urllib库是python内置的一个http请求库,requests库就是基于该库开发出来的,虽然requests库使用更方便,但作为最最基本的请求库,了解一下原理和用法还是很有必要的.

  • 从零开始使用gradle配置即可执行的Hook库详解

    目录 背景 本文须知 当前技术背景 底层选择 目标流程图 Transform ASM 封装开始 目标 实现 gradle 定义extension Transform阶段收集信息: 自定义的classvisitor 自定义method visitor 自定义hook操作 总结 背景 有一天,老板突然找到小B说,隐私合规需要我们获取权限前,需要明确授权来意,这个你来跟一下吧!小B此时就可愁了,因为项目权限那么多,每个自己手动加上授权来意提示的话,可能会漏掉很多,工作量也大,这可咋办呀!老B看到小B这

  • java开发主流定时任务解决方案全横评详解

    目录 引言 Crontab 目标定位 使用方式 实现原理 方案分析 Spring Task 目标定位 使用方式 实现原理 方案分析 ElasticJob 目标定位 使用方式 实现原理 方案分析 XXLJob 目标定位 使用方式 实现原理 方案分析 Serverless Job 目标定位 使用方式 实现原理 方案分析 总结 引言 定时任务作为一种按照约定时间执行预期逻辑的通用模式,在企业级开发中承载着丰富的业务场景,诸如后台定时同步数据生成报表,定时清理磁盘日志文件,定时扫描超时订单进行补偿回调等

随机推荐