JVM上高性能数据格式库包Apache Arrow入门和架构详解(Gkatziouras)

Apache Arrow是是各种大数据工具(包括BigQuery)使用的一种流行格式,它是平面和分层数据的存储格式。它是一种加快应用程序内存密集型。

数据处理和数据科学领域中的常用库: Apache Arrow。诸如Apache Parquet,Apache Spark,pandas之类的开放源代码项目以及许多商业或封闭源代码服务都使用Arrow。它提供以下功能:

  • 内存计算
  • 标准化的柱状存储格式
  • 一个IPC和RPC框架,分别用于进程和节点之间的数据交换

让我们看一看在Arrow出现之前事物是如何工作的:

我们可以看到,为了使Spark从Parquet文件中读取数据,我们需要以Parquet格式读取和反序列化数据。这要求我们通过将数据加载到内存中来制作数据的完整副本。首先,我们将数据读入内存缓冲区,然后使用Parquet的转换方法将数据(例如字符串或数字)转换为我们的编程语言的表示形式。这是必需的,因为Parquet表示的数字与Python编程语言表示的数字不同。

由于许多原因,这对于性能来说是一个很大的问题:

  • 我们正在复制数据并在其上运行转换步骤。数据的格式不同,我们需要对所有数据进行读取和转换,然后再对数据进行任何计算。
  • 我们正在加载的数据必须放入内存中。您只有8GB的RAM,数据是10GB吗?你真倒霉!

现在,让我们看一下Apache Arrow如何改进这一点:

Arrow无需复制和转换数据,而是了解如何直接读取和操作数据。为此,Arrow社区定义了一种新的文件格式以及直接对序列化数据起作用的操作。可以直接从磁盘读取此数据格式,而无需将其加载到内存中并转换/反序列化数据。当然,部分数据仍将被加载到RAM中,但您的数据不必放入内存中。Arrow使用其文件的内存映射功能,仅在必要和可能的情况下将尽可能多的数据加载到内存中。

Apache Arrow支持以下语言:

  • C++
  • C#
  • Go
  • Java
  • JavaScript
  • Rust
  • Python (through the C++ library)
  • Ruby (through the C++ library)
  • R (through the C++ library)
  • MATLAB (through the C++ library).

Arrow特点

Arrow首先是提供用于内存计算的列式数据结构的库,可以将任何数据解压缩并解码为Arrow柱状数据结构,以便随后可以对解码后的数据进行内存内分析。Arrow列格式具有一些不错的属性:随机访问为O(1),每个值单元格在内存中的前一个和后一个相邻,因此进行迭代非常有效。

Apache Arrow定义了一种二进制“序列化”协议,用于安排Arrow列数组的集合(称为“记录批处理”),该数组可用于消息传递和进程间通信。您可以将协议放在任何地方,包括磁盘上,以后可以对其进行内存映射或读入内存并发送到其他地方。

Arrow协议的设计目的是使您可以“映射”一个Arrow数据块而不进行任何反序列化,因此对磁盘上的Arrow协议数据执行分析可以使用内存映射并有效地支付零成本。该协议用于很多事情,例如Spark SQL和Python之间的流数据,用于针对Spark SQL数据块运行pandas函数,这些被称为“ pandas udfs”。

Arrow是为内存而设计的(但是您可以将其放在磁盘上,然后再进行内存映射)。它们旨在相互兼容,并在应用程序中一起使用,而其竞争对手Apache Parquet文件是为磁盘存储而设计的。

优点:Apache Arrow为平面和分层数据定义了一种独立于语言的列式存储格式,该格式组织为在CPU和GPU等现代硬件上进行高效的分析操作而组织。Arrow存储器格式还支持零拷贝读取,以实现闪电般的数据访问,而无需序列化开销。

Java的Apache Arrow

导入库:

