Go并发4种方法简明讲解
一、goroutine
1、协程(Coroutine)
Golang 在语言层面对并发编程进行了支持,使用了一种协程(goroutine)机制,
协程本质上是一种用户态线程,不需要操作系统来进行抢占式调度,但是又寄生于线程中,因此系统开销极小,可以有效的提高线程的任务并发性,而避免多线程的缺点。但是协程需要语言上的支持,需要用户自己实现调度器,因为在Go语言中,实现了调度器所以我们可以很方便的能过 go
关键字来使用协程。
func main() { for i := 0; i <10; i++ { go func(i int) { for { fmt.Printf("Hello goroutine %d\n",i) } }(i) } time.Sleep(time.Millisecond) }
最简单的一个并发编程小例子,并发输出一段话。
我们同时开了10个协程进行输出,每次在fmt.printf
时交出控制权(不一定每次都会交出控制权),回到调度器中,再由调度器分配。
2、goroutine 可能切换的点
- I/O,Select
- channel
- 等待锁
- 函数调用
- runtime.Gosched()
我们看一个小例子:
func main() { var a [10]int for i := 0; i <10; i++ { go func(i int) { for { a[i]++ } }(i) } time.Sleep(time.Millisecond) fmt.Println(a) }
在这里,代码直接锁死,程序没有退出,因为在执行函数中没有协程的切换,因为 main
函数也是一个协程。
如果想要程序退出,可以通过 runtime.Gosched()
函数,在执行函数中添加一行。
for { a[i]++ runtime.Gosched() }
加上这个函数之后,代码是可以正常执行了,但是真的是正常执行吗?不一定,我们可以使用 -reac
命令来看一下数据是否有冲突:
这说明数据还是有冲突的,数组a
中的元素一边在做自增,一边在输出。解决这个问题,我们只能使用 channel 来解决。
二、Channel
Channel 中 Go语言在语言级别提供了对 goroutine 之间通信的支持,我们可以使用 channel 在两个或者多个goroutine之间进行信息传递,能过 channel 传递对像的过程和调用函数时的参数传递行为一样,可以传递普通参数和指针。
Channel 有两种模式:
var ch1 = make(chan int) // 无缓冲 channel,同步 var ch2 = make(chan int, 2) // 有缓冲 channel, 异步
无缓冲的方式,数据进入 channel 只要没有被接收,就会处在阻塞状态。
var ch1 = make(chan int) // 无缓冲 channel,同步 ch1 <- 1 ch1 <- 2 // error: all goroutines are asleep - deadlock! fmt.Println(<-ch1)
如果想要运行,必须要再开一个协程不停的去请求数据:
var ch1 = make(chan int) // 无缓冲 channel,同步 go func() { for { n := <-ch1 fmt.Println(n) } }() ch1 <- 1 ch1 <- 2
有缓冲的方式,只要缓冲区没有满就可以一直进数据,缓冲区在填满之后没有接收也会处理阻塞状态。
func bufferChannel() { var ch2 = make(chan int,2) ch2<-1 ch2<-2 fmt.Println(ch2) // 不加这一行的话,是可以正常运行的 ch2<-3 // error: all goroutines are asleep - deadlock! }
1、chaanel 指定方向
比如我现在有一个函数创建一个 channel,并且不断的需要消费channel中的数据:
func worker(ch chan int) { for { fmt.Printf("hello goroutine worker %d\n", <-ch) } } func createWorker() chan int{ ch := make(chan int) go worker(ch) return ch } func main() { ch := createWorker() ch<-1 ch<-2 ch<-3 time.Sleep(time.Millisecond) }
这个函数我是要给别人用的,但是我怎么保证使用 createWorker 函数创建的 channel 都是往里面传入数据的呢?
如果外面有人消费了这个 channel 中的数据,我们怎么限制?
这个时候,我们就可以给返回的channel 加上方向,指明这个 channel 中能往里传入数据,不能从中取数据:
func worker(ch <-chan int) { for { fmt.Printf("hello goroutine worker %d\n", <-ch) } } func createWorker() chan<- int{ ch := make(chan int) go worker(ch) return ch }
我们可以在返回 channel 的地方加上方向,指明返回的函数只能是一个往里传入数据,不能从中取数据。
并且我们还可以给专门消费的函数加上一个方向,指明这个函数只能出不能进。
2、channel 关闭
在使用 channel 的时候,随说我们可以等待channel中的函数使用完之后自己结束,或者等待 main 函数结束时关闭所有的 goroutine 函数,但是这样的方式显示不够优雅。
当一个数据我们明确知道他的结束时候,我们可以发送一个关闭信息给这个 channel ,当这个 channel 接收到这个信号之后,自己关闭。
// 方法一 func worker(ch <-chan int) { for { if c ,ok := <- ch;ok{ fmt.Printf("hello goroutine worker %d\n", c) }else { break } } } // 方法二 func worker(ch <-chan int) { for c := range ch{ fmt.Printf("hello goroutine worker %d\n", c) } } func main() { ch := createWorker() ch<-1 ch<-2 ch<-3 close(ch) time.Sleep(time.Millisecond) }
通过 Close
b函数,我们可以能过 channel 已经关闭,并且我们还可以通过两种方法判断通道内是否还有值。
三、Select
当我们在实际开发中,我们一般同时处理两个或者多个 channel 的数据,我们想要完成一个那个 channel 先来数据,我们先来处理个那 channel 怎么办呢?
此时,我们就可以使用 select 调度:
func genInt() chan int { ch := make(chan int) go func() { i := 0 for { // 随机两秒以内生成一次数据 time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond) ch <- i i++ } }() return ch } func main() { var c1 = genInt() var c2 = genInt() for { select { case n := <-c1: fmt.Printf("server 1 generator %d\n", n) case n := <- c2: fmt.Printf("server 2 generator %d\n", n) } } }
1、定时器
for { tick := time.Tick(time.Second) select { case n := <-c1: fmt.Printf("server 1 generator %d\n", n) case n := <-c2: fmt.Printf("server 2 generator %d\n", n) case <-tick: fmt.Println("定时每秒输出一次!") } }
2、超时
for { tick := time.Tick(time.Second) select { case n := <-c1: fmt.Printf("server 1 generator %d\n", n) case n := <-c2: fmt.Printf("server 2 generator %d\n", n) case <-tick: fmt.Println("定时每秒输出一次!") case <-time.After(1300 * time.Millisecond): // 如果 1.3秒内没有数据进来,那么就输出超时 fmt.Println("timeout") } }
四、传统的并发控制
1、sync.Mutex
type atomicInt struct { value int lock sync.Mutex } func (a *atomicInt) increment() { a.lock.Lock() defer a.lock.Unlock() // 使用 defer 解锁,以防忘记 a.value++ } func main() { var a atomicInt a.increment() go func() { a.increment() }() time.Sleep(time.Millisecond) fmt.Println(a.value) }
2、sync.WaitGroup
type waitGrouInt struct { value int wg sync.WaitGroup } func (w *waitGrouInt) addInt() { w.wg.Add(1) w.value++ } func main() { var w waitGrouInt for i := 0; i < 10; i++ { w.addInt() w.wg.Done() } w.wg.Wait() fmt.Println(w.value) }
更多关于Go并发简明讲解请查看下面的相关链接