解决Golang并发工具Singleflight的问题

目录
  • 前言
  • 定义
  • 用途
  • 简单Demo
  • 源码分析
  • 结构
  • 对外暴露的方法
  • 重点方法分析
    • Do
    • 流程图
    • Forget
    • doCall
  • 实际使用
  • 弊端与解决方案
    • 参考文章

前言

前段时间在一个项目里使用到了分布式锁进行共享资源的访问限制,后来了解到Golang里还能够使用singleflight对共享资源的访问做限制,于是利用空余时间了解,将知识沉淀下来,并做分享

文章尽量用通俗的语言表达自己的理解,从入门demo开始,结合源码分析singleflight的重点方法,最后分享singleflight的实际使用方式与需要注意的“坑“。

定义

按照官方文档的定义,singleflight 提供了一个重复的函数调用抑制机制

Package singleflight provides a duplicate function call suppression

用途

通俗的来说就是 singleflight将相同的并发请求合并成一个请求,进而减少对下层服务的压力,通常用于解决缓存击穿的问题

  • 缓存击穿是指: 在高并发的场景中,大量的request同时请求查询一个共享资源(例如Redis缓存的key) ,如果这个共享资源正好过期失效了,就会导致大量相同的request都打到Redis下游的数据库,导致数据库的负载上升。

简单Demo

var (
	sfKey1 = "key1"
	wg     *sync.WaitGroup
	sf     singleflight.Group
	nums   = 10
)

func getValueService(key string) { //service
   var val string
   wg = &sync.WaitGroup{}
   wg.Add(nums)
   for idx := 0; idx < nums; idx++ { // 模拟多协程同时请求
      go func(idx int) { // 注意for的一个小坑
         defer wg.Done()
         value, _ := getAndSetCacheNoChan(idx, key) //简化代码,不处理error
         log.Printf("request %v get value: %v", idx, value)
         val = value
      }(idx)
   }
   wg.Wait()
   log.Println("val: ", val)
   return
}

// getValueBySingleflight 使用singleflight取cacheKey对应的value值
func getValueBySingleflight(idx int, cacheKey string) (string, error) {
   log.Printf("idx %v into-cache...", idx)
   // 调用singleflight的Do()方法
   value, _, _ := sf.Do(cacheKey, func() (ret interface{}, err error) {
      log.Printf("idx %v is-setting-cache", idx)
      // 休眠0.1s以捕获并发的相同请求
      time.Sleep(100 * time.Millisecond)
      log.Printf("idx %v set-cache-success!", idx)
      return "myValue", nil
   })
   return value.(string), nil
}

看看实际效果

  • 由结果图可以看到,索引=8的协程第一个进入了Do()方法,其他协程则阻塞住,等到idx=8的协程拿到执行结果后,协程以乱序的形式返回执行结果。
  • 相同key的情况下,singleflight将我们的多个请求合并成1个请求。由1个请求去执行对共享资源的操作。

源码分析

结构

type (
   Group struct { // singleflight实体
      mu sync.Mutex       // 互斥锁
      m  map[string]*call // 懒加载
   }

   call struct {
      wg sync.WaitGroup
      // 存储 调用singleflight.Do()方法返回的结果
      val interface{}
      err error

      // 调用singleflight.Forget(key)时将对应的key从Group.m中删除
      forgotten bool

      // 通俗的理解成singleflight合并的并发请求数
      dups  int
      // 存储 调用singleflight.DoChan()方法返回的结果
      chans []chan<- Result
   }

   Result struct {
      Val    interface{}
      Err    error
      Shared bool
   }
)

对外暴露的方法

func Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)   

func DoChan(key string, fn func() (interface{}, error)) <-chan Result) 

// 将key从Group.m中删除
func Forget(key string)

DoChan()和Do()最大的区别是DoChan()属于异步调用,返回一个channel,解决同步调用时的阻塞问题

重点方法分析

Do

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
   g.mu.Lock() // 加互斥锁
   if g.m == nil { // 懒加载map
      g.m = make(map[string]*call)
   }
   if c, ok := g.m[key]; ok { // 检查相同的请求已经是否进入过singleflight
      c.dups++
      g.mu.Unlock()
      c.wg.Wait() // 调用waitGroup的wait()方法阻塞住本次调用,等待第一个进入singleflight的请求执行完毕拿到结果,将本次请求唤醒.

      if e, ok := c.err.(*panicError); ok { //如果调用完成,发生error ,将error上抛
         panic(e)
      } else if c.err == errGoexit {
         runtime.Goexit()
      }
      // 返回调用结果
      return c.val, c.err, true
   }
   c := new(call) // 相同的请求第一次进入singleflight
   c.wg.Add(1)
   g.m[key] = c // new一个call实体,放入singleflight.call这个map
   g.mu.Unlock()

   g.doCall(c, key, fn) //实际执行的函数
   return c.val, c.err, c.dups > 0
}

