Go语言同步与异步执行多个任务封装详解(Runner和RunnerAsync)

前言

同步适合多个连续执行的,每一步的执行依赖于上一步操作,异步执行则和任务执行顺序无关(如从10个站点抓取数据)

同步执行类RunnerAsync

支持返回超时检测,系统中断检测

错误常量定义

//超时错误
var ErrTimeout = errors.New("received timeout")
//操作系统系统中断错误
var ErrInterrupt = errors.New("received interrupt")

实现代码如下

package task
import (
 "os"
 "time"
 "os/signal"
 "sync"
)

//异步执行任务
type Runner struct {
 //操作系统的信号检测
 interrupt chan os.Signal
 //记录执行完成的状态
 complete chan error
 //超时检测
 timeout <-chan time.Time
 //保存所有要执行的任务,顺序执行
 tasks []func(id int) error
 waitGroup sync.WaitGroup
 lock sync.Mutex
 errs []error
}

//new一个Runner对象
func NewRunner(d time.Duration) *Runner {
 return &Runner{
 interrupt: make(chan os.Signal, 1),
 complete: make(chan error),
 timeout: time.After(d),
 waitGroup: sync.WaitGroup{},
 lock: sync.Mutex{},
 }
}

//添加一个任务
func (this *Runner) Add(tasks ...func(id int) error) {
 this.tasks = append(this.tasks, tasks...)
}

//启动Runner,监听错误信息
func (this *Runner) Start() error {
 //接收操作系统信号
 signal.Notify(this.interrupt, os.Interrupt)
 //并发执行任务
 go func() {
 this.complete <- this.Run()
 }()
 select {
 //返回执行结果
 case err := <-this.complete:
 return err
 //超时返回
 case <-this.timeout:
 return ErrTimeout
 }
}

//异步执行所有的任务
func (this *Runner) Run() error {
 for id, task := range this.tasks {
 if this.gotInterrupt() {
  return ErrInterrupt
 }
 this.waitGroup.Add(1)
 go func(id int) {
  this.lock.Lock()
  //执行任务
  err := task(id)
  //加锁保存到结果集中
  this.errs = append(this.errs, err)

  this.lock.Unlock()
  this.waitGroup.Done()
 }(id)
 }
 this.waitGroup.Wait()

 return nil
}

//判断是否接收到操作系统中断信号
func (this *Runner) gotInterrupt() bool {
 select {
 case <-this.interrupt:
 //停止接收别的信号
 signal.Stop(this.interrupt)
 return true
 //正常执行
 default:
 return false
 }
}

//获取执行完的error
func (this *Runner) GetErrs() []error {
 return this.errs
}

使用方法    

Add添加一个任务,任务为接收int类型的一个闭包

Start开始执行伤,返回一个error类型,nil为执行完毕, ErrTimeout代表执行超时,ErrInterrupt代表执行被中断(类似Ctrl + C操作)

测试示例代码

package task
import (
 "testing"
 "time"
 "fmt"
 "os"
 "runtime"
)

func TestRunnerAsync_Start(t *testing.T) {
 //开启多核
 runtime.GOMAXPROCS(runtime.NumCPU())
 //创建runner对象,设置超时时间
 runner := NewRunnerAsync(8 * time.Second)
 //添加运行的任务
 runner.Add(
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 createTaskAsync(),
 )
 fmt.Println("同步执行任务")
 //开始执行任务
 if err := runner.Start(); err != nil {
 switch err {
 case ErrTimeout:
  fmt.Println("执行超时")
  os.Exit(1)
 case ErrInterrupt:
  fmt.Println("任务被中断")
  os.Exit(2)
 }
 }
 t.Log("执行结束")
}

//创建要执行的任务
func createTaskAsync() func(id int) {
 return func(id int) {
 fmt.Printf("正在执行%v个任务\n", id)
 //模拟任务执行,sleep两秒
 //time.Sleep(1 * time.Second)
 }
}

执行结果  

同步执行任务
正在执行0个任务
正在执行1个任务
正在执行2个任务
正在执行3个任务
正在执行4个任务
正在执行5个任务
正在执行6个任务
正在执行7个任务
正在执行8个任务
正在执行9个任务
正在执行10个任务
正在执行11个任务
正在执行12个任务
 runnerAsync_test.go:49: 执行结束

异步执行类Runner

支持返回超时检测,系统中断检测

实现代码如下

package task
import (
 "os"
 "time"
 "os/signal"
 "sync"
)

//异步执行任务
type Runner struct {
 //操作系统的信号检测
 interrupt chan os.Signal
 //记录执行完成的状态
 complete chan error
 //超时检测
 timeout <-chan time.Time
 //保存所有要执行的任务,顺序执行
 tasks []func(id int) error
 waitGroup sync.WaitGroup
 lock sync.Mutex
 errs []error
}

