Golang处理parquet文件实战指南

目录
  • 前言
  • 创建结构体
  • 生成parquet文件
  • 读取parquet文件
  • 计算列平均值
  • 总结

前言

Parquet是Apache基金会支持的项目,是面向列存储二进制文件格式。支持不同类型的压缩方式,广泛用于数据科学和大数据环境,如Hadoop生态。

本文主要介绍Go如何生成和处理parquet文件。

创建结构体

首先创建struct,用于表示要处理的数据:

type user struct {
  ID        string    `parquet:"name=id, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  FirstName string    `parquet:"name=firstname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  LastName  string    `parquet:"name=lastname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Email     string    `parquet:"name=email, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Phone     string    `parquet:"name=phone, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Blog      string    `parquet:"name=blog, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Username  string    `parquet:"name=username, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Score     float64   `parquet:"name=score, type=DOUBLE"`
  CreatedAt time.Time //wont be saved in the parquet file
}

这里要提醒的是tag,用于说明struct中每个字段在生成parquet过程中如何被处理。

parquet-go包可以处理parquet数据,更多的tag可以参考其官网。

生成parquet文件

下面现给出生成parquet文件的代码,然后分别进行说明:

package main

import (
  "fmt"
  "log"
  "time"
  "github.com/bxcodec/faker/v3"
  "github.com/xitongsys/parquet-go-source/local"
  "github.com/xitongsys/parquet-go/parquet"
  "github.com/xitongsys/parquet-go/reader"
  "github.com/xitongsys/parquet-go/writer"
)

type user struct {
  ID        string    `parquet:"name=id, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  FirstName string    `parquet:"name=firstname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  LastName  string    `parquet:"name=lastname, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Email     string    `parquet:"name=email, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Phone     string    `parquet:"name=phone, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Blog      string    `parquet:"name=blog, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Username  string    `parquet:"name=username, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
  Score     float64   `parquet:"name=score, type=DOUBLE"`
  CreatedAt time.Time //wont be saved in the parquet file
}

const recordNumber = 10000

func main() {
  var data []*user
  //create fake data
  for i := 0; i < recordNumber; i++ {
    u := &user{
      ID:        faker.UUIDDigit(),
      FirstName: faker.FirstName(),
      LastName:  faker.LastName(),
      Email:     faker.Email(),
      Phone:     faker.Phonenumber(),
      Blog:      faker.URL(),
      Username:  faker.Username(),
      Score:     float64(i),
      CreatedAt: time.Now(),
    }
    data = append(data, u)
  }
  err := generateParquet(data)
  if err != nil {
    log.Fatal(err)
  }

}

func generateParquet(data []*user) error {
  log.Println("generating parquet file")
  fw, err := local.NewLocalFileWriter("output.parquet")
  if err != nil {
    return err
  }
  //parameters: writer, type of struct, size
  pw, err := writer.NewParquetWriter(fw, new(user), int64(len(data)))
  if err != nil {
    return err
  }
  //compression type
  pw.CompressionType = parquet.CompressionCodec_GZIP
  defer fw.Close()
  for _, d := range data {
    if err = pw.Write(d); err != nil {
      return err
    }
  }
  if err = pw.WriteStop(); err != nil {
    return err
  }
  return nil
}

定义结构体上面已经说明,但需要提醒的是类型与文档保持一致:

Primitive Type Go Type
BOOLEAN bool
INT32 int32
INT64 int64
INT96(deprecated) string
FLOAT float32
DOUBLE float64
BYTE_ARRAY string
FIXED_LEN_BYTE_ARRAY string

接着就是使用faker包生成模拟数据。然后调用err := generateParquet(data)方法。该方法大概逻辑为:

  • 首先准备输出文件,然后基于本地输出文件构造pw,用于写parquet数据:
  fw, err := local.NewLocalFileWriter("output.parquet")
  if err != nil {
    return err
  }
  //parameters: writer, type of struct, size
  pw, err := writer.NewParquetWriter(fw, new(user), int64(len(data)))
  if err != nil {
    return err
  }

  //compression type
  pw.CompressionType = parquet.CompressionCodec_GZIP
  defer fw.Close()

然后设置压缩类型,并通过defer操作确保关闭文件。下面开始写数据:

  for _, d := range data {
    if err = pw.Write(d); err != nil {
      return err
    }
  }
  if err = pw.WriteStop(); err != nil {
    return err
  }
  return nil