流程图

由源码可以分析出,最后实际执行我们业务逻辑的函数其实是放到了doCall() 里,我们稍后分析这个函数

Forget

再简单看看Forget()函数,很短.

func (g *Group) Forget(key string) {
   g.mu.Lock()
   if c, ok := g.m[key]; ok {
      c.forgotten = true // key的forgotten标志位记为true
   }
   delete(g.m, key)  // Group.m中删除对应的key
   g.mu.Unlock()
}

doCall

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
   normalReturn := false
   recovered := false

    //使用双重defer来区分error的类型: panic && runtime.error
   defer func() {
      if !normalReturn && !recovered {
        // fn()发生了panic且fn()中的panic没有被recover掉
        // errGoexit连接runtime.Goexit错误
         c.err = errGoexit
      }

      c.wg.Done()
      g.mu.Lock()
      defer g.mu.Unlock()
      if !c.forgotten { // 检查key是否调用了Forget()
         delete(g.m, key)
      }

      if e, ok := c.err.(*panicError); ok {
         // 如果返回的是 panic 错误,为了避免channel被永久阻塞,我们需要确保这个panic无法被recover
         if len(c.chans) > 0 {
            go panic(e)  // panic无法被恢复
            select {} // 阻塞本goroutinue.
         } else {
            panic(e)
         }
      } else {
         // 将结果正常地返回
         for _, ch := range c.chans {
            ch <- Result{c.val, c.err, c.dups > 0}
         }
      }
   }()

   func() {
      defer func() {
         if !normalReturn {
            // 表示fn()发生了panic()
            // 此时与panic相关的堆栈已经被丢弃(调用的fn()) ,无法通过堆栈跟踪去确定error类型
            if r := recover(); r != nil {
               c.err = newPanicError(r) //new一个新的自定义panic err,往第一个defer抛
            }
         }
      }()
     // 执行我们实际的业务逻辑,并将业务方法的返回值赋给singleflight.call
      c.val, c.err = fn()的val和err属性
      // 如果fn()发生panic,normalReturn无法被赋值为true,而是进入doCall()的第二个defer()
      normalReturn = true
   }()
   // 如果normalResult为false时,表示fn()发生了panic
   // 但是执行到了这一步,表示fn()中的panic被recover了
   if !normalReturn {
      recovered = true // recovered标志位置为true
   }
}

由以上分析可以得出几个重要的结论

  • singleflight主要使用sync.Mutex和sync.WaitGroup进行并发控制.
  • 对于key相同的请求, singleflight只会处理的一个进入的请求,后续的请求都会使用waitGroup.wait()将请求阻塞
  • 使用双重defer()区分了panic和runtime.Goexit错误,如果返回的是一个panic错误,group.c.chans会发生阻塞,那么需要抛出这个panic且确保其无法被recover

实际使用

分享一段实际项目中使用singleflight结合本地缓存的代码模版

