Python如何把Spark数据写入ElasticSearch

这里以将Apache的日志写入到ElasticSearch为例,来演示一下如何使用Python将Spark数据导入到ES中。

实际工作中,由于数据与使用框架或技术的复杂性,数据的写入变得比较复杂,在这里我们简单演示一下。

如果使用Scala或Java的话,Spark提供自带了支持写入ES的支持库,但Python不支持。所以首先你需要去这里下载依赖的ES官方开发的依赖包包。

下载完成后,放在本地目录,以下面命令方式启动pyspark:

pyspark --jars elasticsearch-hadoop-6.4.1.jar

如果你想pyspark使用Python3,请设置环境变量:

export PYSPARK_PYTHON=/usr/bin/python3
理解如何写入ES的关键是要明白,ES是一个JSON格式的数据库,它有一个必须的要求。数据格式必须采用以下格式

{ "id: { the rest of your json}}

往下会展示如何转换成这种格式。

解析Apache日志文件
我们将Apache的日志文件读入,构建Spark RDD。然后我们写一个parse()函数用正则表达式处理每条日志,提取我们需要的字

rdd = sc.textFile("/home/ubuntu/walker/apache_logs")
regex='^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'

p=re.compile(regex)
def parse(str):
  s=p.match(str)
  d = {}
  d['ip']=s.group(1)
  d['date']=s.group(4)
  d['operation']=s.group(5)
  d['uri']=s.group(6)
  return d

换句话说,我们刚开始从日志文件读入RDD的数据类似如下:

['83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"']

然后我们使用map函数转换每条记录:

rdd2 = rdd.map(parse)

rdd2.take(1)

[{'date': '17/May/2015:10:05:03 +0000', 'ip': '83.149.9.216', 'operation': 'GET', 'uri': '/presentations/logstash-monitorama-2013/images/kibana-search.png'}]

现在看起来像JSON,但并不是JSON字符串,我们需要使用json.dumps将dict对象转换。

我们同时增加一个doc_id字段作为整个JSON的ID。在配置ES中我们增加如下配置“es.mapping.id”: “doc_id”告诉ES我们将这个字段作为ID。

这里我们使用SHA算法,将这个JSON字符串作为参数,得到一个唯一ID。
计算结果类似如下,可以看到ID是一个很长的SHA数值。

rdd3.take(1)

[('a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c', '{"date": "17/May/2015:10:05:03 +0000", "ip": "83.149.9.216", "operation": "GET", "doc_id": "a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c", "uri": "/presentations/logstash-monitorama-2013/images/kibana-search.png"}')]

现在我们需要制定ES配置,比较重要的两项是:

  • “es.resource” : ‘walker/apache': "walker"是索引,apache是类型,两者一般合称索引
  • “es.mapping.id”: “doc_id”: 告诉ES那个字段作为整个文档的ID,也就是查询结果中的_id

其他的配置自己去探索。

然后我们使用saveAsNewAPIHadoopFile()将RDD写入到ES。这部分代码对于所有的ES都是一样的,比较固定,不需要理解每一个细节

es_write_conf = {
    "es.nodes" : "localhost",
    "es.port" : "9200",
    "es.resource" : 'walker/apache',
    "es.input.json": "yes",
    "es.mapping.id": "doc_id"
  }

rdd3.saveAsNewAPIHadoopFile(
    path='-',
   outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf)

rdd3 = rdd2.map(addID)

def addId(data):
  j=json.dumps(data).encode('ascii', 'ignore')
  data['doc_id'] = hashlib.sha224(j).hexdigest()
  return (data['doc_id'], json.dumps(data))

最后我们可以使用curl进行查询

curl http://localhost:9200s/walker/apache/_search?pretty=true&?q=*
{
    "_index" : "walker",
    "_type" : "apache",
    "_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2",
    "_score" : 1.0,
    "_source" : {
     "date" : "17/May/2015:10:05:32 +0000",
     "ip" : "91.177.205.119",
     "operation" : "GET",
     "doc_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2",
     "uri" : "/favicon.ico"
    }

如下是所有代码:

import json
import hashlib
import re

def addId(data):
  j=json.dumps(data).encode('ascii', 'ignore')
  data['doc_id'] = hashlib.sha224(j).hexdigest()
  return (data['doc_id'], json.dumps(data))

def parse(str):
  s=p.match(str)
  d = {}
  d['ip']=s.group(1)
  d['date']=s.group(4)
  d['operation']=s.group(5)
  d['uri']=s.group(6)
  return d  

regex='^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'

p=re.compile(regex)

rdd = sc.textFile("/home/ubuntu/walker/apache_logs")

rdd2 = rdd.map(parse)

rdd3 = rdd2.map(addID)

es_write_conf = {
    "es.nodes" : "localhost",
    "es.port" : "9200",
    "es.resource" : 'walker/apache',
    "es.input.json": "yes",
    "es.mapping.id": "doc_id"
  }

rdd3.saveAsNewAPIHadoopFile(
    path='-',
   outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf)

也可以这么封装,其实原理是一样的

import hashlib
import json
from pyspark import Sparkcontext

def make_md5(line):
  md5_obj=hashlib.md5()
  md5_obj.encode(line)
  return md5_obj.hexdigest()

def parse(line):
  dic={}
  l = line.split('\t')
  doc_id=make_md5(line)
  dic['name']=l[1]
  dic['age'] =l[2]
  dic['doc_id']=doc_id
  return dic  #记得这边返回的是字典类型的,在写入es之前要记得dumps

def saveData2es(pdd, es_host, port,index, index_type, key):
  """
  把saprk的运行结果写入es
  :param pdd: 一个rdd类型的数据
  :param es_host: 要写es的ip
  :param index: 要写入数据的索引
  :param index_type: 索引的类型
  :param key: 指定文档的id,就是要以文档的那个字段作为_id
  :return:
  """
  #实例es客户端记得单例模式
  if es.exist.index(index):
    es.index.create(index, 'spo')
  es_write_conf = {
    "es.nodes": es_host,
    "es.port": port,
    "es.resource": index/index_type,
    "es.input.json": "yes",
    "es.mapping.id": key
  }

  (pdd.map(lambda _dic: ('', json.dumps(_dic))))  #这百年是为把这个数据构造成元组格式,如果传进来的_dic是字典则需要jdumps,如果传进来之前就已经dumps,这便就不需要dumps了
  .saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf)
  )
