python3连接kafka模块pykafka生产者简单封装代码

1.1安装模块

pip install pykafka

1.2基本使用

# -* coding:utf8 *-
from pykafka import KafkaClient
host = 'IP:9092, IP:9092, IP:9092'
client = KafkaClient(hosts = host)
# 生产者
topicdocu = client.topics['my-topic']
producer = topicdocu.get_producer()
for i in range(100):
 print i
 producer.produce('test message ' + str(i ** 2))
producer.stop()

1.3简单封装

class KafkaProduct():

 def __init__(self,hosts,topic):
  """
  初始化实例
  :param hosts: 连接地址
  :param topic:
  """
  self.__client = KafkaClient(hosts=hosts)
  self.__topic = self.__client.topics[topic.encode()]

 def __set_topic(self, topic):
  self.__topic = self.__client.topics[topic.encode()]

 def set_topic(self, topic):
  """
  设置topic
  :param topic:
  :return:
  """
  self.__set_topic(topic)

 def get_topics(self):
  """
  获取当前所有topic
  :return:
  """
  return self.__client.topics

 def get_topic(self):
  """
  获取当前topic
  :return:
  """
  return self.__topic

 def Producer(self):
  """
  生产者对象
  :return:
  """
  with self.__topic.get_producer(delivery_reports=True) as producer:
   next_data = ''
   while True:
    if next_data:
     producer.produce(str(next_data).encode())
    next_data = yield True

 def send_data(self,datas):
  """
  发送数据
  :param datas:需要传入的可迭代对象
  :return:
  """
  c = self.Producer()
  next(c)
  for i in datas:
   c.send(i)

if __name__ == '__main__':

hosts = "1.2.3.4:9999,2.3.4.5:9090" #连接hosts
topic = "test_523"
K = KafkaProduct(hosts=hosts, topic=topic) #
#K.set_topic("test") #切换设置新的topic
K.get_topic() #获取当前设置的topic
#K.get_topics() #获取所有topic
data = range(10000) #要发送的可迭代对象
K.send_data(data)

