Golang工作池的使用实例讲解

目录
  • 一、概念
  • 二、实例
    • 1.简单示例
    • 2.读入数据

一、概念

我们可以将工作池理解为线程池。线程池的创建和销毁非常消耗资源,所以专门写一个pool,每次用过的线程池再放回pool中而不是销毁。不过在Go语言中不会使用系统的线程,而是使用goroutine。gorotine的创建和销毁比系统线程的消耗要小的多,而且goroutine没有标号。所以goroutine的pool就不再时线程池,而是work pool(工作池)。

虽然goroutine的系统消耗较小,但也不能随意在编码时使用go func(),如果程序频繁启动goroutine,会造成极其不可控性能问题。对于可以提前预知的大量异步处理的任务就要考虑使用工作池。

工作池的作用控制goroutine的规模,或者说是goroutine的数量。在Go语言中,控制goroutine的数量最好方式就是使用缓存通道。

二、实例

1.简单示例

下面是Go语言解决工作池的经典用法。

func worker(id int, jobs <-chan int, results chan<- int) {
	for job := range jobs {
		fmt.Printf("worker(%d) start to do job(%d)\n", id, job)
		time.Sleep(time.Second)
		fmt.Printf("worker(%d) finished job(%d)\n", id, job)
		results <- job
	}
}
func main() {
    // 为了使用我们的工作池,我们需要发送工作和接受工作的结果,
    // 这里我们定义两个通道,一个jobs,一个results
	jobs := make(chan int, 100)
	results := make(chan int, 100)
	// 开启3个goroutine
	for id := 1; id <= 3; id++ {
		go worker(id, jobs, results)
	}
	// 创建5个任务
	for job := 1; job <= 5; job++ {
		jobs <- job
	}
	close(jobs)
	// 输出结果
	for i := 1; i <= 5; i++ {
		<-results
	}
}

上述代码工作池思想主要体现在jobs的通道上,因为定义了一个缓存长度为100的通道,所以在通道到100以后,新任务就会阻塞,只有等worker从通道取走一个工作以后才能继续分配新工作。

本案例较为简单,如果worker的数量较大,业务执行时间较长的话,我们需要在程序设计上将jobs和worker的模式进行优化,每个worker处理一项工作,工作池可以自定义最大数量的worker;这样可以保证goroutine的最大数量,可程序更加可控,避免代码消耗压垮系统。

2.读入数据

下面时改良之后代码

1package main
import (
	"fmt"
	"reflect"
	"time"
)
// Job 任务内容
type Job struct {
	ID   int
	Name string
}
// Worker 工作
type Worker struct {
	id         int           // id
	WorkerPool chan chan Job // 工作者池(通道的通道),每个元素都是一个job通道, 公共的job
	JobChannel chan Job      // 工作通道,每个元素是一个job,worker私有的job
	exit       chan bool     // 结束信号
}
var (
	MaxWorker = 5                 // 最大worker数量
	JobQueue  = make(chan Job, 5) // 工作通道,模拟需处理的工作
)
// Scheduler 排程中心
type Scheduler struct {
	WorkerPool   chan chan Job // 工作池
	WorkerMaxNum int           // 最大工作者数
	Workers      []*Worker     // worker队列
}
// NewScheduler 创建排程中心
func NewScheduler(workerMaxNum int) *Scheduler {
	workerPool := make(chan chan Job, workerMaxNum) // 工作池
	return &Scheduler{WorkerPool: workerPool, WorkerMaxNum: workerMaxNum}
}
// Start 工作池开始
func (s *Scheduler) Start() {
	Workers := make([]*Worker, s.WorkerMaxNum)
	for i := 0; i < s.WorkerMaxNum; i++ {
		worker := NewWorker(s.WorkerPool, i)
		worker.Start()
		Workers[i] = &worker
	}
	s.Workers = Workers
	go s.schedule()
}
// Stop 工作池的关闭
func (s *Scheduler) Stop() {
	Workers := s.Workers
	for _, w := range Workers {
		w.Stop()
	}
	time.Sleep(time.Second)
	close(s.WorkerPool)
}
func NewWorker(WorkerPool chan chan Job, id int) Worker {
	fmt.Printf("new a worker(%d)\n", id)
	return Worker{
		id:         id,
		WorkerPool: WorkerPool,
		JobChannel: make(chan Job),
		exit:       make(chan bool),
	}
}
// Start 监听任务和结束信号
func (w Worker) Start() {
	go func() {
		for {
			select {
			case job := <-w.JobChannel: // 收到任务
				fmt.Println("get a job from private w.JobChannel")
				fmt.Println(job)
			case <-w.exit: // 收到结束信号
				fmt.Println("worker exit", w)
				return
			}
		}
	}()
}
func (w Worker) Stop() {
	go func() {
		w.exit <- true
	}()
}
// 排程
func (s *Scheduler) schedule() {
	for {
		select {
		case job := <-JobQueue:
			fmt.Println("get a job from JobQueue")
			go func(job Job) {
				//从WorkerPool获取jobChannel,忙时阻塞
				jobChannel := <-s.WorkerPool
				fmt.Println("get a private jobChannel from public s.WorkerPool", reflect.TypeOf(jobChannel))
				jobChannel <- job
				fmt.Println("worker's private jobChannel add one job")
			}(job)
		}
	}
}
func main() {
	scheduler := NewScheduler(MaxWorker)
	scheduler.Start()
	jobQueue()
	scheduler.Stop()
}
// 模拟Job任务
func jobQueue() {
	for i := 1; i <= 30; i++ {
		JobQueue <- Job{ID: i, Name: fmt.Sprintf("Job【%d】", i)}
		fmt.Printf("jobQueue add %d job\n", i)
	}
}

