Java API如何实现向Hive批量导入数据

Java API实现向Hive批量导入数据

Java程序中产生的数据,如果导入oracle或者mysql库,可以通过jdbc连接insert批量操作完成,但是当前版本的hive并不支持批量insert操作,因为需要先将结果数据写入hdfs文件,然后插入Hive表中。

package com.enn.idcard;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * <p>Description: </p>
 * @author kangkaia
 * @date 2017年12月26日 下午1:42:24
 */
public class HiveJdbc {
    public static void main(String[] args) throws IOException {
    	List<List> argList = new ArrayList<List>();
		List<String> arg = new ArrayList<String>();
		arg.add("12345");
		arg.add("m");
		argList.add(arg);
		arg = new ArrayList<String>();
		arg.add("54321");
		arg.add("f");
		argList.add(arg);
//		System.out.println(argList.toString());
		String dst = "/test/kk.txt";
		createFile(dst,argList);
		loadData2Hive(dst);
    }

    /**
     * 将数据插入hdfs中,用于load到hive表中,默认分隔符是"\001"
     * @param dst
     * @param contents
     * @throws IOException
     */
    public static void createFile(String dst , List<List> argList) throws IOException{
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path dstPath = new Path(dst); //目标路径
        //打开一个输出流
        FSDataOutputStream outputStream = fs.create(dstPath);
        StringBuffer sb = new StringBuffer();
        for(List<String> arg:argList){
			for(String value:arg){
				sb.append(value).append("\001");
			}
			sb.deleteCharAt(sb.length() - 4);//去掉最后一个分隔符
			sb.append("\n");
		}
        sb.deleteCharAt(sb.length() - 2);//去掉最后一个换行符
        byte[] contents =  sb.toString().getBytes();
        outputStream.write(contents);
        outputStream.close();
        fs.close();
        System.out.println("文件创建成功!");
    }
    /**
     * 将HDFS文件load到hive表中
     * @param dst
     */
    public static void loadData2Hive(String dst) {
    	String JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
    	String CONNECTION_URL = "jdbc:hive2://server-13:10000/default;auth=noSasl";
    	String username = "admin";
        String password = "admin";
        Connection con = null;

		try {
			Class.forName(JDBC_DRIVER);
			con = (Connection) DriverManager.getConnection(CONNECTION_URL,username,password);
			Statement stmt = con.createStatement();
			String sql = " load data inpath '"+dst+"' into table population.population_information ";

			stmt.execute(sql);
			System.out.println("loadData到Hive表成功!");
		} catch (SQLException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}finally {
			// 关闭rs、ps和con
			if(con != null){
				try {
					con.close();
				} catch (SQLException e) {
					e.printStackTrace();
				}
			}
		}
	}
}

注意:

本例使用mvn搭建,conf配置文件放在src/main/resources目录下。

Hive提供的默认文件存储格式有textfile、sequencefile、rcfile等。用户也可以通过实现接口来自定义输入输的文件格式。

在实际应用中,textfile由于无压缩,磁盘及解析的开销都很大,一般很少使用。Sequencefile以键值对的形式存储的二进制的格式,其支持针对记录级别和块级别的压缩。rcfile是一种行列结合的存储方式(text file和sequencefile都是行表[row table]),其保证同一条记录在同一个hdfs块中,块以列式存储。一般而言,对于OLTP而言,行表优势大于列表,对于OLAP而言,列表的优势大于行表,特别容易想到当做聚合操作时,列表的复杂度将会比行表小的多,虽然单独rcfile的列运算不一定总是存在的,但是rcfile的高压缩率确实减少文件大小,因此实际应用中,rcfile总是成为不二的选择,达观数据平台在选择文件存储格式时也大量选择了rcfile方案。

通过hdfs导入hive的表默认是textfile格式的,因此可以改变存储格式,具体方法是先创建sequencefile、rcfile等格式的空表,然后重新插入数据即可。

insert overwrite table seqfile_table select * from textfile_table;
……
insert overwrite table rcfile_table select * from textfile_table;

java 批量插入hive中转在HDFS

稍微修改了下,这文章是通过将数据存盘后,加载到HIVE.

模拟数据放到HDFS然后加载到HIVE,请大家记得添加HIVE JDBC依赖否则会报错。

加载前的数据表最好用外部表,否则会drop表的时候元数据会一起删除!

  <dependency>
   <groupId>org.apache.hive</groupId>
   <artifactId>hive-jdbc</artifactId>
   <version>1.1.0</version>
  </dependency>