//new一个Runner对象
func NewRunner(d time.Duration) *Runner {
 return &Runner{
  interrupt: make(chan os.Signal, 1),
  complete: make(chan error),
  timeout: time.After(d),
  waitGroup: sync.WaitGroup{},
  lock:  sync.Mutex{},
 }
}

//添加一个任务
func (this *Runner) Add(tasks ...func(id int) error) {
 this.tasks = append(this.tasks, tasks...)
}

//启动Runner,监听错误信息
func (this *Runner) Start() error {
 //接收操作系统信号
 signal.Notify(this.interrupt, os.Interrupt)
 //并发执行任务
 go func() {
  this.complete <- this.Run()
 }()
 select {
 //返回执行结果
 case err := <-this.complete:
  return err
  //超时返回
 case <-this.timeout:
  return ErrTimeout
 }
}

//异步执行所有的任务
func (this *Runner) Run() error {
 for id, task := range this.tasks {
  if this.gotInterrupt() {
   return ErrInterrupt
  }
  this.waitGroup.Add(1)
  go func(id int) {
   this.lock.Lock()
   //执行任务
   err := task(id)
   //加锁保存到结果集中
   this.errs = append(this.errs, err)
   this.lock.Unlock()
   this.waitGroup.Done()
  }(id)
 }
 this.waitGroup.Wait()
 return nil
}

//判断是否接收到操作系统中断信号
func (this *Runner) gotInterrupt() bool {
 select {
 case <-this.interrupt:
  //停止接收别的信号
  signal.Stop(this.interrupt)
  return true
  //正常执行
 default:
  return false
 }
}

//获取执行完的error
func (this *Runner) GetErrs() []error {
 return this.errs
}

使用方法    

Add添加一个任务,任务为接收int类型,返回类型error的一个闭包

Start开始执行伤,返回一个error类型,nil为执行完毕, ErrTimeout代表执行超时,ErrInterrupt代表执行被中断(类似Ctrl + C操作)

getErrs获取所有的任务执行结果

测试示例代码

package task
import (
 "testing"
 "time"
 "fmt"
 "os"
 "runtime"
)

func TestRunner_Start(t *testing.T) {
 //开启多核心
 runtime.GOMAXPROCS(runtime.NumCPU())
 //创建runner对象,设置超时时间
 runner := NewRunner(18 * time.Second)
 //添加运行的任务
 runner.Add(
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
  createTask(),
 )
 fmt.Println("异步执行任务")
 //开始执行任务
 if err := runner.Start(); err != nil {
  switch err {
  case ErrTimeout:
   fmt.Println("执行超时")
   os.Exit(1)
  case ErrInterrupt:
   fmt.Println("任务被中断")
   os.Exit(2)
  }
 }
 t.Log("执行结束")
 t.Log(runner.GetErrs())
}

//创建要执行的任务
func createTask() func(id int) error {
 return func(id int) error {
  fmt.Printf("正在执行%v个任务\n", id)
  //模拟任务执行,sleep
  //time.Sleep(1 * time.Second)
  return nil
 }
}

执行结果

异步执行任务
正在执行2个任务
正在执行1个任务
正在执行4个任务
正在执行3个任务
正在执行6个任务
正在执行5个任务
正在执行9个任务
正在执行7个任务
正在执行10个任务
正在执行13个任务
正在执行8个任务
正在执行11个任务
正在执行12个任务
正在执行0个任务
 runner_test.go:49: 执行结束
 runner_test.go:51: [<nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil>]

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我们的支持。

(0)

