Golang实现带优先级的select

目录
  • 背景
  • 解决方案
  • 一个封装

背景

在 Golang 里面,我们经常使用 channel 进行协程之间的通信。这里有一个经典的场景,也就是生产者消费者模式,生产者协程不断地往 Channel 里面塞元素,而消费者协程不断地消费这些元素。

写成代码就是如下:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	ch := make(chan int, 10)
	var wg sync.WaitGroup
	wg.Add(2)
	go producer(ch, &wg)
	go consumer(ch, &wg)
	wg.Wait()
}

// 生产者
func producer(ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	i := 0
	for {
		select {
		case ch <- i:
		default:
			// 丢弃
			log.Println("discard")
		}
		i++
		time.Sleep(time.Second)
	}
}

// 消费者
func consumer(ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	consume := func(i int) {
		fmt.Println(i)
		time.Sleep(time.Millisecond * 700)
	}
	for {
		i := <-ch
		consume(i) // 消费元素
	}
}

生产者不断产生元素,消费者消费元素。生产者不会等待消费者消费完毕(不然可能影响其他任务),如果 channel 已经满了,也就是说明消费者消费不过来,生产者就会丢弃这个任务。

生产者平均一秒生成1个,消费者0.7秒消费一个。正常情况下消费者是消费得过来的,然而很多时候消费者协程还需要做一些定时任务,比如一些定时清理工作。假如这个清理工作每2秒触发一次,清理时间一般需要1.5秒,也就是如果每次都做每一秒有0.75秒会被清理工作占有了,但是它不是一定要非常及时的,可以等空闲时再进行。 如下代码:

// 消费者
func consumer(ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	t := time.NewTicker(time.Second * 2)
	consume := func(i int) {
		fmt.Println(i)
		time.Sleep(time.Millisecond * 700)
	}
	clear := func() {
		fmt.Println("clear")
		time.Sleep(time.Millisecond * 1500)
	}
	for {
		select {
		case i := <-ch:
			consume(i) // 消费元素:
		case <-t.C:
			clear() // 清理
		}
	}
}

运行程序到第15秒的时候,生产者发现 channel满了,于是开始丢包:

0
1
clear
2
3
4
5
6
clear
7
clear
8
clear
9
clear
clear
10
clear
11
12
13
14
clear
15
clear
clear
discard
16
clear
discard
discard

解决方案

既然清理任务的优先级并不高,那么它就不应该阻塞消费元素流程,而是应该在空闲时才去执行。由于 Golang 里面,如果 select 两个 case 都同时满足,会随机选一个执行,因此第一想到的可能会使用如下代码实现优先级case:

// 消费者
func consumer(ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	t := time.NewTicker(time.Second * 2)
	consume := func(i int) {
		fmt.Println(i)
		time.Sleep(time.Millisecond * 700)
	}
	clear := func() {
		fmt.Println("clear")
		time.Sleep(time.Millisecond * 1500)
	}
	for {
		select {
		case i := <-ch:
			consume(i) // 消费元素
			continue   // 可能还有元素,不走清理逻辑
		default:
		}

		// 没有元素才走清理逻辑
		select {
		case <-t.C:
			clear() // 清理
		default:
		}
	}
}

如果运行这个程序,可以发现它能够满足优先级的需求,先消费元素,空闲时再执行清理任务。

然而,在没有元素可以消费,也没有清理任务可以执行的时候,这里的for将会不断地循环,浪费CPU资源。

其实,可以使用下面的方法实现优先级case,它能够在没有元素就绪的时候阻塞在 select,而不是不断循环:

// 消费者
func consumer(ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	t := time.NewTicker(time.Second * 2)
	consume := func(i int) {
		fmt.Println(i)
		time.Sleep(time.Millisecond * 700)
	}
	clear := func() {
		fmt.Println("clear")
		time.Sleep(time.Millisecond * 1500)
	}
	for {
		select {
		case i := <-ch:
			consume(i) // 消费元素
		case <-t.C:
		priority:
			for { // 清理前先把元素消费完
				select {
				case i := <-ch:
					consume(i) // 消费元素
				default:
					break priority // 注:这里会跳过这个循环,而不是再次执行
				}
			}
			clear() // 清理
		}
	}
}

这里的关键是在触发清理case的时候,先去把channel里面的元素消费完,再进行清理,从而保证能够留下足够的channel缓冲区给生产者放置生产的元素。

一个封装