定义了两个结构体:Task任务和Job工作,Task并没有实质性的内容,这里仅仅定义了一个整型变量;

定义两个全局变量:MaxWorker是最大的worker数量;JobQueue是Job的通道。这两个变量都用于后面的模拟,在真实场景中可以不设置这两个变量。

定义了一个Worker结构体,与上一个简单工作池的示例不同,本例的Worker不再是简单的一个goroutine,而是一个结构体。结构体内定义了如下四个变量。id:worker编号。exit:这是一个bool类型的通道,当有数据写入时worker结束运行。JobChannel:Job类型的通道,该通道是专属于当前worker的私有工作队列。WorkerPool:注意看,定义的时候使用了两个Channel,每一个元素是一个Job通道,其实每一个元素是一个JobChannel。

NewWorker方法用于创建一个新的worker,要注意该方法的参数workerPool用于创建worker时传入,这就说明每个worker与其他worker的WorkerPool是共享的,或者说多个worker使用一个WorkerPool。这一点很重要,这是本示例代码在上一个简单示例代码基础上的优化。而JobChannel和exit变量则是随着Worker的新建而新建的。

Worker的Start方法,该方法用于监听任务或者结束信号。Start方法一开始就用goroutine运行一个匿名函数,而函数内部是一个无限循环。在循环内部,首先是把当前的JobChannel注册到WorkerPool里,一旦注册进去也就说明该worker可以接收任务了。然后通过select判断JobChannel是否可以读取,也就是其中是否有Job,或者exit通道是否可以读取。如果JobChannel可读取,证明有Job,后续开始处理Job;而如果exit可读,则结束当前的无限循环。所以,后面的代码中要特别注意对WorkerPool的操作,Worker是从WorkerPool领取工作的。Worker的Stop方法,用于为exit通道写入数据,在Start方法内Worker会读取到写入的数据,进而结束无限循环。

NewScheduler函数用于创建一个Scheduler,可以看到函数内部的WorkerPool是通过make函数新建的,NewWorker函数一样靠参数传入。注意WorkerPool是有缓存通道的,缓存长度是MaxWorkers。

Scheduler的Create方法,该方法根据MaxWorkers最大数创建Worker,并且把引用存入Workers切片。创建好Worker后,马上调用Worker的Start方法,最后通过goroutine运行Schedule方法。Scheduler的Shutdown方法,用于关闭工作池,调用所有worker的Stop方法并且关闭WorkerPool工作池。

Scheduler的Schedule方法,该方法内也是一个无限循环,循环内部就是不停地读取JobQueue,然后运行一个goroutine。在新运行的goroutine内从s.WorkerPool读取一个JobChannel,注意,Worker注册到WorkerPool以后此处才可以读取到,如果WorkerPool的缓存通道内没有JobChannel,则会阻塞,直到读取到JobChannel,才把Job写入。

备注:此文内容来自《Go微服务实战》