相关推荐

  • go语言同步教程之条件变量

    Go的标准库中有一个类型叫条件变量:sync.Cond.这种类型与互斥锁和读写锁不同,它不是开箱即用的,它需要与互斥锁组合使用: // NewCond returns a new Cond with Locker l. func NewCond(l Locker) *Cond { return &Cond{L: l} } // A Locker represents an object that can be locked and unlocked. type Locker interface

  • Go语言同步与异步执行多个任务封装详解(Runner和RunnerAsync)

    前言 同步适合多个连续执行的,每一步的执行依赖于上一步操作,异步执行则和任务执行顺序无关(如从10个站点抓取数据) 同步执行类RunnerAsync 支持返回超时检测,系统中断检测 错误常量定义 //超时错误 var ErrTimeout = errors.New("received timeout") //操作系统系统中断错误 var ErrInterrupt = errors.New("received interrupt") 实现代码如下 package ta

  • Go语言Zap库Logger的定制化和封装详解

    目录 前言 Go 语言原生的Logger Go 语言原生Logger的缺点 Zap 日志库 Zap 的使用方法 安装zap 设置 Logger 定制 Zap 的 Logger 日志切割 封装 Logger 总结 前言 日志无论对于程序还是程序员都非常重要,有多重要呢,想要长期在公司健健康康的干下去就得学会阶段性划水,阶段性划水的一大关键的就是干活快过预期但是装作...不对,这个开头不对劲,下面重来. 日志无论对于程序还是程序员都非常重要,程序员解决问题的快慢除了经验外,就是看日志能不能有效地记录

  • Go语言学习教程之goroutine和通道的示例详解

    目录 goroutine 通道 Range 和 Close Select 官方留的两道练习题 等价的二叉树 网络爬虫 源码地址 goroutine goroutine是由Go运行时管理的轻量级线程. go f(x, y, z)在一个新的goroutine中开始执行f(x, y,z). goroutines运行在相同的地址空间中,所以对共享的内存访问必须同步.sync包提供了基本的同步原语(synchronization primitives),比如互斥锁(mutual exclusion loc

  • Go语言基础for循环语句的用法及示例详解

    目录 概述 语法 注意 示例一  死循环,读取文件 示例二  打印乘法表 示例三  遍历字符串,数组,map 概述 for循环是一个循环控制结构,可以执行指定次数的循环 语法 第一种 for {} //无线循环 第二种 for 条件语句{} 第三种 for 初始语句; 条件语句; 迭代后语句 {} 第四种 for key,value:=range 表达式/变量{} 注意 省略初始条件,相当于while循环体必须用 { } 括起来初始化语句和后置语句是可选的如果只剩下条件表达式了,那么那两个分号也

  • Go语言基础switch条件语句基本用法及示例详解

    目录 概述 语法 第一种[switch 带上表达式] 第二种[switch 不带表达式] 第三种[switch 初始化,表达式] 注意 示例一[根据今天的日期打印今天星期几] 示例二[根据分数打印A,B,C,D] 示例三[算数] 概述 传入条件的不同,会执行不同的语句 每一个case分支都是唯一的,从上到下逐一测试,直到匹配为止. 语法 第一种[switch 带上表达式] switch 表达式 { case 表达式1,表达式2, ... : 语句块1 case 表达式3, 表达式4, ... :

  • C语言数据结构二叉树之堆的实现和堆排序详解

    目录 一.本章重点 二.堆 2.1堆的介绍 2.2堆的接口实现 三.堆排序 一.本章重点 堆的介绍 堆的接口实现 堆排序 二.堆 2.1堆的介绍 一般来说,堆在物理结构上是连续的数组结构,在逻辑结构上是一颗完全二叉树. 但要满足 每个父亲节点的值都得大于孩子节点的值,这样的堆称为大堆. 每个父亲节点的值都得小于孩子节点的值,这样的堆称为小堆. 那么以下就是一个小堆. 百度百科: 堆的定义如下:n个元素的序列{k1,k2,ki,…,kn}当且仅当满足下关系时,称之为堆. 若将和此次序列对应的一维数

  • C语言多线程开发中死锁与读写锁问题详解

    目录 死锁 读写锁 死锁 有时,一个线程需要同时访问两个或更多不同的共享资源,而每个资源又都由不同的互斥量管理.当超过一个线程加锁同一组互斥量时,就有可能发生死锁: 两个或两个以上的进程在执行过程中,因争夺共享资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去.此时称系统处于死锁状态或系统产生了死锁. 死锁的几种场景: 忘记释放锁 重复加锁(重复加相同的锁) 多线程多锁,抢占锁资源 //多线程多锁,抢占锁资源 #include <stdio.h> #include <pt

  • C语言线性表的链式表示及实现详解

    目录 前言 代码实现 1. 单链表的结点构造 2. 构造一个空的头结点 3. 对线性表进行赋值 4.对线性表进行销毁 5.对线性表进行重置 6.判断线性表是否为空 7.获取线性表的长度 8.获取线性表某一位置对应的元素 9.在线性表某一位置插入元素 10.删除线性表某一位置的元素 11.求线性表某一元素的前驱 12.求线性表某一元素的后继 13.打印线性表 运行结果演示 源码 前言 线性表的顺序表示指的是用一组地址连续的存储单元依次存储线性表的数据元素,而线性表的链式存储特点则是用一组任意的存储

  • C语言实现短字符串压缩的三种方法详解

    目录 前言 一.通用算法的短字符压缩 二.短字符串压缩 (1)Smaz (2)Shoco (3)Unisox2 三.总结 前言 上一篇探索了LZ4的压缩和解压性能,以及对LZ4和ZSTD的压缩.解压性能进行了横向对比.文末的最后也给了一个彩蛋:任意长度的字符串都可以被ZSTD.LZ4之类的压缩算压缩得很好吗? 本篇我们就来一探究竟. 一.通用算法的短字符压缩 开门见山,我们使用一段比较短的文本:Narrator: It is raining today. So, Peppa and George

  • C语言中回调函数和qsort函数的用法详解

    目录 回调函数 指向函数指针数组的指针 qsort(qulick sort)-库函数 回调函数 通过函数指针调用的函数,如果你把函数的指针(地址)作为参数传递给另一个函数,当这个指针被用来调用其所指向的函数时,我们就说这是回调函数. 回调函数不是由该函数的实现方直接调用,而是在特定的事件或条件发生时由另外的一方调用的,用于对该事件或条件进行响应. 举例: #include<stdio.h> void menu() { printf("*************************

随机推荐