Go批量操作excel导入到mongodb的技巧

目录
  • 1. 选择命令行包
  • 2. 读取配置,连接数据库
  • 3. 读取文件
    • 3.1. 并发读
    • 3.2. 使用excelize处理excel
    • 3.3. 使用mpb在命令行输出进度显示
  • 4. 写入mongodb
  • 5. 同步mysql
  • 6. 总结

需求:完成一个命令工具,批量处理某个目录下面的一些excel,将这些excel数据导入到mongodb,同时可以同步到mysql

:: 花了一天时间写完代码,代码库位置:https://gitee.com/foz/lib/tree/master/ecc

代码目录:

├─cmd
|  └─ecc.go     # 命令
├─configs
├─data
├─internal
│  └─importing  # 主要逻辑处理
├─pkg           # 处理文件读取、连接数据库等
│  ├─files
│  ├─mongo
│  └─mysql
├─queue
└─tools

1. 选择命令行包

平常使用的的命令工具包有:

  • urfave/cli
  • spf13/cobra

这里使用的是urfave/cli包,比较简单

var DirPath = "../data"     // 默认位置
var dir = DirPath
app := &cli.App{
		Name:  "Ecc",
		Usage: "Ecc is a tools for batch processing of excel data",
		Flags: []cli.Flag{
			&cli.StringFlag{
				Name:        "model",
				Aliases:     []string{"m"},
				Usage:       "The model of searching",
				Value:       "model",
				Destination: &model,
			},
			&cli.StringFlag{    // 设置一个 -d 的参数,用来确定目标文件夹位置
				Name:        "dir",
				Aliases:     []string{"d"},
				Usage:       "Folder location of data files",
				Destination: &dir,
				Value:       DirPath,
		},
		Action: func(c *cli.Context) error {
			importing.Load("../configs/cfg.yaml")  // 引入配置文件,读取mongodb、mysql等配置
			importing.Handle(dir)  ## 具体逻辑处理
			return nil
	}

2. 读取配置,连接数据库

读取配置使用spf13/viper库,需要读取一下配置,连接mongodb

var C Config

type Config struct {
	Env   string `yaml:"env"`
	Mongo struct {
		DNS        string `yaml:"dns"`
		Db         string `yaml:"db"`
		Collection string `yaml:"collection"`
	} `yaml:"mongo"`
	Mysql struct {
		Alias string `yaml:"alias"`
		Dns   string `yaml:"dns"`
	} `yaml:"mysql"`
}
func Load(cf string) {
	var err error
	viper.SetConfigFile(cf)
	if err = viper.ReadInConfig(); err != nil {
		log.Fatal(fmt.Errorf("fatal error config file: %s \n", err))
	}
	if err = viper.Unmarshal(&configs.C); err != nil {
		log.Fatal(fmt.Errorf("unmarshal conf failed, err:%s \n", err))
	if err = mongo.Conn(configs.C.Mongo.DNS, configs.C.Mongo.Db); err != nil {
		log.Fatal(color.RedString("%s:\n%v", "mongo connect err", err))
	if mongo.CheckCollection(configs.C.Mongo.Collection) {
		if err = mongo.DelCollection(configs.C.Mongo.Collection); err != nil {
			log.Fatal(color.RedString("%s:\n%v", "mongo del collection err", err))
		}
	if err = mongo.CreateCollection(configs.C.Mongo.Collection); err != nil {
		log.Fatal(color.RedString("%s:\n%v", "mongo create collection err", err))

3. 读取文件

先确定文件权限以及文件是否存在

func ReadDir(dir string) ([]os.FileInfo, error) {
	perm := checkPermission(dir)
	if perm == true {
		return nil, fmt.Errorf("permission denied dir: %s", dir)
	}

	if isNotExistDir(dir) {
		return nil, fmt.Errorf("does not exist dir: %s", dir)
	files, err := ioutil.ReadDir(dir)
	if err == nil {
		return files, err
	return nil, fmt.Errorf("ReadDir: %s, err: %v", dir, err)
}

拿到文件后就要并发读取每个excel文件数据

这里需求是一次任务必须读完所有的文件,任何一个文件有错误就退出程序。

:: 所以需要定义异常退出信道和一个完成读取两个信道,总的数据使用sync.Map安全并发写入。

3.1. 并发读

rWait   = true
rDone   = make(chan struct{})
rCrash  = make(chan struct{})

read(f, dir, data)
for rWait {  		// 使用for循环来阻塞读文件
	select {
	case <-rCrash:
		abort("-> Failure")
		return
	case <-rDone:
		rWait = false
	}
}
func read(fs []os.FileInfo, dir string, data *sync.Map) {
	for _, file := range fs {
		fileName := file.Name()
		_ext := filepath.Ext(fileName)
		if Include(strings.Split(Exts, ","), _ext) {
			wg.Add(1)
			inCh := make(chan File)
			go func() {
				defer wg.Done()
				select {
				case <-rCrash:
					return // 退出goroutine
				case f := <-inCh:
					e, preData := ReadExcel(f.FilePath, f.FileName, pb)
					if e != nil {
						tools.Red("%v", e)
						// 使用sync.once防止多个goroutine关闭同一个信道
						once.Do(func() {
							close(rCrash)
						})
						return
					}
					data.Store(f.FileName, preData)
				}
			}()
				inCh <- File{
					FileName: fileName,
					FilePath: dir + string(os.PathSeparator) + fileName,
		}
	go func() {
		wg.Wait()
		close(rDone)
	}()

3.2. 使用excelize处理excel

excelize是一个非常好用的excel处理库,这里使用这个库读取excel文件内容

type ExcelPre struct {
	FileName    string
	Data        [][]string
	Fields      []string
	Prefixes    string
	ProgressBar *mpb.Bar  // 进度条对象
}

func ReadExcel(filePath, fileName string, pb *mpb.Progress) (err error, pre *ExcelPre) {
	f, err := excelize.OpenFile(filePath)
	if err != nil {
		return err, nil
	}
	defer func() {
		if _e := f.Close(); _e != nil {
			fmt.Printf("%s: %v.\n\n", filePath, _e)
		}
	}()
	// 获取第一页数据
	firstSheet := f.WorkBook.Sheets.Sheet[0].Name
	rows, err := f.GetRows(firstSheet)
	lRows := len(rows)
	if lRows < 2 {
		lRows = 2
	rb := ReadBar(lRows, filePath, pb)
	wb := WriteBar(lRows-2, filePath, rb, pb)
	var fields []string
	var data [][]string
        // 进度条增加一格
	InCr := func(start time.Time) {
		rb.Increment()
		rb.DecoratorEwmaUpdate(time.Since(start))
	for i := 0; i < lRows; i++ {
		InCr(time.Now())
		// 这里对第一行处理,用来判断一些约定的条件
		if i == 0 {
			fields = rows[i]
			for index, field := range fields {
				if isChinese := regexp.MustCompile("[\u4e00-\u9fa5]"); isChinese.MatchString(field) || field == "" {
					err = errors.New(fmt.Sprintf("%s: line 【A%d】 field 【%s】 \n", filePath, index, field) + "The first line of the file is not a valid attribute name.")
					return err, nil
				}
			}
			continue
		// 过滤第二行,这一行通常是中文解释字段
		if i == 1 {
		data = append(data, rows[i])
	return nil, &ExcelPre{
		FileName:    fileName,
		Data:        data,
		Fields:      fields,
		Prefixes:    Prefix(fileName),
		ProgressBar: wb,

3.3. 使用mpb在命令行输出进度显示

mpb是一个很好用的命令行进度输出库,上面代码里里有两个进度条,一个是读进度条,第二个是写进度条,读进度条在文件读取的时候就显示了,返回的结构体里有写进度条对象,便于后面写操作时候显示。

下面是两个进度条显示的配置,具体参数可以看这个库的文档。

func ReadBar(total int, name string, pb *mpb.Progress) *mpb.Bar {
	return pb.AddBar(int64(total),
		mpb.PrependDecorators(
			decor.OnComplete(decor.Name(color.YellowString("reading"), decor.WCSyncSpaceR), color.YellowString("waiting")),
			decor.CountersNoUnit("%d / %d", decor.WCSyncWidth, decor.WCSyncSpaceR),
		),
		mpb.AppendDecorators(
			decor.NewPercentage("%.2f:", decor.WCSyncSpaceR),
			decor.EwmaETA(decor.ET_STYLE_MMSS, 0, decor.WCSyncWidth),
			decor.Name(": "+name),
	)
}

func WriteBar(total int, name string, beforeBar *mpb.Bar, pb *mpb.Progress) *mpb.Bar {
		mpb.BarQueueAfter(beforeBar, false),
		mpb.BarFillerClearOnComplete(),
			decor.OnComplete(decor.Name(color.YellowString("writing"), decor.WCSyncSpaceR), color.GreenString("done")),
			decor.OnComplete(decor.CountersNoUnit("%d / %d", decor.WCSyncSpaceR), ""),
			decor.OnComplete(decor.NewPercentage("%.2f:", decor.WCSyncSpaceR), ""),
			decor.OnComplete(decor.EwmaETA(decor.ET_STYLE_MMSS, 0, decor.WCSyncWidth), ""),
			decor.OnComplete(decor.Name(": "+name), name),

4. 写入mongodb

同写入操作,这里拿到所有数据,然后使用goroutine并发写入mongodb,在处理数据时候需要查重,还需要记录一下本次操作插入了哪些数据的_id值,在报错的时候进行删除(这里可以使用事务,直接删除简单些),所以定义了一个Shuttle结构体用来在记录并发时的数据。

wWait   = true
wDone   = make(chan struct{})
wCrash  = make(chan struct{})

type Shuttle struct {
	Hid []string  	// 用来判断是否是重复数据
	Mid []string  	// 用来记录本次插入的数据_id
	mu  sync.Mutex
}
func (s *Shuttle) Append(t string, str string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	switch t {
	case "h":
		s.Hid = append(s.Hid, str)
	case "m":
		s.Mid = append(s.Mid, str)
	}
write2mongo(data)
for wWait {
	select {
	case <-wCrash:
		abort("-> Failure")
		return
	case <-wDone:
		wWait = false
func write2mongo(data *sync.Map) {
	collection := mongo.GetCollection(configs.C.Mongo.Collection)
	data.Range(func(key, value interface{}) bool {
		if v, ok := value.(*ExcelPre); ok {
			wg.Add(1)
			inCh := make(chan []bson.M)
			go func() {
				defer wg.Done()
				select {
				case <-wCrash:
					return // exit
				case rows := <-inCh:
					e := Write2Mongo(rows, collection, v, &shuttle)
					if e != nil {
						tools.Red("%v", e)
						once.Do(func() {
							close(wCrash)
						})
						return
					}
				}
			}()
				inCh <- PreWrite(v)
		}
		return true
	})
	go func() {
		wg.Wait()
		close(wDone)
	}()
// 具体处理逻辑
func Write2Mongo(rows []bson.M, collection *mongoDb.Collection, v *ExcelPre, s *Shuttle) error {
	v.ProgressBar.SetCurrent(0)
	incr := func(t time.Time, b *mpb.Bar, n int64) {
		b.IncrInt64(n)
		b.DecoratorEwmaUpdate(time.Since(t))
	for _, row := range rows {
		start := time.Now()
		key := v.Prefixes + "@@" + row["_hid"].(string)
		s.mu.Lock()
		if Include(s.Hid, key) {
			s.mu.Unlock()
			incr(start, v.ProgressBar, 1)
			continue
		} else {
			s.Hid = append(s.Hid, key)
		var err error
		var id primitive.ObjectID
		if id, err = mongo.CreateDocs(collection, row); err != nil {
			return errors.New(fmt.Sprintf("%s:\n%v", "mongo create docs err", err))
		s.Append("m", id.Hex())
		incr(start, v.ProgressBar, 1)
	return nil

5. 同步mysql

因为同步mysql不是必要的,这里使用命令行输入进行判断:

tools.Yellow("-> Whether to sync data to mysql? (y/n)")
if !tools.Scan("aborted") {
	return
} else {
	tools.Yellow("-> Syncing data to mysql...")
	if err = write2mysql(); err != nil {
		tools.Red("-> Failure:" + err.Error())
	} else {
		tools.Green("-> Success.")
	}
}

连接mysql数据库,拿到当前monogodb的数据:

func write2mysql() error {
	if err := mysql.Conn(configs.C.Mysql.Dns); err != nil {
		return err
	}

	d, err := mongo.GetCollectionAllData(configs.C.Mongo.Collection)
	if err != nil {
	err = Write2Mysql(d)
	return err
}

创建表,直接拼sql就完事了:

func CreateTable(tableName string, fields []string) error {
	var err error
	delSql := fmt.Sprintf("DROP TABLE IF EXISTS `%s`", tableName)
	err = Db.Exec(delSql).Error
	if err != nil {
		return err
	}

	s := "id bigint(20) NOT NULL PRIMARY KEY"
	for _, field := range fields {
		s += fmt.Sprintf(",%s varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL", field)
	sql := fmt.Sprintf("CREATE TABLE `%s` (%s) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci", tableName, s)
	err = Db.Exec(sql).Error
	return nil
}

插入数据,bson.M本身就是一个map,转一下使用gorm分批插入数据,速度快一点:

func InsertData(tableName string, fields []string, data []bson.M) error {
	var err error
	var maps []map[string]interface{}
	for _, d := range data {
		row := make(map[string]interface{})
		for _, field := range fields {
			row[field] = d[field]
		}
		if row != nil {
			row["id"] = d["id"].(string)
			maps = append(maps, row)
	}

	if len(maps) > 0 {
		err = Db.Table(tableName).CreateInBatches(maps, 100).Error
		if err != nil {
			return err
	return err
}

6. 总结

做为golang新手,看了很多文档、文章,好似懂了,其实啥都不懂。

到此这篇关于Go批量操作excel导入到mongo的文章就介绍到这了,更多相关Go excel导入mongo内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • MongoDB数据库部署环境准备及使用介绍

    目录 一.MongoDB 简介 1.MongoDB 特点 2.MongoDB 适用场景 3.MongoDB 存储结构 4.MongoDB 数据类型 二.部署 MongoDB 数据库应用 1.准备系统环境 2.安装 MongoDB 3.创建 MongoDB 存储目录以及配置文件 4.编写服务启动脚本 一.MongoDB 简介 MongoDB 是由 C++ 语言编写的,是一个基于分布式文件存储的非关系型开源数据库系统.其优势在于可以存放海量数据,具备强大的查询功能,是一个独立的面向集合文档形式的.

  • mongoDB数据库索引快速入门指南

    目录 MongoDB 索引 1. 开始与准备数据 2. 创建索引前 3. 创建索引 createIndex 4. 创建索引后 6.唯一索引与符合索引 ①唯一索引 ②复合索引 MongoDB 索引 索引通常能够极大的提高查询的效率,如果没有索引,MongoDB在读取数据时必须扫描集合中的每个文件并选取那些符合查询条件的记录. 这种扫描全集合的查询效率是非常低的,特别在处理大量的数据时,查询可以要花费几十秒甚至几分钟,这对网站的性能是非常致命的. 索引是特殊的数据结构,索引存储在一个易于遍历读取的数

  • Go批量操作excel导入到mongodb的技巧

    目录 1. 选择命令行包 2. 读取配置,连接数据库 3. 读取文件 3.1. 并发读 3.2. 使用excelize处理excel 3.3. 使用mpb在命令行输出进度显示 4. 写入mongodb 5. 同步mysql 6. 总结 需求:完成一个命令工具,批量处理某个目录下面的一些excel,将这些excel数据导入到mongodb,同时可以同步到mysql :: 花了一天时间写完代码,代码库位置:https://gitee.com/foz/lib/tree/master/ecc 代码目录:

  • C#实现Excel导入sqlite的方法

    本文实例讲述了C#实现Excel导入sqlite的方法,是非常实用的技巧.分享给大家供大家参考.具体方法如下: 首先需要引用system.date.sqlite 具体实现代码如下: system.date.sqlite system.date.sqlite.linq //导入--Excel导入sqlite private void button2_Click(object sender, EventArgs e) { DAL.Sqlite da = new DAL.Sqlite("DataByE

  • Java实现Excel导入导出数据库的方法示例

    本文实例讲述了Java实现Excel导入导出数据库的方法.分享给大家供大家参考,具体如下: 由于公司需求,想通过Excel导入数据添加到数据库中,而导入的Excel的字段是不固定的,使用得通过动态创建数据表,每个Excel对应一张数据表,怎么动态创建数据表,可以参考前面一篇<java使用JDBC动态创建数据表及SQL预处理的方法>. 下面主要讲讲怎么将Excel导入到数据库中,直接上代码:干货走起~~ ExcellToObjectUtil 类 主要功能是讲Excel中的数据导入到数据库中,有几

  • ThinkPHP 框架实现的读取excel导入数据库操作示例

    本文实例讲述了ThinkPHP 框架实现的读取excel导入数据库操作.分享给大家供大家参考,具体如下: 入口文件中: require_once VENDOR_PATH.'PHPExcel/PHPExcel/IOFactory.php'; require_once VENDOR_PATH.'PHPExcel/PHPExcel.php'; PHP: namespace Home\Controller; class ExcelController extends CommonController {

  • Python批量操作Excel文件详解

    目录 批量操作 OS模块介绍 OS模块基本操作 获取当前工作路径 获取一个文件夹下的所有文件名 对文件名进行重命名 创建一个文件夹 删除一个文件夹 删除一个文件 利用OS模块进行批量操作 批量读取一个文件下的多个文件 批量创建文件夹 批量重命名文件 其他批量操作 批量合并多个文件 将一份文件按照指定列拆分成多个文件 批量操作 OS模块介绍 OS的全称是Operation System,指操作系统.在Python里面OS模块中主要提供了与操作系统即电脑系统之间进行交互的一些功能.我们很多的自动化操

  • Excel导入数据库时出现的文本截断问题解决方案

    问题 在把Excel导入到数据库中时,发生文本截断问题:即导入的数据每行只有一部分,原始的Excel数据为:     忽略错误 导入SQLServer2008过程中,如果源数据和目标数据类型不匹配会导入失败,所以导入数据时会忽略错误,这次导入题库也是,结果出现了这样的问题:  仔细观察你会发现,题目不完整,最长为25个汉字,很明显这是varchar(50)数据类型,但是源数据和目的数据明明都为文本类型. 不忽略错误 如果不忽略错误,导入数据最后一步会产生这样的问题: 复制代码 代码如下: -正在

  • C# 中Excel导入时判断是否被占用三种方法

    C# 中Excel导入时 判断是否被占用三种方法 Excel导入时 判断是否被占用,三种方法: 1:Win7可以,WIN10不可以 try { //原理,如果文件可以被移动,说明未被占用 string strPath = "C:\\123OK.Excel"; string strPath2 = "C:\\123OK22.Excel"; File.Move(strPath, strPath2); File.Move(strPath2, strPath); } catc

  • java实现excel导入数据的工具类

    导入Excel数据的工具类,调用也就几行代码,很简单的. 复制代码 代码如下: import jxl.Cell;import jxl.Sheet;import jxl.Workbook;import jxl.read.biff.BiffException;import org.apache.commons.beanutils.BeanUtils;import org.slf4j.Logger;import org.slf4j.LoggerFactory; import java.io.IOExc

  • thinkPHP实现将excel导入到数据库中的方法

    本文实例讲述了thinkPHP实现将excel导入到数据库中的方法.分享给大家供大家参考,具体如下: PHPExcel插件可点击此处本站下载. 这里使用的是thinkphp框架的3.1版本,下载好压缩包,框架中的extend中的vendor文件夹中新建一个名为PHPExcel的文件夹,把classes里面的内容放到里面 下面是前端页面 提示:我在测试的时候遇到报错exception 'PHPExcel_Reader_Exception' with message 'The filename 原因

  • 使用phpexcel类实现excel导入mysql数据库功能(实例代码)

    下载phpexcel文件,地址:phpexcel.codeplex.com/ 代码示例 require_once 'phpexcel/Classes/PHPExcel.php'; require_once 'phpexcel/Classes/PHPExcel/IOFactory.php'; require_once 'phpexcel/Classes/PHPExcel/Reader/Excel5.php'; $objReader = PHPExcel_IOFactory::createReade

随机推荐