<dependency>
    <groupId>org.apache.arrow</groupId>
    <artifactId>arrow-memory-netty</artifactId>
    <version>${arrow.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.arrow</groupId>
    <artifactId>arrow-vector</artifactId>
    <version>${arrow.version}</version>
</dependency>

在开始之前,必须了解对于Arrow的读/写操作,使用了字节缓冲区。诸如读取和写入之类的操作是字节的连续交换。为了提高效率,Arrow附带了一个缓冲区分配器,该缓冲区分配器可以具有一定的大小,也可以具有自动扩展功能。支持分配管理的库是arrow-memory-netty和arrow-memory-unsafe。我们这里使用netty。

用Arrow存储数据需要一个模式,模式可以通过编程定义:

package com.gkatzioura.arrow;

import java.io.IOException;

import java.util.List;

import org.apache.arrow.vector.types.pojo.ArrowType;

import org.apache.arrow.vector.types.pojo.Field;

import org.apache.arrow.vector.types.pojo.FieldType;

import org.apache.arrow.vector.types.pojo.Schema;

public class SchemaFactory {

public static Schema DEFAULT_SCHEMA = createDefault();

public static Schema createDefault() {

var strField = new Field("col1", FieldType.nullable(new ArrowType.Utf8()), null);

var intField = new Field("col2", FieldType.nullable(new ArrowType.Int(32, true)), null);

return new Schema(List.of(strField, intField));

}

public static Schema schemaWithChildren() {

var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null);

var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null);

var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount,currency));

return new Schema(List.of(itemField));

}

public static Schema fromJson(String jsonString) {

try {

return Schema.fromJSON(jsonString);

} catch (IOException e) {

throw new ArrowExampleException(e);

}

}

}

他们也有一个可解析的json表示形式:

{
  "fields" : [ {
    "name" : "col1",
    "nullable" : true,
    "type" : {
      "name" : "utf8"
    },
    "children" : [ ]
  }, {
    "name" : "col2",
    "nullable" : true,
    "type" : {
      "name" : "int",
      "bitWidth" : 32,
      "isSigned" : true
    },
    "children" : [ ]
  } ]
}

另外,就像Avro一样,您可以在字段上设计复杂的架构和嵌入式值:

public static Schema schemaWithChildren() {
    var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null);
    var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null);
    var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount,currency));

    return new Schema(List.of(itemField));
}

基于上面的的Schema,我们将为我们的类创建一个DTO:

package com.gkatzioura.arrow;

import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class DefaultArrowEntry {

    private String col1;
    private Integer col2;

}

我们的目标是将这些Java对象转换为Arrow字节流。

1. 使用分配器创建 DirectByteBuffer

这些缓冲区是 堆外的 。您确实需要释放所使用的内存,但是对于库用户而言,这是通过在分配器上执行 close() 操作来完成的。在我们的例子中,我们的类将实现 Closeable 接口,该接口将执行分配器关闭操作。

通过使用流api,数据将被流传输到使用Arrow格式提交的OutPutStream:

package com.gkatzioura.arrow;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.util.List;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.util.Text;

import static com.gkatzioura.arrow.SchemaFactory.DEFAULT_SCHEMA;

public class DefaultEntriesWriter implements Closeable {

    private final RootAllocator rootAllocator;
    private final VectorSchemaRoot vectorSchemaRoot;//向量分配器创建:

    public DefaultEntriesWriter() {
        rootAllocator = new RootAllocator();
        vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator);
    }

    public void write(List<DefaultArrowEntry> defaultArrowEntries, int batchSize, WritableByteChannel out) {
        if (batchSize <= 0) {
            batchSize = defaultArrowEntries.size();
        }

        DictionaryProvider.MapDictionaryProvider dictProvider = new DictionaryProvider.MapDictionaryProvider();
        try(ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, out)) {
            writer.start();

            VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0);
            IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1);
            childVector1.reset();
            childVector2.reset();

            boolean exactBatches = defaultArrowEntries.size()%batchSize == 0;
            int batchCounter = 0;

            for(int i=0; i < defaultArrowEntries.size(); i++) {
                childVector1.setSafe(batchCounter, new Text(defaultArrowEntries.get(i).getCol1()));
                childVector2.setSafe(batchCounter, defaultArrowEntries.get(i).getCol2());

                batchCounter++;

                if(batchCounter == batchSize) {
                    vectorSchemaRoot.setRowCount(batchSize);
                    writer.writeBatch();
                    batchCounter = 0;
                }
            }

            if(!exactBatches) {
                vectorSchemaRoot.setRowCount(batchCounter);
                writer.writeBatch();
            }

            writer.end();
        } catch (IOException e) {
            throw new ArrowExampleException(e);
        }
    }

    @Override
    public void close() throws IOException {
        vectorSchemaRoot.close();
        rootAllocator.close();
    }

}

为了在Arrow上显示批处理的支持,已在函数中实现了简单的批处理算法。对于我们的示例,只需考虑将数据分批写入。

让我们深入了解上面代码功能:

向量分配器创建:

public DefaultEntriesToBytesConverter() {
    rootAllocator = new RootAllocator();
    vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator);
}