上面那段优先级case代码其实挺常用的,但是几乎都是模板代码,特别是需要在两个地方写consume(i),因此我们可以封装一下这段代码,方便使用,减少出错:

// 优先级select ch1 的任务先执行完毕后才会执行 ch2 里面的任务
func PrioritySelect[T1, T2 any](ch1 <-chan T1, f1 func(T1), ch2 <-chan T2, f2 func(T2)) {
	for {
		select {
		case a := <-ch1:
			f1(a)
		case b := <-ch2:
		priority:
			for {
				select {
				case a := <-ch1:
					f1(a)
				default:
					break priority
				}
			}
			f2(b)
		}
	}
}

这样,我们的消费者代码就可以简化为:

// 消费者
func consumer(ch chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	t := time.NewTicker(time.Second * 2)
	consume := func(i int) {
		fmt.Println(i)
		time.Sleep(time.Millisecond * 700)
	}
	clear := func(time.Time) {
		fmt.Println("clear")
		time.Sleep(time.Millisecond * 1500)
	}
	PrioritySelect(ch, consume, t.C, clear)
}

到此这篇关于Golang实现带优先级的select的文章就介绍到这了,更多相关Golang带优先级select内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 一文带你了解Golang中select的实现原理

    目录 概述 结构 现象 非阻塞的收发 随机执行 编译 直接阻塞 独立情况 非阻塞操作 通用情况 运行时 初始化 循环 总结 概述 select是go提供的一种跟并发相关的语法,非常有用.本文将介绍 Go 语言中的 select 的实现原理,包括 select 的结构和常见问题.编译期间的多种优化以及运行时的执行过程. select 是一种与 switch 非常相似的控制结构,与 switch 不同的是,select 中虽然也有多个 case,但是这些 case 中的表达式都必须与 Channel

  • golang中select语句的简单实例

    目录 前言 1.先举个简单例子 2. 避免造成死锁 3. select 随机性 4. select 的超时 5. 读取/写入都可以 6. 总结一下 前言 在golang语言中,select语句 就是用来监听和channel有关的IO操作,当IO操作发生时,触发相应的case动作.有了 select语句,可以实现 main主线程 与 goroutine线程 之间的互动. select { case <-ch1 : // 检测有没有数据可读 // 一旦成功读取到数据,则进行该case处理语句 cas

  • golang select 机制和超时问题

    golang 中的协程使用非常方便,但是协程什么时候结束是一个控制问题,可以用 select 配合使用. 首先声明,golang 使用并不熟悉,本文仅仅是记录使用过程中遇到的一些坑. 子协程和父协程的通信通常用 context 或者 chan.我遇到一个通常的使用场景,在子协程中尝试多次处理,父协程等待一段时间超时,我选择用 chan 实现.我以为 select 和 C++ 中 switch 类似,所以最开始代码类似如下: for { select { case <-ctx.Done(): //

  • 深入浅出Golang中select的实现原理

    目录 概述 select实现原理 执行流程 case数据结构 执行select 循环 总结 概述 在go语言中,select语句就是用来监听和channel有关的IO操作,当IO操作发生时,触发相应的case操作,有了select语句,可以实现main主线程与goroutine线程之间的互动.需要的朋友可以参考以下内容,希望对大家有帮助. select实现原理 Golang实现select时,定义了一个数据结构表示每个case语句(包含default,default实际上是一种特殊的case),

  • 详解Golang中select的使用与源码分析

    目录 背景 select 流程 背景 golang 中主推 channel 通信.单个 channel 的通信可以通过一个goroutine往 channel 发数据,另外一个从channel取数据进行.这是阻塞的,因为要想顺利执行完这个步骤,需要 channel 准备好才行,准备好的条件如下: 1.发送 缓存有空间(如果是有缓存的 channel) 有等待接收的 goroutine 2.接收 缓存有数据(如果是有缓存的 channel) 有等待发送的 goroutine 对channel实际使

  • Golang实现带优先级的select

    目录 背景 解决方案 一个封装 背景 在 Golang 里面,我们经常使用 channel 进行协程之间的通信.这里有一个经典的场景,也就是生产者消费者模式,生产者协程不断地往 Channel 里面塞元素,而消费者协程不断地消费这些元素. 写成代码就是如下: package main import ( "fmt" "sync" "time" ) func main() { ch := make(chan int, 10) var wg sync.

  • golang cache带索引超时缓存库实战示例

    目录 正文 定义泛型函数 Filter 函数 Map 函数 First 函数 带超时的cache cache 结构 集合操作 set 结构 带索引的cache index 结构 正文 cache 是一个带索引带超时的缓存库 目的在于优化代码结构,提供了若干实践. https://github.com/weapons97/cache example 定义泛型函数 1.18 已经发布一段实践了.通过泛型函数.我们可以减少循环的使用,优化代码结构.下面分享几个泛型函数和代码上的实践. Filter 函

  • Golang中switch语句和select语句的用法教程

    本文主要给大家介绍了关于Golang中switch和select用法的相关内容,分享出来供大家参考学习,下面来一起看看详细的介绍: 一.switch语句 switch语句提供了一个多分支条件执行的方法.每一个case可以携带一个表达式或一个类型说明符.前者又可被简称为case表达式.因此,Go语言的switch语句又分为表达式switch语句和类型switch语句. 1.表达式switch语句 var name string ... switch name { case "Golang"

  • Golang精编49面试题汇总(选择题)

    目录 Golang精编100题 能力模型(测试) 选择题 1.   [初级]下面属于关键字的是() 2.   [初级]定义一个包内全局字符串变量,下面语法正确的是() 3.   [初级]通过指针变量 p 访问其成员变量 name,下面语法正确的是() 4.   [初级]关于接口和类的说法,下面说法正确的是() 5.   [初级]关于字符串连接,下面语法正确的是() 6.   [初级]关于协程,下面说法正确是() 7.   [中级]关于init函数,下面说法正确的是() 8.   [初级]关于循环

  • jquery操作select元素和option的实例代码

    废话不多说了,直接给大家贴代码,具体代码如下所示: <html> <head> <title></title> <!--添加jquery--> <script src="../Script/jQuery/jquery-1.6.2.min.js" type="text/javascript"></script> <script type="text/javascript

  • Golang实现异步上传文件支持进度条查询的方法

    业务背景 业务需求要求开发一个异步上传文件的接口,并支持上传进度的查询. 需求分析 ZIP压缩包中,包含一个csv文件和一个图片文件夹,要求:解析csv数据存入mongo,将图片文件夹中的图片信息对应上csv中的人员信息. ZIP压缩包解压 使用golang自带的 "archive/zip" 包解压. func decompressZip(filePath, dest string) (string, string, error) { var csvName string imageF

  • JVM优先级线程池做任务队列的实现方法

    前言 我们都知道 web 服务的工作大多是接受 http 请求,并返回处理后的结果.服务器接受的每一个请求又可以看是一个任务.一般而言这些请求任务会根据请求的先后有序处理,如果请求任务的处理比较耗时,往往就需要排队了.而同时不同的任务直接可能会存在一些优先级的变化,这时候就需要引入任务队列并进行管理了.可以做任务队列的东西有很多,Java 自带的线程池,以及其他的消息中间件都可以. 同步与异步 这个问题在之前已经提过很多次了,有些任务是需要请求后立即返回结果的,而有的则不需要.设想一下你下单购物

  • Golang 如何解析和生成json

    JSON(Javascript Object Notation)是一种轻量级的数据交换语言,以文字为基础,具有自我描述性且易于让人阅读.尽管JSON是JavaScript的一个子集,但JSON是独立于语言的文本格式,并且采用了类似于C语言家族的一些习惯.JSON与XML最大的不同在于XML是一个完整的标记语言,而JSON不是.JSON由于比XML更小.更快,更易解析,以及浏览器的內建快速解析支持,使得其更适用于网络数据传输领域. Golang自带的JSON解析库encoding/json,可以用

  • golang的时区和神奇的time.Parse的使用方法

    时区 先写一段测试代码: const TIME_LAYOUT = "2006-01-02 15:04:05" func parseWithLocation(name string, timeStr string) (time.Time, error) { locationName := name if l, err := time.LoadLocation(locationName); err != nil { println(err.Error()) return time.Time

  • golang 打印error的堆栈信息操作

    众所周知,目前的golang error只关注Error()信息,而不关注它的堆栈路径,对错误的定位大多数通过 log.SetFlags(log.Llongfile| log.LstdFlags) log.Println(e) 一旦代码分层,为了定位错误,可能出现每一个层次的同一个error,都得log好几次,比如: func DB()error{ return errors.New("time out") } func Dao()error{ if er:= DB();er!=nil

随机推荐