func (s Service) getDataBySingleFlight(ctx  context.Context) (entity.List, error) {
    // 1. 从localCache查
    resData, err := local_cache.Get(ctx, key)
    if err != nil {
       log.Fatalln()
       return resData, err
    }
    if resData != nil {
       return resData, nil
    }
    // 2. localCache无数据,从redis查
    resData, err = srv.rdsRepo.Get()
    if err != nil && err != redis.Nil {
       // redis错误
       log.Fatalln()
       return resData, err
    } else if redis.Nil == err {
           // redis无数据 ,查db
           resData, err, _ = singleFlight.Do(key, func() (interface{}, error) {
           // 构建db查询条件
          searchConn := entity.SearchInfo{}
           //  建议休眠0.1s 捕获0.1s内的重复请求
          time.Sleep(100 * time.Millisecond)
           // 4. 查db
          data, err := srv.dBRepo.GetByConn(ctx, searchConn)
          if err != nil {
             log.Fatalln()
             return data, err
          }
           // 5. 回写localCache && redisCache
          err = local_cache.Set(ctx, data)
          if err != nil {
             log.Fatalln()
          }
          err = srv.rdsRepo.Set(ctx, data)
          if err != nil {
             log.Fatalln()
          }
      // 返回db数据,回写cache的error不上抛
      return data, nil
   })
   return resData, err
}
return resData, nil

弊端与解决方案

singleflight当然不是解决问题的银弹,在使用的过程中有一些“坑”需要我们注意

  • Do()方法是一个同步调用的方法,无法处理下游服务调用的超时情况

解决方案:

使用singleflight的doChan()方法,在service层使用 channel+select 做超时控制.

func enterGetAndSetCacheWithChan(ctx context.Context, key string) (str string, err error) {
   tag := "enterGetAndSetCacheWithChan"
   sonCtx, _ := context.WithTimeout(ctx, 2 * time.Second)
   val := ""
   nums := 10 //协程数
   wg = &sync.WaitGroup{}
   wg.Add(nums)
   for idx := 0; idx < nums; idx++ {
      go func() {
         defer wg.Done()
         val, err = getAndSetCacheWithChan(sonCtx, idx, key)
         if err != nil {
            log.Printf("err:[%+v]", err)
            return
         }
         str = val
      }()
   }
   wg.Wait()
   log.Printf("tag:[%s] val:[%s]", tag, val)
   return
}

func getAndSetCacheWithChan(ctx context.Context, idx int, cacheKey string) (string, error) {
   tag := "getAndSetCacheWithChan"
   log.Printf("tag: %s ;idx %d into-cache...", tag, idx)
   ch := sf.DoChan(cacheKey, func() (ret interface{}, err error) { // do的入参key,可以直接使用缓存的key,这样同一个缓存,只有一个协程会去读DB
      log.Printf("idx %v is-setting-cache", idx)
      time.Sleep(100 * time.Millisecond)
      log.Printf("idx %v set-cache-success!", idx)
      return "myValue", nil
   })
   for { // 选择 context + select 超时控制
      select {
      case <-ctx.Done():
         return "", errors.New("ctx-timeout") // 根据业务逻辑选择上抛 error
      case data, _ := <-ch:
         return data.Val.(string), nil
      default:
      }
   }
}
  • 如果第一个请求失败了,那么所有等待的请求都会返回同一个error

解决方案

根据实际情况,结合下游服务调用耗时与下游实际能支持的QPS等数据,对key做定时Forget()。

go func() {
       time.Sleep(100 * time.Millisecond)
       g.Forget(key)
   }()

参考文章

singleflight双重defer: developer.51cto.com/article/652…

到此这篇关于Golang并发工具-Singleflight的文章就介绍到这了,更多相关Golang并发Singleflight内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • golang并发工具MapReduce降低服务响应时间

    目录 前言 并发处理工具MapReduce MapReduce的用法演示 MapReduce使用注意事项 实现原理分析: 文末 前言 在微服务中开发中,api网关扮演对外提供restful api的角色,而api的数据往往会依赖其他服务,复杂的api更是会依赖多个甚至数十个服务.虽然单个被依赖服务的耗时一般都比较低,但如果多个服务串行依赖的话那么整个api的耗时将会大大增加. 那么通过什么手段来优化呢?我们首先想到的是通过并发来的方式来处理依赖,这样就能降低整个依赖的耗时,Go基础库中为我们提供

  • 使用Golang的singleflight防止缓存击穿的方法

    在使用缓存时,容易发生缓存击穿. 缓存击穿:一个存在的key,在缓存过期的瞬间,同时有大量的请求过来,造成所有请求都去读dB,这些请求都会击穿到DB,造成瞬时DB请求量大.压力骤增. singleflight 介绍 import "golang.org/x/sync/singleflight" singleflight类的使用方法就新建一个singleflight.Group,使用其方法Do或者DoChan来包装方法,被包装的方法在对于同一个key,只会有一个协程执行,其他协程等待那个

  • Golang编程并发工具库MapReduce使用实践

    目录 环境 项目需求 mapReduce使用说明 需求实现 业务逻辑 创建任务队列 运行结果 结论 引申阅读 环境 go version go1.16.4 windows/amd64 Intel(R) Core(TM) i7-7820HK CPU @ 2.90GHz 4核心8线程 项目需求 处理数个约5MB的小文件 从源目录读取文件并拷贝到目标目录 计算源文件MD5和目标文件MD5进行对比,如不相同则报错并终止程序执行 mapReduce使用说明 go get -u github.com/tal

  • 解决Golang并发工具Singleflight的问题

    目录 前言 定义 用途 简单Demo 源码分析 结构 对外暴露的方法 重点方法分析 Do 流程图 Forget doCall 实际使用 弊端与解决方案 参考文章 前言 前段时间在一个项目里使用到了分布式锁进行共享资源的访问限制,后来了解到Golang里还能够使用singleflight对共享资源的访问做限制,于是利用空余时间了解,将知识沉淀下来,并做分享 文章尽量用通俗的语言表达自己的理解,从入门demo开始,结合源码分析singleflight的重点方法,最后分享singleflight的实际

  • gorm golang 并发连接数据库报错的解决方法

    底层报错 error:cannot assign requested address 原因 并发场景下 client 频繁请求端口建立tcp连接导致端口被耗尽 解决方案 root执行即可 sysctl -w net.ipv4.tcp_timestamps=1 开启对于TCP时间戳的支持,若该项设置为0,则下面一项设置不起作用 sysctl -w net.ipv4.tcp_tw_recycle=1 表示开启TCP连接中TIME-WAIT sockets的快速回收 以上这篇gorm golang 并

  • Golang源码分析之golang/sync之singleflight

    目录 1.背景 1.1. 项目介绍 1.2.使用方法 2.源码分析 2.1.项目结构 2.2.数据结构 2.3.API代码流程 3.总结 1.背景 1.1. 项目介绍 golang/sync库拓展了官方自带的sync库,提供了errgroup.semaphore.singleflight及syncmap四个包,本次分析singlefliht的源代码.singlefliht用于解决单机协程并发调用下的重复调用问题,常与缓存一起使用,避免缓存击穿. 1.2.使用方法 go get -u golang

  • Golang并发编程重点讲解

    目录 1.通过通信共享 2.Goroutines 3.Channels 3.1 Channel都有哪些特性 3.2 channel 的最佳实践 4.Channels of channels 5.并行(Parallelization) 6.漏桶缓冲区(A leaky buffer) 1.通过通信共享 并发编程是一个很大的主题,这里只提供一些特定于go的重点内容. 在许多环境中,实现对共享变量的正确访问所需要的微妙之处使并发编程变得困难.Go鼓励一种不同的方法,在这种方法中,共享值在通道中传递,实际

  • C#解决SQlite并发异常问题的方法(使用读写锁)

    本文实例讲述了C#解决SQlite并发异常问题的方法.分享给大家供大家参考,具体如下: 使用C#访问sqlite时,常会遇到多线程并发导致SQLITE数据库损坏的问题. SQLite是文件级别的数据库,其锁也是文件级别的:多个线程可以同时读,但是同时只能有一个线程写.Android提供了SqliteOpenHelper类,加入Java的锁机制以便调用.但在C#中未提供类似功能. 作者利用读写锁(ReaderWriterLock),达到了多线程安全访问的目标. using System; usin

  • 详解JUC 常用4大并发工具类

    什么是JUC? JUC就是java.util.concurrent包,这个包俗称JUC,里面都是解决并发问题的一些东西 该包的位置位于java下面的rt.jar包下面 4大常用并发工具类: CountDownLatch CyclicBarrier Semaphore ExChanger CountDownLatch: CountDownLatch,俗称闭锁,作用是类似加强版的Join,是让一组线程等待其他的线程完成工作以后才执行 就比如在启动框架服务的时候,我们主线程需要在环境线程初始化完成之后

  • PHP解决高并发的优化方案实例

    我们通常衡量一个Web系统的吞吐率的指标是QPS(Query Per Second,每秒处理请求数),解决每秒数万次的高并发场景,这个指标非常关键.举个例子,我们假设处理一个业务请求平均响应时间为100ms,同时,系统内有20台Apache的Web服务器,配置MaxClients为500个(表示Apache的最大连接数目). 那么,我们的Web系统的理论峰值QPS为(理想化的计算方式): 20*500/0.1 = 100000 (10万QPS) 咦?我们的系统似乎很强大,1秒钟可以处理完10万的

  • golang 并发编程之生产者消费者详解

    golang 最吸引人的地方可能就是并发了,无论代码的编写上,还是性能上面,golang 都有绝对的优势 学习一个语言的并发特性,我喜欢实现一个生产者消费者模型,这个模型非常经典,适用于很多的并发场景,下面我通过这个模型,来简单介绍一下 golang 的并发编程 go 并发语法 协程 go 协程是 golang 并发的最小单元,类似于其他语言的线程,只不过线程的实现借助了操作系统的实现,每次线程的调度都是一次系统调用,需要从用户态切换到内核态,这是一项非常耗时的操作,因此一般的程序里面线程太多会

随机推荐