到此这篇关于Golang工作池的使用实例讲解的文章就介绍到这了,更多相关Go工作池内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Go实现线程池(工作池)的两种方式实例详解

    worker pool简介 worker pool其实就是线程池thread pool.对于go来说,直接使用的是goroutine而非线程,不过这里仍然以线程来解释线程池. 在线程池模型中,有2个队列一个池子:任务队列.已完成任务队列和线程池.其中已完成任务队列可能存在也可能不存在,依据实际需求而定. 只要有任务进来,就会放进任务队列中.只要线程执行完了一个任务,就将任务放进已完成任务队列,有时候还会将任务的处理结果也放进已完成队列中. worker pool中包含了一堆的线程(worker,

  • Golang工作池的使用实例讲解

    目录 一.概念 二.实例 1.简单示例 2.读入数据 一.概念 我们可以将工作池理解为线程池.线程池的创建和销毁非常消耗资源,所以专门写一个pool,每次用过的线程池再放回pool中而不是销毁.不过在Go语言中不会使用系统的线程,而是使用goroutine.gorotine的创建和销毁比系统线程的消耗要小的多,而且goroutine没有标号.所以goroutine的pool就不再时线程池,而是work pool(工作池). 虽然goroutine的系统消耗较小,但也不能随意在编码时使用go fu

  • Golang单元测试与覆盖率的实例讲解

    1 概述 C/C++和Java(以及大多数的主流编程语言)都有自己成熟的单元测试框架,前者如Check,后者如JUnit,但这些编程框架本质上仍是第三方产品,为了执行单元测试,我们不得不从头开始搭建测试工程,并且需要依赖于第三方工具才能生成单元测试的覆盖率. 相比之下,Go语言官方则提供了语言级的单元测试支持,即testing包,而且仅通过go工具本身就可以方便地生成覆盖率数据,也就是说,单元测试是Go语言的自带属性,除了好好设计自己的单元测试用例外,开发者不需要操心工程搭建的任何细节.没错,G

  • Golang JSON的进阶用法实例讲解

    痛点 json 是当前最常用的数据传输格式之一,纯文本,容易使用,方便阅读,在通信过程中大量被使用. 你是否遇到过json中某个字段填入某种类型都适合而陷入两难境地? (例如:定义了一个port字段,你却不知道是填入 8080 ,还是 "8080" 的尴尬局面) 你是否遇到过json反解析报错是因为填入字段的类型不匹配导致的?例如: json: cannot unmarshal number into Go struct field Host.port of type string 你

  • PHP面向对象之工作单元(实例讲解)

    工作单元 这个模式涉及到了领域模型.数据映射器和标识映射,这里就统一进行整理和回顾了. $venue = new \woo\domain\Venue(null,"The Green Tree"); \woo\domain\ObjectWatcher::instance()->performOperations(); 现在以上面的二行客户端代码为切入点大概的叙述一下这个模式是怎么工作的. 第一句在使用领域模型对象创建一个对象的时候,它就调用了标识映射ObjectWatcher类 将

  • python爬虫筛选工作实例讲解

    我们在选择一件商品的时候,会先了解一些相关的商品信息,根据自己的需求和情况再进行选择.这种现象也同样适用于找工作,筛选一个岗位的重要环节,就是看自身是否符合工作经验的要求.不过因为信息量比较大,有没有什么方法可以用python爬虫中的知识点帮我们解决一下呢~具体内容往下看: 根据工作经验年限,划分招聘等级 # 校正拉勾网工作年限描述,以 Boss直聘描述为准 def update_lagou_workyear(): items = db.jobs_lagou_php.find({}) for i

  • python中Pexpect的工作流程实例讲解

    1.工作流程步骤 (1)用spawn来执行一个程序: (2)用expect方法来等待指定的关键字,这个关键字是被执行的程序打印到标准输出上面的: (3)当发现这个关键字以后,使用send/sendline方法发送字符串给这个程序. 2.实例 spawn类 class spawn(SpawnBase): '''This is the main class interface for Pexpect. Use this class to start and control child applica

  • Java实现Web应用中的定时任务(实例讲解)

    定时任务,是指定一个未来的时间范围执行一定任务的功能.在当前WEB应用中,多数应用都具备任务调度功能,针对不同的语音,不同的操作系统, 都有其自己的语法及解决方案,windows操作系统把它叫做任务计划,linux中cron服务都提供了这个功能,在我们开发业务系统中很多时候会涉及到这个功能.本场chat将使用java语言完成日常开发工作中常用定时任务的使用,希望给大家工作及学习带来帮助. 一.定时任务场景 (1)驱动处理工作流程 作为一个新的预支付订单被初始化放置,如果该订单在指定时间内未进行支

  • Java高级之虚拟机加载机制的实例讲解

    Jvm要加载的是二进制流,可以是.class文件形式,也可以是其他形式,按照它加载的标准来设计就不会有太大问题. 以下主要就机制和标准两个问题分析一番: 首先来Java类文件的加载机制 ,跟变量的加载机制类似,它先把Class文件加载入内存,再对数据进行验证.解析和初始化,最终形成虚拟机可以直接使用的Java类型.由于Java是采用JIT机制,所以加载时会比较慢,但优点也明显,具有高度灵活性,支持动态加载和动态连接. 接下来就讲讲类的加载过程: 一个类加载的基本过程是按照下面的顺序 来,但也有不

  • TensorFlow 实战之实现卷积神经网络的实例讲解

    本文根据最近学习TensorFlow书籍网络文章的情况,特将一些学习心得做了总结,详情如下.如有不当之处,请各位大拿多多指点,在此谢过. 一.相关性概念 1.卷积神经网络(ConvolutionNeural Network,CNN) 19世纪60年代科学家最早提出感受野(ReceptiveField).当时通过对猫视觉皮层细胞研究,科学家发现每一个视觉神经元只会处理一小块区域的视觉图像,即感受野.20世纪80年代,日本科学家提出神经认知机(Neocognitron)的概念,被视为卷积神经网络最初

随机推荐