Python测试Kafka集群(pykafka)实例

生产者代码:

# -* coding:utf8 *-
from pykafka import KafkaClient 

host = 'IP:9092, IP:9092, IP:9092'
client = KafkaClient(hosts = host) 

print client.topics 

# 生产者
topicdocu = client.topics['my-topic']
producer = topicdocu.get_producer()
for i in range(100):
  print i
  producer.produce('test message ' + str(i ** 2))
producer.stop()

消费者代码:

# -* coding:utf8 *-
from pykafka import KafkaClient 

host = 'IP:9092, IP:9092, IP:9092'
client = KafkaClient(hosts = host) 

print client.topics 

# 消费者
topic = client.topics['my-topic']
consumer = topic.get_simple_consumer(consumer_group='test', auto_commit_enable=True, auto_commit_interval_ms=1,
                   consumer_id='test')
for message in consumer:
  if message is not None:
    print message.offset, message.value 

以上这篇Python测试Kafka集群(pykafka)实例就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • python kafka 多线程消费者&手动提交实例

    官方文档:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html import threading import os import sys from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata from consumers.db_util import * from consumers.json_dispose import *

  • python hbase读取数据发送kafka的方法

    本例子实现从hbase获取数据,并发送kafka. 使用 #!/usr/bin/env python #coding=utf-8 import sys import time import json sys.path.append('/usr/local/lib/python3.5/site-packages') from thrift import Thrift from thrift.transport import TSocket from thrift.transport import

  • kafka-python 获取topic lag值方式

    说真,这个问题看上去很简单,但"得益"与kafka-python神奇的文档,真的不算简单,反正我是搜了半天还看了半天源码. 直接上代码吧 from kafka import SimpleClient, KafkaConsumer from kafka.common import OffsetRequestPayload, TopicPartition def get_topic_offset(brokers, topic): """ 获取一个topic的o

  • python每5分钟从kafka中提取数据的例子

    我就废话不多说了,直接上代码吧! import sys sys.path.append("..") from datetime import datetime from utils.kafka2file import KafkaDownloader import os """ 实现取kafka数据,文件按照取数据的间隔命名 如每5分钟从kafka取数据写入文件中,文件名为当前时间加5 """ TOPIC = "rtz

  • python读取Kafka实例

    1. 新建.py文件 # pip install kafka-python from kafka import KafkaConsumer import setting conf = setting.luyang_kafka_setting consumer = KafkaConsumer(bootstrap_servers=conf['host'], group_id=conf['groupid']) print('consumer start to consuming...') consum

  • python操作kafka实践的示例代码

    1.先看最简单的场景,生产者生产消息,消费者接收消息,下面是生产者的简单代码. #!/usr/bin/env python # -*- coding: utf-8 -*- import json from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='xxxx:x') msg_dict = { "sleep_time": 10, "db_config": { "

  • python3连接kafka模块pykafka生产者简单封装代码

    1.1安装模块 pip install pykafka 1.2基本使用 # -* coding:utf8 *- from pykafka import KafkaClient host = 'IP:9092, IP:9092, IP:9092' client = KafkaClient(hosts = host) # 生产者 topicdocu = client.topics['my-topic'] producer = topicdocu.get_producer() for i in ran

  • Python测试Kafka集群(pykafka)实例

    生产者代码: # -* coding:utf8 *- from pykafka import KafkaClient host = 'IP:9092, IP:9092, IP:9092' client = KafkaClient(hosts = host) print client.topics # 生产者 topicdocu = client.topics['my-topic'] producer = topicdocu.get_producer() for i in range(100):

  • Docker搭建Zookeeper&Kafka集群的实现

    最近在学习Kafka,准备测试集群状态的时候感觉无论是开三台虚拟机或者在一台虚拟机开辟三个不同的端口号都太麻烦了(嗯..主要是懒). 环境准备 一台可以上网且有CentOS7虚拟机的电脑 为什么使用虚拟机?因为使用的笔记本,所以每次连接网络IP都会改变,还要总是修改配置文件的,过于繁琐,不方便测试.(通过Docker虚拟网络的方式可以避免此问题,当时实验的时候没有了解到) Docker 安装 如果已经安装Docker请忽略此步骤 Docker支持以下的CentOS版本: CentOS 7 (64

  • Docker容器搭建Kafka集群的详细过程

    目录 一.Kafka集群的搭建 1.拉取相关镜像 2.运行zookeeper 3.运行kafka 4.设置topic 5.进行生产者和消费者测试 一.Kafka集群的搭建 1.拉取相关镜像 docker pull wurstmeister/kafka docker pull zookeeper 2.运行zookeeper docker run -d --name zookeeper -p 2181:2181 -t zookeeper 3.运行kafka Kafka0: docker run -d

  • docker搭建kafka集群的方法实现

    目录 一.原生Docker命令 二.镜像选择 三.集群规划 四.Zookeeper集群安装 五.Kafka集群安装 一.原生Docker命令 1. 删除所有dangling数据卷(即无用的Volume,僵尸文件) docker volume rm $(docker volume ls -qf dangling=true) 2. 删除所有dangling镜像(即无tag的镜像) docker rmi $(docker images | grep "^<none>" | awk

  • python测试mysql写入性能完整实例

    本文主要研究的是python测试mysql写入性能,分享了一则完整代码,具体介绍如下. 测试环境: (1) 阿里云服务器centos 6.5 (2) 2G内存 (3) 普通硬盘 (4) mysql 5.1.73 数据库存储引擎为 InnoDB (5) python 2.7 (6) 客户端模块 mysql.connector 测试方法: (1) 普通写入 (2) 批量写入 (3) 事务加批量写入 普通写入: def ordinary_insert(count): sql = "insert int

  • python连接mongodb集群方法详解

    简单的测试用例 #!/usr/bin/python # -*- coding: UTF-8 -*- import time from pymongo import MongoClient # 连接单机 # single mongo # c = MongoClient(host="192.168.89.151", port=27017) # 连接集群 c = MongoClient('mongodb://192.168.89.151,192.168.89.152,192.168.89.1

  • linux contos6.8下部署kafka集群的方法

    有3台服务器,ip分别为192.168.174.10,192.168.174.11,192.168.174.12. 1.官网下载,分别在每台机器上解压安装 # 创建kafka的安装目录 mkdir -p /usr/local/software/kafka # 解压 tar -xvf kafka_2.12-1.1.0.tgz -C /usr/local/software/kafka/ 2.修改每台服务器的/etc/profile文件,设置kafka环境变量,添加如下内容 export KAFKA_

  • php成功操作redis cluster集群的实例教程

    前言 java操作redis cluster集群可使用jredis php要操作redis cluster集群有两种方式: 1.使用phpredis扩展,这是个c扩展,性能更高,但是phpredis2.x扩展不行,需升级phpredis到3.0,但这个方案参考资料很少 2.使用predis,纯php开发,使用了命名空间,需要php5.3+,灵活性高 我用的是predis,下载地址:点击这里 步骤如下: 下载好后重命名为predis, server1:192.168.1.198 server2:1

  • Springcloud Eureka配置及集群代码实例

    springcloud微服务包含的技术种类众多,eureka作为其注册中心,一直处于主流,但在今年已经处于永久停更状态,但其优秀的能力还是值得学习. 整体价格采用聚合工程,后续也存在于聚合工程内. 1.首先配置pom工程的依赖 <dependencies> <!-- eureka-server --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId&

  • Python脚本实现集群检测和管理功能

    场景是这样的:一个生产机房,会有很多的测试机器和生产机器(也就是30台左右吧),由于管理较为混乱导致了哪台机器有人用.哪台机器没人用都不清楚,从而产生了一个想法--利用一台机器来管理所有的机器,记录设备责任人.设备使用状态等等信息....那么,为什么选择python,python足够简单并且拥有丰富的第三方库的支持. 最初的想法 由于刚参加工作不久,对这些东西也都没有接触过,轮岗到某个部门需要做出点东西来(项目是什么还没情况,就要做出东西来,没办法硬着头皮想点子吧)... 本想做一个简单点的自动

随机推荐