循环写数据,最后调用pw.WriteStop()停止写。 成功写文件后,下面介绍如何读取parquet文件。

读取parquet文件

首先介绍如何一次性读取文件,主要用于读取较小的文件:

func readParquet() ([]*user, error) {
  fr, err := local.NewLocalFileReader("output.parquet")
  if err != nil {
    return nil, err
  }

  pr, err := reader.NewParquetReader(fr, new(user), recordNumber)
  if err != nil {
    return nil, err
  }

  u := make([]*user, recordNumber)
  if err = pr.Read(&u); err != nil {
    return nil, err
  }
  pr.ReadStop()
  fr.Close()
  return u, nil
}

大概流程如下:首先定义本地文件,然后构造pr用于读取parquet文件:

  fr, err := local.NewLocalFileReader("output.parquet")
  if err != nil {
    return nil, err
  }

  pr, err := reader.NewParquetReader(fr, new(user), recordNumber)
  if err != nil {
    return nil, err
  }

然后定义目标内容容器u,一次性读取数据:

  u := make([]*user, recordNumber)
  if err = pr.Read(&u); err != nil {
    return nil, err
  }
  pr.ReadStop()
  fr.Close()

但一次性大量记录加载至内存可能有问题。这是官方文档提示:

If the parquet file is very big (even the size of parquet file is small, the uncompressed size may be very large), please don’t read all rows at one time, which may induce the OOM. You can read a small portion of the data at a time like a stream-oriented file.

大意是不要一次读取文件至内存,可能造成OOM。实际应用中应该分页读取,下面通过代码进行说明:

func readPartialParquet(pageSize, page int) ([]*user, error) {
	fr, err := local.NewLocalFileReader("output.parquet")
	if err != nil {
		return nil, err
	}
	defer func() {
		_ = fr.Close()
	}()

	pr, err := reader.NewParquetReader(fr, new(user), int64(pageSize))
	if err != nil {
		return nil, err
	}
	defer pr.ReadStop()

	//num := pr.GetNumRows()

	pr.SkipRows(int64(pageSize * page))
	u := make([]*user, pageSize)
	if err = pr.Read(&u); err != nil {
		return nil, err
	}

	return u, nil
}

与上面函数差异不大,首先函数包括两个参数,用于指定页大小和页数,关键代码是跳过一定记录:

  pr.SkipRows(int64(pageSize * page))

根据这个方法可以获得总行数,pr.GetNumRows(),然后结合页大小计算总页数,最后循环可以实现分页查询。

计算列平均值

既然使用了Parquet列存储格式,下面演示下如何计算Score列的平均值。

func calcScoreAVG() (float64, error) {
  fr, err := local.NewLocalFileReader("output.parquet")
  if err != nil {
    return 0.0, err
  }
  pr, err := reader.NewParquetColumnReader(fr, recordNumber)
  if err != nil {
    return 0.0, err
  }
  num := int(pr.GetNumRows())

  data, _, _, err := pr.ReadColumnByPath("parquet_go_root\u0001score", num)
  if err != nil {
    return 0.0, err
  }
  var result float64
  for _, i := range data {
    result += i.(float64)
  }
  return (result / float64(num)), nil
}

首先打开文件,然后调用pr.GetNumRows()方法获取总行数。然后基于路径指定列,其中parquet_go_root为根路径,因为前面使用字节数组,这里分割符变为\u0001,完整路径为:parquet_go_root\u0001score

总结

