python消费kafka数据批量插入到es的方法

1、es的批量插入

这是为了方便后期配置的更改,把配置信息放在logging.conf中

用elasticsearch来实现批量操作,先安装依赖包,sudo pip install Elasticsearch2

from elasticsearch import Elasticsearch
class ImportEsData:

  logging.config.fileConfig("logging.conf")
  logger = logging.getLogger("msg")

  def __init__(self,hosts,index,type):
    self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
    self.index = index
    self.type = type

  def set_date(self,data):
    # 批量处理
    # es.index(index="test-index",doc_type="test-type",id=42,body={"any":"data","timestamp":datetime.now()})
    self.es.index(index=self.index,doc_type=self.index,body=data)

2、使用pykafka消费kafka

1.因为kafka是0.8,pykafka不支持zk,只能用get_simple_consumer来实现

2.为了实现多个应用同时消费而且不重消费,所以一个应用消费一个partition

3. 为是确保消费数据量在不满足10000这个批量值,能在一个时间范围内插入到es中,这里设置consumer_timeout_ms一个超时等待时间,退出等待消费阻塞。

4.退出等待消费阻塞后导致无法再消费数据,因此在获取self.consumer 的外层加入了while True 一个死循环

#!/usr/bin/python
# -*- coding: UTF-8 -*-
from pykafka import KafkaClient
import logging
import logging.config
from ConfigUtil import ConfigUtil
import datetime

class KafkaPython:
  logging.config.fileConfig("logging.conf")
  logger = logging.getLogger("msg")
  logger_data = logging.getLogger("data")

  def __init__(self):
    self.server = ConfigUtil().get("kafka","kafka_server")
    self.topic = ConfigUtil().get("kafka","topic")
    self.group = ConfigUtil().get("kafka","group")
    self.partition_id = int(ConfigUtil().get("kafka","partition"))
    self.consumer_timeout_ms = int(ConfigUtil().get("kafka","consumer_timeout_ms"))
    self.consumer = None
    self.hosts = ConfigUtil().get("es","hosts")
    self.index_name = ConfigUtil().get("es","index_name")
    self.type_name = ConfigUtil().get("es","type_name")

  def getConnect(self):
    client = KafkaClient(self.server)
    topic = client.topics[self.topic]
    p = topic.partitions
    ps={p.get(self.partition_id)}

    self.consumer = topic.get_simple_consumer(
      consumer_group=self.group,
      auto_commit_enable=True,
      consumer_timeout_ms=self.consumer_timeout_ms,
      # num_consumer_fetchers=1,
      # consumer_id='test1',
      partitions=ps
      )
    self.starttime = datetime.datetime.now()

  def beginConsumer(self):
    print("beginConsumer kafka-python")
    imprtEsData = ImportEsData(self.hosts,self.index_name,self.type_name)
    #创建ACTIONS
    count = 0
    ACTIONS = [] 

    while True:
      endtime = datetime.datetime.now()
      print (endtime - self.starttime).seconds
      for message in self.consumer:
        if message is not None:
          try:
            count = count + 1
            # print(str(message.partition.id)+","+str(message.offset)+","+str(count))
            # self.logger.info(str(message.partition.id)+","+str(message.offset)+","+str(count))
            action = {
              "_index": self.index_name,
              "_type": self.type_name,
              "_source": message.value
            }
            ACTIONS.append(action)
            if len(ACTIONS) >= 10000:
              imprtEsData.set_date(ACTIONS)
              ACTIONS = []
              self.consumer.commit_offsets()
              endtime = datetime.datetime.now()
              print (endtime - self.starttime).seconds
              #break
          except (Exception) as e:
            # self.consumer.commit_offsets()
            print(e)
            self.logger.error(e)
            self.logger.error(str(message.partition.id)+","+str(message.offset)+","+message.value+"\n")
            # self.logger_data.error(message.value+"\n")
          # self.consumer.commit_offsets()

      if len(ACTIONS) > 0:
        self.logger.info("等待时间超过,consumer_timeout_ms,把集合数据插入es")
        imprtEsData.set_date(ACTIONS)
        ACTIONS = []
        self.consumer.commit_offsets()

  def disConnect(self):
    self.consumer.close()

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
class ImportEsData:

  logging.config.fileConfig("logging.conf")
  logger = logging.getLogger("msg")

  def __init__(self,hosts,index,type):
    self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
    self.index = index
    self.type = type

  def set_date(self,data):
    # 批量处理
    success = bulk(self.es, data, index=self.index, raise_on_error=True)
    self.logger.info(success)