if __name__ == '__main__':
  #实例化sp对象
  sc=Sparkcontext()
  #文件中的呢内容一行一行用sc的读取出来
  json_text=sc.textFile('./1.txt')
  #进行转换
  json_data=json_text.map(lambda line:parse(line))

  saveData2es(json_data,'127.0.01','9200','index_test','index_type','doc_id')

  sc.stop()

看到了把,面那个例子在写入es之前加了一个id,返回一个元组格式的,现在这个封装指定_id就会比较灵活了

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • python3实现elasticsearch批量更新数据

    废话不多说,直接上代码! updateBody = { "query":{ "range":{ "write_date": { "gte": "2019-01-15 12:30:17", "lte": "now" } } }, "script": { "inline": "ctx._source.index = par

  • 使用Python操作Elasticsearch数据索引的教程

    Elasticsearch是一个分布式.Restful的搜索及分析服务器,Apache Solr一样,它也是基于Lucence的索引服务器,但我认为Elasticsearch对比Solr的优点在于: 轻量级:安装启动方便,下载文件之后一条命令就可以启动: Schema free:可以向服务器提交任意结构的JSON对象,Solr中使用schema.xml指定了索引结构: 多索引文件支持:使用不同的index参数就能创建另一个索引文件,Solr中需要另行配置: 分布式:Solr Cloud的配置比较

  • Python 操作 ElasticSearch的完整代码

    官方文档:https://elasticsearch-py.readthedocs.io/en/master/ 1.介绍 python提供了操作ElasticSearch 接口,因此要用python来操作ElasticSearch,首先要安装python的ElasticSearch包,用命令pip install elasticsearch安装或下载安装:https://pypi.python.org/pypi/elasticsearch/5.4.0 2.创建索引 假如创建索引名称为ott,类型

  • python中的Elasticsearch操作汇总

    这篇文章主要介绍了python中的Elasticsearch操作汇总,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 导入包 from elasticsearch import Elasticsearch 本地连接 es = Elasticsearch(['127.0.0.1:9200']) 创建索引 es.indices.create(index="python_es01",ignore=400) ingore=400 ingore是

  • python批量导入数据进Elasticsearch的实例

    ES在之前的博客已有介绍,提供很多接口,本文介绍如何使用python批量导入.ES官网上有较多说明文档,仔细研究并结合搜索引擎应该不难使用. 先给代码 #coding=utf-8 from datetime import datetime from elasticsearch import Elasticsearch from elasticsearch import helpers es = Elasticsearch() actions = [] f=open('index.txt') i=

  • Python中用Spark模块的使用教程

    在日常的编程中,我经常需要标识存在于文本文档中的部件和结构,这些文档包括:日志文件.配置文件.定界的数据以及格式更自由的(但还是半结构化的)报表格式.所有这些文档都拥有它们自己的"小语言",用于规定什么能够出现在文档内.我编写这些非正式解析任务的程序的方法总是有点象大杂烩,其中包括定制状态机.正则表达式以及上下文驱动的字符串测试.这些程序中的模式大概总是这样:"读一些文本,弄清是否可以用它来做些什么,然后可能再多读一些文本,一直尝试下去." 解析器将文档中部件和结构

  • Python-ElasticSearch搜索查询的讲解

    Elasticsearch 是一个开源的搜索引擎,建立在一个全文搜索引擎库 Apache Lucene™ 基础之上. Lucene 可能是目前存在的,不论开源还是私有的,拥有最先进,高性能和全功能搜索引擎功能的库.但是 Lucene 仅仅只是一个库.为了利用它,你需要编写 Java 程序,并在你的 java 程序里面直接集成 Lucene 包. 更坏的情况是,你需要对信息检索有一定程度的理解才能明白 Lucene 是怎么工作的.Lucene 是 很 复杂的. 在上一篇文章中介绍了ElasticS

  • Python如何把Spark数据写入ElasticSearch

    这里以将Apache的日志写入到ElasticSearch为例,来演示一下如何使用Python将Spark数据导入到ES中. 实际工作中,由于数据与使用框架或技术的复杂性,数据的写入变得比较复杂,在这里我们简单演示一下. 如果使用Scala或Java的话,Spark提供自带了支持写入ES的支持库,但Python不支持.所以首先你需要去这里下载依赖的ES官方开发的依赖包包. 下载完成后,放在本地目录,以下面命令方式启动pyspark: pyspark --jars elasticsearch-ha

  • python查询MySQL将数据写入Excel

    一.概述 现有一个用户表,需要将表数据写入到excel中. 环境说明 mysql版本:5.7 端口:3306 数据库:test 表名:users 表结构如下: CREATE TABLE `users` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `username` varchar(50) COLLATE utf8mb4_bin NOT NULL COMMENT '用户名', `password` varchar(255) CHARACTER SET u

  • Python使用Excel将数据写入多个sheet

    将一个列表数据写入output.xlsx的a,b,c--等sheet中 import pandas as pd df1 = pd.DataFrame({'a':[3,1],'b':[4,3]}) df2 = df1.copy() with pd.ExcelWriter('F:\\python入门\\数据2\\output.xlsx') as writer: str1 = ['a','b','c','d','e','f','g','h','i',\ 'j','k','l','m','n','o',

  • 使用python生成大量数据写入es数据库并查询操作(2)

    目录 方案一 方案二 1.顺序插入5000000条数据 2.批量插入5000000条数据 3.批量插入50000000条数据 前言 : 上一篇文章:如何使用python生成大量数据写入es数据库并查询操作 模拟学生个人信息写入es数据库,包括姓名.性别.年龄.特点.科目.成绩,创建时间. 方案一 在写入数据时未提前创建索引mapping,而是每插入一条数据都包含了索引的信息. 示例代码:[多线程写入数据][一次性写入10000*1000条数据]  [本人亲测耗时3266秒] from elast

  • 如何使用python生成大量数据写入es数据库并查询操作

    前言: 模拟学生成绩信息写入es数据库,包括姓名.性别.科目.成绩. 示例代码1:[一次性写入10000*1000条数据]  [本人亲测耗时5100秒] from elasticsearch import Elasticsearch from elasticsearch import helpers import random import time es = Elasticsearch(hosts='http://127.0.0.1:9200') # print(es) names = ['刘

  • Python实现读写sqlite3数据库并将统计数据写入Excel的方法示例

    本文实例讲述了Python实现读写sqlite3数据库并将统计数据写入Excel的方法.分享给大家供大家参考,具体如下: src = 'F:\\log\\mha-041log\\rnd-log-dl.huawei.com\\test' # dst = sys.argv[2] dst = 'F:\\log\\mha-041log\\rnd-log-dl.huawei.com\\test\\mha-041log.db' # dst_anylyzed = sys.argv[3] dst_anylyze

  • python实现数据写入excel表格

    本文实例为大家分享了python数据写入excel表格的具体代码,供大家参考,具体内容如下 安装: xlsxwriter第三方库 code: #!/usr/bin/env/python #_*_coding:utf-8_*_ #Data:2017-08-13 #Auther:苏莫 #Link:http://blog.csdn.net/lingluofengzang #PythonVersion:python2.7 #filename:xlsx.py import sys # import os

  • Python实现将数据框数据写入mongodb及mysql数据库的方法

    本文实例讲述了Python实现将数据框数据写入mongodb及mysql数据库的方法.分享给大家供大家参考,具体如下: 主要内容: 1.数据框数据写入mongdb方法 2.数据框数据写入mysql方法 为了以后不重复造轮子,这里总结下,如何把数据框数据写入mysql和mongodb的方法记录下来,省得翻来翻去.下面记录的都是精华. 写入mongodb代码片段(使用pymongo库): ##########################写入mongodb 数据库#################

  • Python实现将数据写入netCDF4中的方法示例

    本文实例讲述了Python实现将数据写入netCDF4中的方法.分享给大家供大家参考,具体如下: nc文件为处理气象数据文件.用户可以去https://www.lfd.uci.edu/~gohlke/pythonlibs/ 搜索netCDF4,下载相应平台的whl文件,使用pip安装即可. 这里演示的写入数据操作代码如下: # -*- coding:utf-8 -*- import numpy as np ''' 输入的data的shape=(627,652) ''' def write_to_

随机推荐