到此这篇关于Golang处理parquet文件的文章就介绍到这了,更多相关Golang处理parquet文件内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Golang处理parquet文件实战指南

    目录 前言 创建结构体 生成parquet文件 读取parquet文件 计算列平均值 总结 前言 Parquet是Apache基金会支持的项目,是面向列存储二进制文件格式.支持不同类型的压缩方式,广泛用于数据科学和大数据环境,如Hadoop生态. 本文主要介绍Go如何生成和处理parquet文件. 创建结构体 首先创建struct,用于表示要处理的数据: type user struct { ID string `parquet:"name=id, type=BYTE_ARRAY, encodi

  • python数据分析实战指南之异常值处理

    目录 异常值 1.异常值定义 2.异常值处理方式 2.1 均方差 2.2 箱形图 3.实战 3.1 加载数据 3.2 检测异常值数据 3.3 显示异常值的索引位置 总结 异常值 异常值是指样本中的个别值,其数值明显偏离其余的观测值.异常值也称离群点,异常值的分析也称为离群点的分析. 常用的异常值分析方法为3σ原则.箱型图分析.机器学习算法检测,一般情况下对异常值的处理都是删除和修正填补,即默认为异常值对整个项目的作用不大,只有当我们的目的是要求准确找出离群点,并对离群点进行分析时有必要用到机器学

  • 利用python写api接口实战指南

    目录 一.操作步骤 二.源码举例 总结 一.操作步骤 导入:import flask,json 实例化:api = flask.Flask(name) 定义接口访问路径及访问方式:@api.route(’/index’,methods=[‘get/post/PUT/DELETE’]) 定义函数,注意需与路径的名称一致,设置返回类型并支持中文:def index(): return json.dumps(ren,ensure_ascii=False) 三种格式入参访问接口:5.1 url格式入参:

  • 实现在线 + 离线模式进行迁移 Redis 数据实战指南

    目录 redis-full-check的使用背景 redis-full-check的基本介绍 编译源码 数据对比核心流程 参数信息介绍 对比结果差异分析不一致类型 key不一致 参考资料 redis-full-check的使用背景 在经历了之前的文章内容章节内容,已完成Redis迁移后,可能会存在以下问题需要进行数据迁移之后的对比.例如,如果Redis迁移的过程出现异常,源端与目的端Redis的数据将会不一致. 在Redis迁移完成后进行数据校验可以检查数据的一致性,该如何校验就是我们本文的内容

  • golang简单读写文件示例

    本文实例讲述了golang简单读写文件的方法.分享给大家供大家参考,具体如下: 这里演示golang读写文件的方法: 复制代码 代码如下: package main import (     "fmt"     "os" ) func main() {     f, err := os.OpenFile("file2.txt", os.O_RDWR | os.O_CREATE | os.O_APPEND, 0x644)     if err !=

  • python读取hdfs上的parquet文件方式

    在使用python做大数据和机器学习处理过程中,首先需要读取hdfs数据,对于常用格式数据一般比较容易读取,parquet略微特殊.从hdfs上使用python获取parquet格式数据的方法(当然也可以先把文件拉到本地再读取也可以): 1.安装anaconda环境. 2.安装hdfs3. conda install hdfs3 3.安装fastparquet. conda install fastparquet 4.安装python-snappy. conda install python-s

  • golang实现的文件上传下载小工具

    前言 虽然现在文件上传下载工具多如牛毛,比如http.ftp.sftp.scp等方案都可以用于文件传输,但都是需要安装服务器甚至客户端. 有一种场景是我只需要临时上传或下载一个文件,完了就不用服务器运行了,如果使用那些文件传输工具,不光安装麻烦,开启关闭也恼火额. 因此才想搞小工具,不过Python爱好者可以用python -m http.server 8080 --bind 192.168.1.100开启文件服务器,对我来说还是麻烦. 已经上传到[Github],随意鉴赏. 源码鉴赏 模拟一个

  • golang官方嵌入文件到可执行程序的示例详解

    前言 在go官方出嵌入文件的方法前我在网上找过,并且自己还研究过,虽然没有问题,但是既然官方支持还是用起来吧. 看了下go源码embed/embed.go很简单,并且看embed/internal/embedtest/embed_test.go就知道如何使用. 示例程序 嵌入文件直接访问 原理是使用go:embed标签来完成.下面是直接读取文件内容,需要注意下面几点. 文件不是utf8编码时,输出内容为中文会乱码. 测试过嵌入文件只能为源码文件同级目录和子目录下的文件,试过其他目录的绝对路径或相

  • Golang 实现超大文件读取的两种方法

    Golang超大文件读取的两个方案 流处理方式 分片处理 去年的面试中我被问到超大文件你怎么处理,这个问题确实当时没多想,回来之后仔细研究和讨论了下这个问题,对大文件读取做了一个分析 比如我们有一个log文件,运行了几年,有100G之大.按照我们之前的操作可能代码会这样写: func ReadFile(filePath string) []byte{ content, err := ioutil.ReadFile(filePath) if err != nil { log.Println("Re

  • Golang实现http文件上传小功能的案例

    看代码吧~ package main import ( "fmt" "io" "net/http" "os" ) func main() { http.HandleFunc("/", index) http.HandleFunc("/upload", upload) http.ListenAndServe(":1789", nil) } func upload(w h

随机推荐