代码

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class Demo {
	    public static void main(String[] args) throws Exception {
	    	List<List> argList = new ArrayList<List>();
			List<String> arg = new ArrayList<String>();
			arg.add("12345");
			arg.add("m");
			argList.add(arg);
			arg = new ArrayList<String>();
			arg.add("54321");
			arg.add("f");
			argList.add(arg);
//			System.out.println(argList.toString());
			String dst = "/test/kk.txt";
			createFile(dst,argList);
//			loadData2Hive(dst);
	    }
	    /**
	     * 将数据插入hdfs中,用于load到hive表中,默认分隔符是"|"
	     * @param dst
	     * @param contents
	     * @throws IOException
	     * @throws Exception
	     * @throws InterruptedException
	     */
	    public static void createFile(String dst , List<List> argList) throws IOException, InterruptedException, Exception{
	        Configuration conf = new Configuration();
			FileSystem fs = FileSystem.get(new URI("hdfs://hadoop:9000"),conf,"root");
	        Path dstPath = new Path(dst); //目标路径
	        //打开一个输出流
	        FSDataOutputStream outputStream = fs.create(dstPath);
	        StringBuffer sb = new StringBuffer();
	        for(List<String> arg:argList){
				for(String value:arg){
					sb.append(value).append("|");
				}
				sb.deleteCharAt(sb.length() - 1);//去掉最后一个分隔符
				sb.append("\n");
			}
	        byte[] contents =  sb.toString().getBytes();
	        outputStream.write(contents);
			outputStream.flush();;
	        outputStream.close();
	        fs.close();
	        System.out.println("文件创建成功!");

	    }
	    /**
	     * 将HDFS文件load到hive表中
	     * @param dst
	     */
	    public static void loadData2Hive(String dst) {
	    	String JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
	    	String CONNECTION_URL = "jdbc:hive2://hadoop:10000/default";
	    	String username = "root";
	        String password = "root";
	        Connection con = null;

			try {
				Class.forName(JDBC_DRIVER);
				con = (Connection) DriverManager.getConnection(CONNECTION_URL,username,password);
				Statement stmt = con.createStatement();

				String sql = " load data inpath '"+dst+"' into table test ";//test 为插入的表

				stmt.execute(sql);
				System.out.println("loadData到Hive表成功!");
			} catch (SQLException e) {
				e.printStackTrace();
			} catch (ClassNotFoundException e) {
				e.printStackTrace();
			}finally {
				// 关闭rs、ps和con
				if(con != null){
					try {
						con.close();
					} catch (SQLException e) {
						e.printStackTrace();
					}
				}
			}
		}

	}

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • SpringBoot连接Hive实现自助取数的示例

    原文链接: http://www.ikeguang.com/?p=815 公司运营免不了让我们数据做一些临时取数,这些取数有时候是重复的,或者可以做成可配置的.需要开发成界面,供他们选择,自然想到SpringBoot连接Hive,可以把取数做成一键生成,或者让他们自己写sql,通常大多人是不会sql的. 1. 需要的依赖配置 为了节省篇幅,这里给出hiveserver2方式连接hive主要的maven依赖,父工程springboot依赖省略. <!-- 版本信息 --> <propert

  • Java实现后台发送及接收json数据的方法示例

    本文实例讲述了Java实现后台发送及接收json数据的方法.分享给大家供大家参考,具体如下: 本篇博客试用于编写java后台接口以及两个项目之间的接口对接功能: 具体的内容如下: 1.java后台给指定接口发送json数据 package com.utils; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.Htt

  • 基于Java8 Stream API实现数据抽取收集

    目标&背景 我们以"处理订单数据"为例,假设我们的应用是一个分布式应用,有"订单应用","物流应用","商品应用"等都是独立的服务.本次我们的目的需要展示订单列表完整数据: 1.查询订单列表. 2.批量查询物流信息. 3.将物流信息填充到订单主信息中. 假设我们定义了一个订单类,具有几个关键的属性:订单号,状态,订单价,快递信息.如下所示: class Order{ String orderSeq; String st

  • java中栈和队列的实现和API的用法(详解)

    在java中要实现栈和队列,需要用到java集合的相关知识,特别是Stack.LinkedList等相关集合类型. 一.栈的实现 栈的实现,有两个方法:一个是用java本身的集合类型Stack类型:另一个是借用LinkedList来间接实现Stack. 1.Stack实现 直接用Stack来实现非常方便,常用的api函数如下: boolean        isEmpty() // 判断当前栈是否为空 synchronized E        peek() //获得当前栈顶元素 synchro

  • hive从mysql导入数据量变多的解决方案

    原始导数命令: bin/sqoop import -connect jdbc:mysql://192.168.169.128:3306/yubei -username root -password 123456 -table yl_city_mgr_evt_info --split-by rec_id -m 4 --fields-terminated-by "\t" --lines-terminated-by "\n" --hive-import --hive-ov

  • Java API如何实现向Hive批量导入数据

    Java API实现向Hive批量导入数据 Java程序中产生的数据,如果导入oracle或者mysql库,可以通过jdbc连接insert批量操作完成,但是当前版本的hive并不支持批量insert操作,因为需要先将结果数据写入hdfs文件,然后插入Hive表中. package com.enn.idcard; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; impor

  • python Django批量导入数据

    前言: 这期间有研究了Django网页制作过程中,如何将数据批量导入到数据库中. 这个过程真的是惨不忍睹,犯了很多的低级错误,这会在正文中说到的.再者导入数据用的是py脚本,脚本内容参考至自强学堂--中级教程--数据导入.  注:本文主要介绍自己学习的经验总结,而非教程! 正文:首先说明采用Django中bulk_create()函数来实现数据批量导入功能,为什么会选择它呢? 1 bulk_create()是执行一条SQL存入多条数据,使得导入速度更快; 2 bulk_create()减少了SQ

  • asp.net线程批量导入数据时通过ajax获取执行状态

    前言 最近因为工作中遇到一个需求,需要做了一个批量导入功能,但长时间运行没个反馈状态,很容易让人看了心急,产生各种臆想!为了解决心里障碍,写了这么个功能. 通过线程执行导入,并把正在执行的状态存入session,既共享执行状态,通过ajax调用session里的执行状态,从而实现反馈导入状态的功能! 上代码: 前端页面 <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF

  • Django框架利用ajax实现批量导入数据功能

    本文实例为大家分享了网页中利用ajax实现批量导入数据功能的实现方法,供大家参考,具体内容如下 url.py代码: 复制代码 代码如下: url(r'^workimport/$', 'keywork.views.import_keywork', name='import_keywork') view.py代码: from keywork.models import DevData from django.http import JsonResponse #django ajax部分 def im

  • php实现refresh刷新页面批量导入数据的方法

    本文实例讲述了php实现refresh刷新页面批量导入数据的方法.分享给大家供大家参考.具体分析如下: 这个功能是参考dedecms生成html页面的原理,只是dedecms使用的是js跳转而我使用的是refresh进行跳转,效果是一样的,下面我们一起来看一个php实现批量导入数据的方法. 因为我有1000W数据一次导入数据库肯定是不可行了,所以我就导致了每一次导入50条或更多数据,然后下次再刷新一次这样就可以解决这个问题了,代码如下: 复制代码 代码如下: <?php set_time_lim

  • layui 上传文件_批量导入数据UI的方法

    使用layui的文件上传组件,可以方便的弹出文件上传界面. 效果如下: 点击[批量导入]按钮调用js脚本importData(config)就可以实现数据上传到服务器. 脚本: /*** * 批量导入 * config.downUrl 下载模板url * config.uploadUrl 上传文件url * config.msg * config.done 上传结束后执行. */ function importData(config){ var default_config = { msg:"数

  • python批量导入数据进Elasticsearch的实例

    ES在之前的博客已有介绍,提供很多接口,本文介绍如何使用python批量导入.ES官网上有较多说明文档,仔细研究并结合搜索引擎应该不难使用. 先给代码 #coding=utf-8 from datetime import datetime from elasticsearch import Elasticsearch from elasticsearch import helpers es = Elasticsearch() actions = [] f=open('index.txt') i=

  • Java实现Excel批量导入数据

    Excel的批量导入是很常见的功能,这里采用 Jxl实现,数据量或样式要求较高可以采用 poi 框架环境:Spring + SpringMvc(注解实现) 1.首先导入依赖jar包 <dependency> <groupId>net.sourceforge.jexcelapi</groupId> <artifactId>jxl</artifactId> <version>2.6.10</version> </depe

  • 基于Spring Batch向Elasticsearch批量导入数据示例

    1.介绍 当系统有大量数据需要从数据库导入Elasticsearch时,使用Spring Batch可以提高导入的效率.Spring Batch使用ItemReader分页读取数据,ItemWriter批量写数据.由于Spring Batch没有提供Elastisearch的ItemWriter和ItemReader,本示例中自定义一个ElasticsearchItemWriter(ElasticsearchItemReader),用于批量导入. 2.示例 2.1 pom.xml 本文使用spr

  • SQL Server数据库中批量导入数据的四种方法总结

    在软件项目实施的时候,数据导入一直是项目人员比较头疼的问题.其实,在SQL Server中集成了很多成批导入数据的方法.有些项目实施顾问头疼的问题,在我们数据库管理员眼中,是小菜一碟.现在的重点就是,如何让用户了解这些方法,让数据导入变得轻松一些. 第一:使用Select Into语句 若企业数据库都采用的是SQL Server数据库的话,则可以利用Select Into语句来实现数据的导入.Select Into语句,他的作用就是把数据从另外一个数据库中查询出来,然后加入到某个用户指定的表中.

随机推荐