以上这篇python3连接kafka模块pykafka生产者简单封装代码就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • python hbase读取数据发送kafka的方法

    本例子实现从hbase获取数据,并发送kafka. 使用 #!/usr/bin/env python #coding=utf-8 import sys import time import json sys.path.append('/usr/local/lib/python3.5/site-packages') from thrift import Thrift from thrift.transport import TSocket from thrift.transport import

  • kafka-python 获取topic lag值方式

    说真,这个问题看上去很简单,但"得益"与kafka-python神奇的文档,真的不算简单,反正我是搜了半天还看了半天源码. 直接上代码吧 from kafka import SimpleClient, KafkaConsumer from kafka.common import OffsetRequestPayload, TopicPartition def get_topic_offset(brokers, topic): """ 获取一个topic的o

  • Python测试Kafka集群(pykafka)实例

    生产者代码: # -* coding:utf8 *- from pykafka import KafkaClient host = 'IP:9092, IP:9092, IP:9092' client = KafkaClient(hosts = host) print client.topics # 生产者 topicdocu = client.topics['my-topic'] producer = topicdocu.get_producer() for i in range(100):

  • python kafka 多线程消费者&手动提交实例

    官方文档:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html import threading import os import sys from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata from consumers.db_util import * from consumers.json_dispose import *

  • python每5分钟从kafka中提取数据的例子

    我就废话不多说了,直接上代码吧! import sys sys.path.append("..") from datetime import datetime from utils.kafka2file import KafkaDownloader import os """ 实现取kafka数据,文件按照取数据的间隔命名 如每5分钟从kafka取数据写入文件中,文件名为当前时间加5 """ TOPIC = "rtz

  • python操作kafka实践的示例代码

    1.先看最简单的场景,生产者生产消息,消费者接收消息,下面是生产者的简单代码. #!/usr/bin/env python # -*- coding: utf-8 -*- import json from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='xxxx:x') msg_dict = { "sleep_time": 10, "db_config": { "

  • python读取Kafka实例

    1. 新建.py文件 # pip install kafka-python from kafka import KafkaConsumer import setting conf = setting.luyang_kafka_setting consumer = KafkaConsumer(bootstrap_servers=conf['host'], group_id=conf['groupid']) print('consumer start to consuming...') consum

  • python3连接kafka模块pykafka生产者简单封装代码

    1.1安装模块 pip install pykafka 1.2基本使用 # -* coding:utf8 *- from pykafka import KafkaClient host = 'IP:9092, IP:9092, IP:9092' client = KafkaClient(hosts = host) # 生产者 topicdocu = client.topics['my-topic'] producer = topicdocu.get_producer() for i in ran

  • pymysql的简单封装代码实例

    这篇文章主要介绍了pymysql的简单封装代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 #coding=utf-8 #!/usr/bin/python import pymysql class MYSQL: """ 对pymysql的简单封装 """ def __init__(self,host,user,pwd,db): self.host = host self.user = us

  • python网页请求urllib2模块简单封装代码

    对python网页请求模块urllib2进行简单的封装. 例子: 复制代码 代码如下: #!/usr/bin/python#coding: utf-8import base64import urllibimport urllib2import time class SendRequest:  '''  This class use to set and request the http, and get the info of response.  e.g. set Authorization

  • Python3连接MySQL(pymysql)模拟转账实现代码

    本文实例为大家分享了Python3连接MySQL模拟转账的具体实现代码,供大家参考,具体内容如下 # coding:utf8 import sys import pymysql class TransferMoney(object): def __init__(self,conn): self.conn=conn def check_acct_available(self,acctid): cursor = self.conn.cursor() try: sql="select * from a

  • nodejs连接mysql数据库简单封装示例-mysql模块

    本人最近在学习研究nodejs,下面我来记录一下,有需要了解nodejs连接mysql数据库简单封装的朋友可参考.希望此文章对各位有所帮助. 安装mysql模块 npm install mysql 测试是否连接成功 mysql.js代码: var mysql = require('mysql'); var connection = mysql.createConnection({ host : 'localhost', user : 'root', password : '123456', da

  • nodejs基于mssql模块连接sqlserver数据库的简单封装操作示例

    本文实例讲述了nodejs基于mssql模块连接sqlserver数据库的简单封装操作.分享给大家供大家参考,具体如下: 注意:开启sqlserver服务器允许远程连接的步骤,自行百度,很多经验,nodejs连接sqlserver,最好把防火墙的入站规则关闭,或者允许入站.这个封装只是sql语句查询,看懂这个,其他的可以自行扩展,支持连接池. 一 安装mssql模块 npm install mssql 二 封装连接数据库代码 db.js: /*2016年7月14日17:02:15 QQ: 452

  • django 数据库连接模块解析及简单长连接改造方法

    工作中纯服务端的项目用到了线程池和django的ORM部分.django 的数据库连接在每一个线程中开启一份,并在查询完毕后自动关闭连接. 线程池处理任务时,正常使用的连接中不会被关闭,但由于数据库端有最长连接时间的限制(默认为8小时),在超时后会发生InterfaceError: (0, '')(连接关闭后使用连接/游标)或Error(2006, 'MySQL server has gone away')(mysql 服务器主动关闭连接)这类错误,所以一般会在每个任务线程中调用django.d

  • Java语言实现简单FTP软件 FTP连接管理模块实现(8)

    本文为大家介绍了FTP连接管理模块的实现方法,供大家参考,具体内容如下 (1)FTP连接 运行FTP客户端后,首先是连接FTP服务器,需要输入FTP服务器的IP地址及用户名.密码以及端口号后点击连接按钮开始连接FTP服务器,连接流程图如下图所示. 点击"连接"按钮后,会调用com.oyp.ftp.FTPClientFrame类的linkButtonActionPerformed(ActionEvent evt)方法,其主要代码程序如下 /** * 连接按钮的事件处理方法 */ riva

  • Python3连接Mysql8.0遇到的问题及处理步骤

    最近在使用Python开发系统,需连接mysql数据库,我用的是Python3连接MySQL8.0,其中老是报错以下问题: 网上给了各种各样的方法,大多是通过各种方式修改密码. 最简单的方法是更换了root密码的认证方式解决的,新版mysql使用的caching_sha2_password,换成mysql_native_password我就可以连上了. 步骤是在cmd命令行连接mysql(管理员身份运行cmd)  进入cmd的命令行之后,输入cd+bin目录的地址,访问mysql的bin目录.通

  • python3光学字符识别模块tesserocr与pytesseract的使用详解

    OCR,即Optical Character Recognition,光学字符识别,是指通过扫描字符,然后通过其形状将其翻译成电子文本的过程,对应图形验证码来说,它们都是一些不规则的字符,这些字符是由字符稍加扭曲变换得到的内容,我们可以使用OCR技术来讲其转化为电子文本,然后将结果提取交给服务器,便可以达到自动识别验证码的过程 tesserocr与pytesseract是Python的一个OCR识别库,但其实是对tesseract做的一层Python API封装,pytesseract是Goog

随机推荐