python操作kafka实践的示例代码

1、先看最简单的场景,生产者生产消息,消费者接收消息,下面是生产者的简单代码。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='xxxx:x')

msg_dict = {
  "sleep_time": 10,
  "db_config": {
    "database": "test_1",
    "host": "xxxx",
    "user": "root",
    "password": "root"
  },
  "table": "msg",
  "msg": "Hello World"
}
msg = json.dumps(msg_dict)
producer.send('test_rhj', msg, partition=0)
producer.close()

下面是消费者的简单代码:

from kafka import KafkaConsumer

consumer = KafkaConsumer('test_rhj', bootstrap_servers=['xxxx:x'])
for msg in consumer:
  recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
  print recv

下面是结果:

2、如果想要完成负载均衡,就需要知道kafka的分区机制,同一个主题,可以为其分区,在生产者不指定分区的情况,kafka会将多个消息分发到不同的分区,消费者订阅时候如果不指定服务组,会收到所有分区的消息,如果指定了服务组,则同一服务组的消费者会消费不同的分区,如果2个分区两个消费者的消费者组消费,则,每个消费者消费一个分区,如果有三个消费者的服务组,则会出现一个消费者消费不到数据;如果想要消费同一分区,则需要用不同的服务组。以此为原理,我们对消费者做如下修改:

from kafka import KafkaConsumer

consumer = KafkaConsumer('test_rhj', bootstrap_servers=['xxxx:x'])
for msg in consumer:
  recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
  print recv

然后我们开两个消费者进行消费,生产者分别往0分区和1分区发消息结果如下,可以看到,一个消费者只能消费0分区,另一个只能消费1分区:

3、kafka提供了偏移量的概念,允许消费者根据偏移量消费之前遗漏的内容,这基于kafka名义上的全量存储,可以保留大量的历史数据,历史保存时间是可配置的,一般是7天,如果偏移量定位到了已删除的位置那也会有问题,但是这种情况可能很小;每个保存的数据文件都是以偏移量命名的,当前要查的偏移量减去文件名就是数据在该文件的相对位置。要指定偏移量消费数据,需要指定该消费者要消费的分区,否则代码会找不到分区而无法消费,代码如下:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531'])
consumer.assign([TopicPartition(topic='test_rhj', partition=0), TopicPartition(topic='test_rhj', partition=1)])
print consumer.partitions_for_topic("test_rhj") # 获取test主题的分区信息
print consumer.assignment()
print consumer.beginning_offsets(consumer.assignment())
consumer.seek(TopicPartition(topic='test_rhj', partition=0), 0)
for msg in consumer:
  recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
  print recv

因为指定的便宜量为0,所以从一开始插入的数据都可以查到,而且因为指定了分区,指定的分区结果都可以消费,结果如下:

4、有时候,我们并不需要实时获取数据,因为这样可能会造成性能瓶颈,我们只需要定时去获取队列里的数据然后批量处理就可以,这种情况,我们可以选择主动拉取数据

from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531'])
consumer.subscribe(topics=('test_rhj',))
index = 0
while True:
  msg = consumer.poll(timeout_ms=5) # 从kafka获取消息
  print msg
  time.sleep(2)
  index += 1
  print '--------poll index is %s----------' % index