然后在写入流时,实现并启动了Arrow流编写器

ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, Channels.newChannel(out));
writer.start();

我们将数据填充向量,然后还重置它们,但让预分配的缓冲区 存在 :

VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0);
IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1);
childVector1.reset();
childVector2.reset();

写入数据时,我们使用 setSafe 操作。如果需要分配更多的缓冲区,应采用这种方式。对于此示例,此操作在每次写入时都完成,但是在考虑了所需的操作和缓冲区大小后可以避免:

childVector1.setSafe(i, new Text(defaultArrowEntries.get(i).getCol1()));
childVector2.setSafe(i, defaultArrowEntries.get(i).getCol2());

然后,将批处理写入流中:

vectorSchemaRoot.setRowCount(batchSize);
writer.writeBatch();

最后但并非最不重要的一点是,我们关闭了writer:

@Override
public void close() throws IOException {
    vectorSchemaRoot.close();
    rootAllocator.close();
}

以上就是JVM上高性能数据格式库包Apache Arrow入门和架构详解(Gkatziouras)的详细内容,更多关于Apache Arrow入门的资料请关注我们其它相关文章!

(0)

相关推荐

  • java中JVM中如何存取数据和相关信息详解

    前言: 我们每天都在编写Java代码,编译,执行.很多人已经知道Java源代码文件(.java后缀)会被Java编译器编译为字节码文件(.class后缀),然后由JVM中的类加载器加载各个类的字节码文件,加载完毕之后,交由JVM执行引擎执行. 那在整个程序执行过程中,JVM中怎么存取数据和相关信息呢? 事实上在JVM中是用一段空间来存储程序执行期间需要用到的数据和相关信息,这段空间一般被称作为Runtime Data Area(运行时数据区),也就是我们常说的JVM内存. 一.运行时数据区域包括

  • Springboot添加jvm监控实现数据可视化

    1.简介 最近越发觉得,任何一个系统上线,运维监控都太重要了.本文介绍Prometheus + Grafana的方法监控Springboot 2.X,实现美观漂亮的数据可视化. 2.添加监控 Spring-boot-actuator module 可帮助您在将应用程序投入生产时监视和管理应用程序.您可以选择使用 HTTP 端点或 JMX 来管理和监控您的应用程序.Auditing, health, and metrics gathering 也可以自动应用于您的应用程序.引入依赖如下: <!--

  • Java内存模型与JVM运行时数据区的区别详解

    首先,这两者是完全不同的概念,绝对不能混为一谈. 1.什么是Java内存模型? Java内存模型是Java语言在多线程并发情况下对于共享变量读写(实际是共享变量对应的内存操作)的规范,主要是为了解决多线程可见性.原子性的问题,解决共享变量的多线程操作冲突问题. 多线程编程的普遍问题是: 所见非所得 无法肉眼检测程序的准确性 不同的运行平台表现不同 错误很难复现 故JVM规范规定了Java虚拟机对多线程内存操作的一些规则,主要集中体现在volatile和synchronized这两个关键字. vo

  • JVM上高性能数据格式库包Apache Arrow入门和架构详解(Gkatziouras)

    Apache Arrow是是各种大数据工具(包括BigQuery)使用的一种流行格式,它是平面和分层数据的存储格式.它是一种加快应用程序内存密集型. 数据处理和数据科学领域中的常用库: Apache Arrow.诸如Apache Parquet,Apache Spark,pandas之类的开放源代码项目以及许多商业或封闭源代码服务都使用Arrow.它提供以下功能: 内存计算 标准化的柱状存储格式 一个IPC和RPC框架,分别用于进程和节点之间的数据交换 让我们看一看在Arrow出现之前事物是如何

  • apache commons工具集代码详解

    Apache Commons包含了很多开源的工具,用于解决平时编程经常会遇到的问题,减少重复劳动.下面是我这几年做开发过程中自己用过的工具类做简单介绍. 组件 功能介绍 BeanUtils 提供了对于JavaBean进行各种操作,克隆对象,属性等等. Betwixt XML与Java对象之间相互转换. Codec 处理常用的编码方法的工具类包 例如DES.SHA1.MD5.Base64等. Collections java集合框架操作. Compress java提供文件打包 压缩类库. Con

  • C++调用libcurl开源库实现邮件的发送功能流程详解

    目录 1.为啥要选择libcurl库去实现邮件的发送 2.调用libcurl库的API接口实现邮件发送 3.构造待发送的邮件内容 4.开通163发送邮件账号的SMTP服务 5.排查接收的邮件内容为空的问题 libcurl中封装了支持这些协议的网络通信模块,支持跨平台,支持Windows,Unix,Linux等多个操作系统.libcurl提供了一套统一样式的API接口,我们不用关注各种协议下网络通信的实现细节,只需要调用这些API就能轻松地实现基于这些协议的数据通信.本文将简单地讲述一下使用lib

  • Apache和Nginx的优缺点详解_动力节点Java学院整理

    Apache和Nginx比较 功能对比 Nginx和Apache一样,都是HTTP服务器软件,在功能实现上都采用模块化结构设计,都支持通用的语言接口,如PHP.Perl.Python等,同时还支持正向和反向代理.虚拟主机.URL重写.压缩传输.SSL加密传输等. 在功能实现上,Apache的所有模块都支持动.静态编译,而Nginx模块都是静态编译的, 对FastCGI的支持,Apache对Fcgi的支持不好,而Nginx对Fcgi的支持非常好: 在处理连接方式上,Nginx支持epoll,而Ap

  • 用python标准库difflib比较两份文件的异同详解

    [需求背景] 有时候我们要对比两份配置文件是不是一样,或者比较两个文本是否异样,可以使用linux命令行工具diff a_file b_file,但是输出的结果读起来不是很友好.这时候使用python的标准库difflib就能满足我们的需求. 下面这个脚本使用了difflib和argparse,argparse用于解析我们给此脚本传入的两个参数(即两份待比较的文件),由difflib执行比较,比较的结果放到了一个html里面,只要找个浏览器打开此html文件,就能直观地看到比较结果,两份文件有差

  • 如何在Linux操作系统下安装Apache服务的方法实例详解

    链接下载: 操作环境 VMware虚拟机中CentOS 7.6 SecureCRT Xftp(Xmanager) 需求分析 使用Apache服务实现访问http 操作步骤 1.挂载光盘 [root@localhost ~]# mount /dev/cdrom /mnt 查看是否挂载 [root@localhost ~]# df -Th 2.从源码包编译安装程序 (编译安装) [root@localhost Packages]# yum -y install gcc gcc-c++ make 3.

  • Android性能优化之RecyclerView分页加载组件功能详解

    目录 引言 1 分页加载组件 1.1 功能定制 1.2 手写分页列表 1.3 生命周期管理 2 github 引言 在Android应用中,列表有着举足轻重的地位,几乎所有的应用都有列表的身影,但是对于列表的交互体验一直是一个大问题.在性能比较好的设备上,列表滑动几乎看不出任何卡顿,但是放在低端机上,卡顿会比较明显,而且列表中经常会伴随图片的加载,卡顿会更加严重,因此本章从手写分页加载组件入手,并对列表卡顿做出对应的优化 1 分页加载组件 为什么要分页加载,通常列表数据存储在服务端会超过100条

  • Windows和夜神模拟器上抓包程序mitmproxy的安装使用详解

    目录 windows 一.介绍说明 二.安装以及配置 三.mitmdump的使用 夜神模拟器 一.安装过程 1.准备 2.安装命令,在.mitmproxy目录下运行cmd 3.重命名+推送至手机 4.安装mitm证书到手机adb shell操作: 5.查看是否成功: ** windows ** 一.介绍说明 mitmproxy是一个支持HTTP和HTTPS的抓包程序,有类似Fiddler.Charles的功能,只不过它是一个控制台的形式操作. mitmproxy还有两个关联组件.一个是mitmd

  • Apache Doris Join 优化原理详解

    目录 背景 & 目标 Doris 数据划分 Partition Bucket Join 方式 总览 Broadcast / Shuffle Join Bucket Shuffle Join Plan Rule Colocate Join Runtime Filter 优化 Join Reorder 优化 Join 调优建议 背景 & 目标 掌握 Apache Doris Join 优化手段及其实现原理 为代码阅读提供理论基础 Doris 数据划分 不同的 Join 方式非常依赖于对 Dor

  • jQuery Json数据格式排版高亮插件json-viewer.js使用方法详解

    jQuery Json数据格式排版高亮插件json-viewer.js使用方法详解 1.插件介绍: jquery.json-viewer.js是一款查看json格式数据的jquery插件.它可以将混乱的json数据漂亮的方式展示在页面中,并支持节点的伸展和收缩和语法高亮等功能. 2.代码演示: 1).首先引入jquery和json.viewer.js插件 <script src="http://www.jq22.com/jquery/jquery-1.10.2.js">&l

随机推荐