Python测试Kafka集群(pykafka)实例
生产者代码:
# -* coding:utf8 *- from pykafka import KafkaClient host = 'IP:9092, IP:9092, IP:9092' client = KafkaClient(hosts = host) print client.topics # 生产者 topicdocu = client.topics['my-topic'] producer = topicdocu.get_producer() for i in range(100): print i producer.produce('test message ' + str(i ** 2)) producer.stop()
消费者代码:
# -* coding:utf8 *- from pykafka import KafkaClient host = 'IP:9092, IP:9092, IP:9092' client = KafkaClient(hosts = host) print client.topics # 消费者 topic = client.topics['my-topic'] consumer = topic.get_simple_consumer(consumer_group='test', auto_commit_enable=True, auto_commit_interval_ms=1, consumer_id='test') for message in consumer: if message is not None: print message.offset, message.value
以上这篇Python测试Kafka集群(pykafka)实例就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。
相关推荐
-
python hbase读取数据发送kafka的方法
本例子实现从hbase获取数据,并发送kafka. 使用 #!/usr/bin/env python #coding=utf-8 import sys import time import json sys.path.append('/usr/local/lib/python3.5/site-packages') from thrift import Thrift from thrift.transport import TSocket from thrift.transport import
-
python每5分钟从kafka中提取数据的例子
我就废话不多说了,直接上代码吧! import sys sys.path.append("..") from datetime import datetime from utils.kafka2file import KafkaDownloader import os """ 实现取kafka数据,文件按照取数据的间隔命名 如每5分钟从kafka取数据写入文件中,文件名为当前时间加5 """ TOPIC = "rtz
-
python操作kafka实践的示例代码
1.先看最简单的场景,生产者生产消息,消费者接收消息,下面是生产者的简单代码. #!/usr/bin/env python # -*- coding: utf-8 -*- import json from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='xxxx:x') msg_dict = { "sleep_time": 10, "db_config": { "
-
python读取Kafka实例
1. 新建.py文件 # pip install kafka-python from kafka import KafkaConsumer import setting conf = setting.luyang_kafka_setting consumer = KafkaConsumer(bootstrap_servers=conf['host'], group_id=conf['groupid']) print('consumer start to consuming...') consum
-
kafka-python 获取topic lag值方式
说真,这个问题看上去很简单,但"得益"与kafka-python神奇的文档,真的不算简单,反正我是搜了半天还看了半天源码. 直接上代码吧 from kafka import SimpleClient, KafkaConsumer from kafka.common import OffsetRequestPayload, TopicPartition def get_topic_offset(brokers, topic): """ 获取一个topic的o
-
python3连接kafka模块pykafka生产者简单封装代码
1.1安装模块 pip install pykafka 1.2基本使用 # -* coding:utf8 *- from pykafka import KafkaClient host = 'IP:9092, IP:9092, IP:9092' client = KafkaClient(hosts = host) # 生产者 topicdocu = client.topics['my-topic'] producer = topicdocu.get_producer() for i in ran
-
python kafka 多线程消费者&手动提交实例
官方文档:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html import threading import os import sys from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata from consumers.db_util import * from consumers.json_dispose import *
-
Python测试Kafka集群(pykafka)实例
生产者代码: # -* coding:utf8 *- from pykafka import KafkaClient host = 'IP:9092, IP:9092, IP:9092' client = KafkaClient(hosts = host) print client.topics # 生产者 topicdocu = client.topics['my-topic'] producer = topicdocu.get_producer() for i in range(100):
-
Docker搭建Zookeeper&Kafka集群的实现
最近在学习Kafka,准备测试集群状态的时候感觉无论是开三台虚拟机或者在一台虚拟机开辟三个不同的端口号都太麻烦了(嗯..主要是懒). 环境准备 一台可以上网且有CentOS7虚拟机的电脑 为什么使用虚拟机?因为使用的笔记本,所以每次连接网络IP都会改变,还要总是修改配置文件的,过于繁琐,不方便测试.(通过Docker虚拟网络的方式可以避免此问题,当时实验的时候没有了解到) Docker 安装 如果已经安装Docker请忽略此步骤 Docker支持以下的CentOS版本: CentOS 7 (64
-
Docker容器搭建Kafka集群的详细过程
目录 一.Kafka集群的搭建 1.拉取相关镜像 2.运行zookeeper 3.运行kafka 4.设置topic 5.进行生产者和消费者测试 一.Kafka集群的搭建 1.拉取相关镜像 docker pull wurstmeister/kafka docker pull zookeeper 2.运行zookeeper docker run -d --name zookeeper -p 2181:2181 -t zookeeper 3.运行kafka Kafka0: docker run -d
-
docker搭建kafka集群的方法实现
目录 一.原生Docker命令 二.镜像选择 三.集群规划 四.Zookeeper集群安装 五.Kafka集群安装 一.原生Docker命令 1. 删除所有dangling数据卷(即无用的Volume,僵尸文件) docker volume rm $(docker volume ls -qf dangling=true) 2. 删除所有dangling镜像(即无tag的镜像) docker rmi $(docker images | grep "^<none>" | awk
-
python测试mysql写入性能完整实例
本文主要研究的是python测试mysql写入性能,分享了一则完整代码,具体介绍如下. 测试环境: (1) 阿里云服务器centos 6.5 (2) 2G内存 (3) 普通硬盘 (4) mysql 5.1.73 数据库存储引擎为 InnoDB (5) python 2.7 (6) 客户端模块 mysql.connector 测试方法: (1) 普通写入 (2) 批量写入 (3) 事务加批量写入 普通写入: def ordinary_insert(count): sql = "insert int
-
python连接mongodb集群方法详解
简单的测试用例 #!/usr/bin/python # -*- coding: UTF-8 -*- import time from pymongo import MongoClient # 连接单机 # single mongo # c = MongoClient(host="192.168.89.151", port=27017) # 连接集群 c = MongoClient('mongodb://192.168.89.151,192.168.89.152,192.168.89.1
-
linux contos6.8下部署kafka集群的方法
有3台服务器,ip分别为192.168.174.10,192.168.174.11,192.168.174.12. 1.官网下载,分别在每台机器上解压安装 # 创建kafka的安装目录 mkdir -p /usr/local/software/kafka # 解压 tar -xvf kafka_2.12-1.1.0.tgz -C /usr/local/software/kafka/ 2.修改每台服务器的/etc/profile文件,设置kafka环境变量,添加如下内容 export KAFKA_
-
php成功操作redis cluster集群的实例教程
前言 java操作redis cluster集群可使用jredis php要操作redis cluster集群有两种方式: 1.使用phpredis扩展,这是个c扩展,性能更高,但是phpredis2.x扩展不行,需升级phpredis到3.0,但这个方案参考资料很少 2.使用predis,纯php开发,使用了命名空间,需要php5.3+,灵活性高 我用的是predis,下载地址:点击这里 步骤如下: 下载好后重命名为predis, server1:192.168.1.198 server2:1
-
Springcloud Eureka配置及集群代码实例
springcloud微服务包含的技术种类众多,eureka作为其注册中心,一直处于主流,但在今年已经处于永久停更状态,但其优秀的能力还是值得学习. 整体价格采用聚合工程,后续也存在于聚合工程内. 1.首先配置pom工程的依赖 <dependencies> <!-- eureka-server --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId&
-
Python脚本实现集群检测和管理功能
场景是这样的:一个生产机房,会有很多的测试机器和生产机器(也就是30台左右吧),由于管理较为混乱导致了哪台机器有人用.哪台机器没人用都不清楚,从而产生了一个想法--利用一台机器来管理所有的机器,记录设备责任人.设备使用状态等等信息....那么,为什么选择python,python足够简单并且拥有丰富的第三方库的支持. 最初的想法 由于刚参加工作不久,对这些东西也都没有接触过,轮岗到某个部门需要做出点东西来(项目是什么还没情况,就要做出东西来,没办法硬着头皮想点子吧)... 本想做一个简单点的自动
随机推荐
- JS数组的赋值介绍
- 详解Python的Django框架中的模版相关知识
- Python学习笔记之os模块使用总结
- 在Linux下调试Python代码的各种方法
- Chrome下ifame父窗口调用子窗口的问题示例探讨
- ASP.NET GridView的Bootstrap分页样式
- Ruby 魔法 学习笔记之一
- SQLSERVER如何查看索引缺失及DMV使用介绍
- PHP 设置MySQL连接字符集的方法
- 使用get方式提交表单在地址栏里面不显示提交信息
- NodeJS学习笔记之Connect中间件模块(二)
- java md5工具类分享
- spring boot 日志配置详解
- 在启动栏制作android studio启动图标
- JS拖拽排序插件Sortable.js用法实例分析
- Java实现的KNN算法示例
- Linux 添加开机启动方法(服务/脚本)
- Java数据库操作库DButils类的使用方法与实例详解
- jquery传参及获取方式(两种方式)
- Python基于requests实现模拟上传文件