3、运行

if __name__ == '__main__':
  kp = KafkaPython()
  kp.getConnect()
  kp.beginConsumer()
  # kp.disConnect()

注:简单的写了一个从kafka中读取数据到一个list里,当数据达到一个阈值时,在批量插入到 es的插件

现在还在批量的压测中。。。

以上这篇python消费kafka数据批量插入到es的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • 在python中使用requests 模拟浏览器发送请求数据的方法

    如下所示: import requests url='http://####' proxy={'http':'http://####:80'} headers={ "Accept":"text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Encoding": "gzip, deflate, br", "Accept-Lang

  • python批量从es取数据的方法(文档数超过10000)

    如下所示: """ 提取文档数超过10000的数据 按照某个字段的值具有唯一性进行升序, 按照@timestamp进行降序, 第一次查询,先将10000条数据取出, 取出最后一个时间戳, 在第二次查询中,设定@timestamp小于将第一次得到的最后一个时间戳, 同时设定某个字段的值具有唯一性进行升序, 按照@timestamp进行降序, """ from elasticsearch import Elasticsearch import os

  • Python使用requests提交HTTP表单的方法

    Python的requests库, 其口号是HTTP for humans,堪称最好用的HTTP库. 使用requests库,可以使用数行代码实现自动化的http操作.以http post,即浏览器提交一个表格数据到web服务器,为例,来说明requests的使用. 无cookie import requests url = 'www.test.org' data = {'username': 'user', 'password': '123456'} response = requests.p

  • 详解Python requests 超时和重试的方法

    网络请求不可避免会遇上请求超时的情况,在 requests 中,如果不设置你的程序可能会永远失去响应. 超时又可分为连接超时和读取超时. 连接超时 连接超时指的是在你的客户端实现到远端机器端口的连接时(对应的是 connect() ),Request 等待的秒数. import time import requests url = 'http://www.google.com.hk' print(time.strftime('%Y-%m-%d %H:%M:%S')) try: html = re

  • python 实现分页显示从es中获取的数据方法

    注意:使用该方法,获取的数据总数目不能超过1万,否则出错 #在python3上运行 from elasticsearch import Elasticsearch from urllib3.connectionpool import xrange def get_page_data(result): for hit in result['hits']['hits']: print(hit) if __name__=='__main__': es_host = "0.0.0.0" por

  • 对python requests发送json格式数据的实例详解

    requests是常用的请求库,不管是写爬虫脚本,还是测试接口返回数据等.都是很简单常用的工具. 这里就记录一下如何用requests发送json格式的数据,因为一般我们post参数,都是直接post,没管post的数据的类型,它默认有一个类型的,貌似是 application/x-www-form-urlencoded. 但是,我们写程序的时候,最常用的接口post数据的格式是json格式.当我们需要post json格式数据的时候,怎么办呢,只需要添加修改两处小地方即可. 详见如下代码: i

  • Python 脚本获取ES 存储容量的实例

    最近有需求统计ES存储容量,之前用PHP实现的,考虑到以后可能会经常写脚本查询,故用python写了一个脚本,代码如下: import urllib import urllib2 import sys es_service_addr = sys.argv[1] url = "http://" + es_service_addr + "/_cat/indices?v"; req = urllib2.Request(url) res_data = urllib2.url

  • python消费kafka数据批量插入到es的方法

    1.es的批量插入 这是为了方便后期配置的更改,把配置信息放在logging.conf中 用elasticsearch来实现批量操作,先安装依赖包,sudo pip install Elasticsearch2 from elasticsearch import Elasticsearch class ImportEsData: logging.config.fileConfig("logging.conf") logger = logging.getLogger("msg&

  • python 消费 kafka 数据教程

    1.安装python模块 pip install --user kafka-python==1.4.3 如果报错压缩相关的错尝试安装下面的依赖 yum install snappy-devel yum install lz4-devel pip install python-snappy pip install lz4 2.生产者 #!/usr/bin/env python # coding : utf-8 from kafka import KafkaProducer import json

  • c# 用Dictionary实现日志数据批量插入

    背景 最近再做一个需求,就是对站点的一些事件进行埋点,说白了就是记录用户的访问行为.那么这些数据怎么保存呢,人家点一下保存一下?显然不合适,肯定是需要批量保存,提高效率. 问题窥探 首先,我想到的是Dictionary,对于C#中的Dictionary类相信大家都不陌生,这是一个Collection(集合)类型,可以通过Key/Value(键值对的形式来存放数据:该类最大的优点就是它查找元素的时间复杂度接近O(1),实际项目中常被用来做一些数据的本地缓存,提升整体效率.Dictionary是非线

  • 巧用Dictionary实现日志数据批量插入

    背景 最近再做一个需求,就是对站点的一些事件进行埋点,说白了就是记录用户的访问行为.那么这些数据怎么保存呢,人家点一下保存一下?显然不合适,肯定是需要批量保存,提高效率. 问题窥探 首先,我想到的是Dictionary,对于C#中的Dictionary类相信大家都不陌生,这是一个Collection(集合)类型,可以通过Key/Value(键值对的形式来存放数据:该类最大的优点就是它查找元素的时间复杂度接近O(1),实际项目中常被用来做一些数据的本地缓存,提升整体效率.Dictionary是非线

  • c#几种数据库的大数据批量插入(SqlServer、Oracle、SQLite和MySql)

    在之前只知道SqlServer支持数据批量插入,殊不知道Oracle.SQLite和MySql也是支持的,不过Oracle需要使用Orace.DataAccess驱动,今天就贴出几种数据库的批量插入解决方法. 首先说一下,IProvider里有一个用于实现批量插入的插件服务接口IBatcherProvider,此接口在前一篇文章中已经提到过了. /// <summary> /// 提供数据批量处理的方法. /// </summary> public interface IBatch

  • Python随机生成数据后插入到PostgreSQL

    用Python随机生成学生姓名,三科成绩和班级数据,再插入到PostgreSQL中. 模块用psycopg2 random import random import psycopg2 fname=['金','赵','李','陈','许','龙','王','高','张','侯','艾','钱','孙','周','郑'] mname=['玉','明','玲','淑','偑','艳','大','小','风','雨','雪','天','水','奇','鲸','米','晓','泽','恩','葛','玄'

  • c#实现几种数据库的大数据批量插入

    在之前只知道SqlServer支持数据批量插入,殊不知道Oracle.SQLite和MySQL也是支持的,不过Oracle需要使用Orace.DataAccess驱动,今天就贴出几种数据库的批量插入解决方法. 首先说一下,IProvider里有一个用于实现批量插入的插件服务接口IBatcherProvider,此接口在前一篇文章中已经提到过了. /// <summary> /// 提供数据批量处理的方法. /// </summary> public interface IBatch

  • python数据批量写入ScrolledText的优化方法

    如下所示: for i in data[::-1]: self.maintenance_text.insert(tk.END, str(i['payload']) + '\n\n') self.maintenance_text.see(tk.END) 改为: str_data = '\n\n'.join([str(i) for i in data[::-1]]) self.maintenance_text.insert(tk.END, str_data) self.maintenance_tex

  • .NET多种数据库大数据批量插入、更新(支持SqlServer、MySql、PgSql和Oracle)

    功能介绍 (需要版本5.0.45) 大数据操作ORM性能瓶颈在实体转换上面,并且不能使用常规的Sql去实现 当列越多转换越慢,SqlSugar将转换性能做到极致,并且采用数据库最佳API 操作数据库达到极限性能 功能用法 BulkCopy性能远强于现有市场的 ORM框架,比 EFCore Efcore.Bulkextension快30% BulkUpdate吊打现有所有框架是EFCoreEfcore.Bulkextension 2-3倍之快 //插入 100万 10秒不到 db.Fastest<

  • .NET多种数据库大数据批量插入、更新(支持SqlServer、MySql、PgSql和Oracle)

    目录 功能介绍(需要版本5.0.45) 功能用法 数据库支持 性能测试 使用疑问 老版本 源码下载: 功能介绍 (需要版本5.0.45) 大数据操作ORM性能瓶颈在实体转换上面,并且不能使用常规的Sql去实现 当列越多转换越慢,SqlSugar将转换性能做到极致,并且采用数据库最佳API 操作数据库达到极限性能 功能用法 BulkCopy性能远强于现有市场的 ORM框架,比 EFCore Efcore.Bulkextension快30% BulkUpdate吊打现有所有框架是EFCoreEfco

随机推荐