对python操作kafka写入json数据的简单demo分享

如下所示:

安装kafka支持库pip install kafka-python
from kafka import KafkaProducer
import json

'''
 生产者demo
 向test_lyl2主题中循环写入10条json数据
 注意事项:要写入json数据需加上value_serializer参数,如下代码
'''
producer = KafkaProducer(
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       bootstrap_servers=['192.168.12.101:6667','192.168.12.102:6667','192.168.12.103:6667']
       )
for i in range(10):
 data={
  "name":"李四",
  "age":23,
  "gender":"男",
  "id":i
 }
 producer.send('test_lyl2', data)
producer.close()
from kafka import KafkaConsumer
import json

'''
 消费者demo
 消费test_lyl2主题中的数据
 注意事项:如需以json格式读取数据需加上value_deserializer参数
'''

consumer = KafkaConsumer('test_lyl2',group_id="lyl-gid1",
       bootstrap_servers=['192.168.12.101:6667','192.168.12.102:6667','192.168.12.103:6667'],
       auto_offset_reset='earliest',value_deserializer=json.loads
       )
for message in consumer:
 print(message.value)

以上这篇对python操作kafka写入json数据的简单demo分享就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • 在python环境下运用kafka对数据进行实时传输的方法

    背景: 为了满足各个平台间数据的传输,以及能确保历史性和实时性.先选用kafka作为不同平台数据传输的中转站,来满足我们对跨平台数据发送与接收的需要. kafka简介: Kafka is a distributed,partitioned,replicated commit logservice.它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现.kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外ka

  • 详解Spring Kafka中关于Kafka的配置参数

    SpringKafka文档地址:https://docs.spring.io/spring-kafka/reference/htmlsingle kafka文档地址:http://kafka.apache.org/documentation SpringKafka中配置的Java配置实现类:https://github.com/spring-projects/spring-boot/blob/v1.5.4.RELEASE/spring-boot-autoconfigure/src/main/ja

  • python hbase读取数据发送kafka的方法

    本例子实现从hbase获取数据,并发送kafka. 使用 #!/usr/bin/env python #coding=utf-8 import sys import time import json sys.path.append('/usr/local/lib/python3.5/site-packages') from thrift import Thrift from thrift.transport import TSocket from thrift.transport import

  • 通过pykafka接收Kafka消息队列的方法

    没有Kafka环境,所以也没有进行验证.感觉今后应该能用到,所以借抄在此,备查. pykafka使用示例,自动消费最新消息,不重复消费: # -* coding:utf8 *- from pykafka import KafkaClient host = '192.168.200.38' client = KafkaClient(hosts="%s:9092" % host) print client.topics # 生产者 # topicdocu = client.topics['

  • kafka-python批量发送数据的实例

    如下所示: from kafka import KafkaClient from kafka.producer import SimpleProducer def send_data_2_kafka(datas): ''' 向kafka解析队列发送数据 ''' client = KafkaClient(hosts=KAFKABROKER.split(","), timeout=30) producer = SimpleProducer(client, async=False) curc

  • Docker部署Kafka以及Spring Kafka实现

    这篇文章主要介绍了Docker部署Kafka以及Spring Kafka实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 从https://hub.docker.com/查找kafka 第三个活跃并stars数量多 进去看看使用 我们使用docker-compose来构建镜像 查看使用文档中的docker-compose.yml 因为kafka要搭配zookeeper一起使用,所以文档中包含了zookeeper 我修改了一下版本号 以及变量参

  • 在Spring Boot应用程序中使用Apache Kafka的方法步骤详解

    第1步:生成我们的项目: Spring Initializr来生成我们的项目.我们的项目将提供Spring MVC / Web支持和Apache Kafka支持. 第2步:发布/读取Kafka主题中的消息: <b>public</b> <b>class</b> User { <b>private</b> String name; <b>private</b> <b>int</b> age

  • Docker搭建Zookeeper&Kafka集群的实现

    最近在学习Kafka,准备测试集群状态的时候感觉无论是开三台虚拟机或者在一台虚拟机开辟三个不同的端口号都太麻烦了(嗯..主要是懒). 环境准备 一台可以上网且有CentOS7虚拟机的电脑 为什么使用虚拟机?因为使用的笔记本,所以每次连接网络IP都会改变,还要总是修改配置文件的,过于繁琐,不方便测试.(通过Docker虚拟网络的方式可以避免此问题,当时实验的时候没有了解到) Docker 安装 如果已经安装Docker请忽略此步骤 Docker支持以下的CentOS版本: CentOS 7 (64

  • 对python操作kafka写入json数据的简单demo分享

    如下所示: 安装kafka支持库pip install kafka-python from kafka import KafkaProducer import json ''' 生产者demo 向test_lyl2主题中循环写入10条json数据 注意事项:要写入json数据需加上value_serializer参数,如下代码 ''' producer = KafkaProducer( value_serializer=lambda v: json.dumps(v).encode('utf-8'

  • Python如何读取、写入JSON数据

    问题 你想读写JSON(JavaScript Object Notation)编码格式的数据. 解决方案 json 模块提供了一种很简单的方式来编码和解码JSON数据.其中两个主要的函数是 json.dumps() 和 json.loads() ,要比其他序列化函数库如pickle的接口少得多.下面演示如何将一个Python数据结构转换为JSON: import json data = { 'name' : 'ACME', 'shares' : 100, 'price' : 542.23 } j

  • python操作docx写入内容,并控制文本的字体颜色

    功能:读取txt文本,然后将目的字符串标红,再将处理过的字符串写入docx中 txt文本内容:啊打发发烧鳌太路线点击点击诶的骄傲 计划将鳌太标红 代码: f = open('D:\\test\\路线.txt', 'r') content = f.read() print(content) #此方法在切分字符时可以保留切分符 pt = r'(鳌太)' res = re.split(pt, content) print(res[0]) document = Document() p = docume

  • 在python 脚本下解析json数据

    在项目中遇到了个json数据需要解析,利用Python脚本尝试分享给大家 如下: import os import pandas as pd import numpy as np path=r'C:\users\....' #文件的上一层路径 key=['SS','AA',....] #字段名 files = os.listdir(path) all_data = [] for file in files:         filepath = os.path.join(path,file) d

  • python操作小程序云数据库实现简单的增删改查功能

    不止python,你可以利用任何语言那实现通过http请求来操作你自己的小程序云数据库了 背景 也是在最近吧,小程序更新了云开发 HTTP API 文档,提供了小程序外访问云开发资源的能力,使用 HTTP API 开发者可在已有服务器上访问云资源,实现与云开发的互通. 原本云数据库还是相对封闭的,只能通过自己的小程序或者云函数来进行访问,而现在,你只要调用官方提供的接口就能实现对云函数的增删改查了. 这里通过 python 作为演示来进行简单的测试,当然你也可以使用 java , php 等任何

  • python操作kafka实践的示例代码

    1.先看最简单的场景,生产者生产消息,消费者接收消息,下面是生产者的简单代码. #!/usr/bin/env python # -*- coding: utf-8 -*- import json from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='xxxx:x') msg_dict = { "sleep_time": 10, "db_config": { "

  • 如何利用Python解析超大的json数据(GB级别)

    使用Python解析各种格式的数据都很方便,比如json.txt.xml.csv等.用于处理简单的数据完全足够用了,而且代码简单易懂. 前段时间我遇到一个问题,如何解析超大的json文件呢?刚开始天真的我在使用json.load直接加载json文件,然而内存报错却给了我当头一棒,json.load它是直接将数据加载到内存中然后解析出来的,这说明什么呢?当你的json文件过于庞大的时候,你的电脑内存装不下你的json文件,这时候就相当尴尬了,加载不了,解析不了!! 怎么办呢?我赶紧上网查阅资料,网

  • vue.js配合$.post从后台获取数据简单demo分享

    首先导入 <script type="text/javascript" src="/island/stage/js/vue.min.js"></script> html样式: <div id="main-content" class="wrap-container zerogrid"> <article id="news_content" v-for="i

  • java把字符串写入文件里的简单方法分享

    这个示例可以很简单的把字符串写入到文件,大家可以试试哟,这是跟一个外国朋友学的代码,大家可以学习一下了 复制代码 代码如下: import java.io.IOException;import java.nio.file.Files;import java.nio.file.Paths; public class StringToFile {    public static void main(String[] args) throws IOException {        String

  • Python如何处理JSON数据详解

    目录 什么是JSON? JSON作用 为什么使用JSON JSON的使用 最后 什么是JSON? JSON是一种轻量级的数据交互格式,采用完全独立于编程语言的文本格式来存储和表示数据.和xml相比,它更小巧,但描述能力却不差,更适合于在网络上传输数据. JSON是一种有着特殊格式的字符串,格式与对象或者数组是非常类似的,只不过属性名是带双引号的. JSON用于对象和数组的序列化.(序列化:格式转换)用于对象和数组与字符串进行相互转换. JSON作用 与 XML一样,它是格式化数据的一种方式.We

随机推荐