结果如下,可以看到,每次拉取到的都是前面生产的数据,可能是多条的列表,也可能没有数据,如果没有数据,则拉取到的为空:

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • python hbase读取数据发送kafka的方法

    本例子实现从hbase获取数据,并发送kafka. 使用 #!/usr/bin/env python #coding=utf-8 import sys import time import json sys.path.append('/usr/local/lib/python3.5/site-packages') from thrift import Thrift from thrift.transport import TSocket from thrift.transport import

  • kafka-python批量发送数据的实例

    如下所示: from kafka import KafkaClient from kafka.producer import SimpleProducer def send_data_2_kafka(datas): ''' 向kafka解析队列发送数据 ''' client = KafkaClient(hosts=KAFKABROKER.split(","), timeout=30) producer = SimpleProducer(client, async=False) curc

  • python消费kafka数据批量插入到es的方法

    1.es的批量插入 这是为了方便后期配置的更改,把配置信息放在logging.conf中 用elasticsearch来实现批量操作,先安装依赖包,sudo pip install Elasticsearch2 from elasticsearch import Elasticsearch class ImportEsData: logging.config.fileConfig("logging.conf") logger = logging.getLogger("msg&

  • 在python环境下运用kafka对数据进行实时传输的方法

    背景: 为了满足各个平台间数据的传输,以及能确保历史性和实时性.先选用kafka作为不同平台数据传输的中转站,来满足我们对跨平台数据发送与接收的需要. kafka简介: Kafka is a distributed,partitioned,replicated commit logservice.它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现.kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外ka

  • 对python操作kafka写入json数据的简单demo分享

    如下所示: 安装kafka支持库pip install kafka-python from kafka import KafkaProducer import json ''' 生产者demo 向test_lyl2主题中循环写入10条json数据 注意事项:要写入json数据需加上value_serializer参数,如下代码 ''' producer = KafkaProducer( value_serializer=lambda v: json.dumps(v).encode('utf-8'

  • python操作kafka实践的示例代码

    1.先看最简单的场景,生产者生产消息,消费者接收消息,下面是生产者的简单代码. #!/usr/bin/env python # -*- coding: utf-8 -*- import json from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='xxxx:x') msg_dict = { "sleep_time": 10, "db_config": { "

  • Python操作MySQL数据库的示例代码

    1. MySQL Connector 1.1 创建连接 import mysql.connector config={ "host":"localhost","port":"3306", "user":"root","password":"password", "database":"demo" } con=

  • python操作toml文件的示例代码

    # -*- coding: utf-8 -*- # @Time : 2019-11-18 09:31 # @Author : cxa # @File : toml_demo.py # @Software: PyCharm import toml import os BASE_DIR = os.path.dirname(os.path.abspath(__file__)) class FileOperation: def __init__(self): self.dic = dict() self

  • python 使用paramiko模块进行封装,远程操作linux主机的示例代码

    import time import paramiko class HandleParamiko: ''' 定义一个linux处理类 ''' def __init__(self, hostname, password, port=22, username='root'): ''' 构造器 :param hostname: 主机ip,type:str :param password: 密码,type:str :param port: 端口,type:int 默认22 :param username

  • python+appium实现自动化测试的示例代码

    目录 1.什么是Appium 2.启动一个app自动化程序的步骤 3.appium服务介绍 4. appium客户端使用 5.adb的使用 6.Appium启动过程分析 1.什么是Appium appium是一个开源的测试自动化框架,可以与原生的.混合的和移动的web应用程序一直使用.它使用WebDriver协议驱动IOS(内置的测试引擎xcuitest).Android(uiautomator2,Espresso)和Windows应用程序 原生应用程序:安卓程序是用JAVA或kotlin开发出

  • Android进程间通信实践的示例代码

    本文介绍了Android进程间通信实践的示例代码,分享给大家,具体如下: 因为线程间的内存是共享的,所以它们之间的通信简单,比如可以通过共享变量等方式实现.而进程间想要通信就要麻烦许多了.要想实现进程间通信,我们需要在不同进程之间定义一套它们可以共同理解的接口描述语言,也即 IDL.比较常用的 IDL 有 JSON.Protocol Buffers 等.而 Android 不同进程之间的通信也有个特别的语言,叫 AIDL(Android Interface Definition Language

  • Python中字符串与编码示例代码

    在最新的Python 3版本中,字符串是以Unicode编码的,即Python的字符串支持多语言 编码和解码 字符串在内存中以Unicode表示,在操作字符串时,经常需要str和bytes互相转换   如果在网络上传输或保存到磁盘上,则从内存读到的数据就是str,要把str变为以字节为单位的bytes,称为编码   如果从网络或磁盘上读取字节流,则从网络或磁盘上读到的数据就是bytes,要把bytes变为str,称为解码   为避免乱码问题,应当始终坚持使用UTF-8编码对str和bytes进行

  • python+selenium+chromedriver实现爬虫示例代码

    下载好所需程序 1.Selenium简介 Selenium是一个用于Web应用程序测试的工具,直接运行在浏览器中,就像真正的用户在操作一样. 2.Selenium安装 方法一:在Windows命令行(cmd)输入pip install selenium即可自动安装,安装完成后,输入pip show selenium可查看当前的版本 方法二:直接下载selenium包: selenium下载网址 Pychome安装selenium如果出现无法安装,参考以下博客 解决Pycharm无法使用已经安装S

  • 拿来就用!Python批量合并PDF的示例代码

    大家好,今天分享一个实用的办公脚本:将多个PDF合并为一个PDF,例如我手上现在有如下3个PDF分册,需要整合成一个完整的PDF 如果换成你操作的话,是不是打开百度搜索:PDF合并,然后去第三方网站操作,可能会收费不说还担心文件泄漏,现在有请Python出场,简单快速,光速合并,拿走就用! 首先导入需要的库和路径设置 import os from PyPDF2 import PdfFileReader, PdfFileWriter if __name__ == '__main__': # 设置存

随机推荐