Kafka简单客户端编程实例

今天,我们给大家带来一篇如何利用Kafka的API进行客户端编程的文章,这篇文章很简单,就是利用Kafka的API创建一个生产者和消费者,生产者不断向Kafka写入消息,消费者则不断消费Kafka的消息。下面是具体的实例代码。

一、创建配置类Config

这个类很简单,只是存放了两个常量,一个是话题TOPIC,一个是线程数THREADS

package com.lya.kafka; 

/**
 * 配置项
 * @author liuyazhuang
 *
 */
public class Config { 

 /**
  * 话题
  */
 public static final String TOPIC = "wordcount";
 /**
  * 线程数
  */
 public static final Integer THREADS = 1;
}

二、编程生产者类ProducerDemo

这个类的主要作用就是向Kafka写入相应的消息,并且将消息写入wordcount话题。

package com.lya.kafka; 

import java.util.Properties; 

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig; 

/**
 * 生产者实例
 * @author liuyazhuang
 *
 */
public class ProducerDemo {
 public static void main(String[] args) throws Exception {
  Properties props = new Properties();
  props.put("zk.connect", "192.168.209.121:2181");
  props.put("metadata.broker.list","192.168.209.121:9092");
  props.put("serializer.class", "kafka.serializer.StringEncoder");
  props.put("zk.connectiontimeout.ms", "15000");
  ProducerConfig config = new ProducerConfig(props);
  Producer<String, String> producer = new Producer<String, String>(config); 

  // 发送业务消息
  // 读取文件 读取内存数据库 读socket端口
  for (int i = 1; i <= 100; i++) {
   Thread.sleep(500);
   producer.send(new KeyedMessage<String, String>(Config.TOPIC,
     "this number ===>>> " + i));
  } 

 }
}

三、编写消息者类ConsumerDemo

这个类的主要作用就是消费Kafka中wordcount话题的消息。

package com.lya.kafka; 

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties; 

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata; 

/**
 * 消费者实例
 * @author liuyazhuang
 *
 */
public class ConsumerDemo { 

 public static void main(String[] args) { 

  Properties props = new Properties();
  props.put("zookeeper.connect", "192.168.209.121:2181");
  props.put("group.id", "1111");
  props.put("auto.offset.reset", "smallest");
  props.put("zk.connectiontimeout.ms", "15000"); 

  ConsumerConfig config = new ConsumerConfig(props);
  ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config);
  Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
  topicCountMap.put(Config.TOPIC, Config.THREADS);
  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(Config.TOPIC); 

  for(final KafkaStream<byte[], byte[]> kafkaStream : streams){
   new Thread(new Runnable() {
    @Override
    public void run() {
     for(MessageAndMetadata<byte[], byte[]> mm : kafkaStream){
      String msg = new String(mm.message());
      System.out.println(msg);
     }
    } 

   }).start(); 

  }
 }
}

四、运行实例

首先,运行消费者类ConsumerDemo
运行结果如下:

没有打印任何信息。
此时,我们运行生产者类ProducerDemo
我们再次打开消费者的控制台查看如下:

打印出了生产者生产的消息。
至此,Kafka简单客户端编程实例结束。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Kafka使用Java客户端进行访问的示例代码

    本文环境如下: 操作系统:CentOS 6 32位 JDK版本:1.8.0_77 32位 Kafka版本:0.9.0.1(Scala 2.11) 1. maven依赖包 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependen

  • Kafka简单客户端编程实例

    今天,我们给大家带来一篇如何利用Kafka的API进行客户端编程的文章,这篇文章很简单,就是利用Kafka的API创建一个生产者和消费者,生产者不断向Kafka写入消息,消费者则不断消费Kafka的消息.下面是具体的实例代码. 一.创建配置类Config 这个类很简单,只是存放了两个常量,一个是话题TOPIC,一个是线程数THREADS package com.lya.kafka; /** * 配置项 * @author liuyazhuang * */ public class Config

  • Python简单网络编程示例【客户端与服务端】

    本文实例讲述了Python简单网络编程.分享给大家供大家参考,具体如下: 内容目录 1. 客户端(client.py) 2. 服务端(server.py) 一.客户端(client.py) import socket import sys port = 70 host = sys.argv[1] filename = sys.argv[2] s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((host, port))

  • Java网络编程实例——简单模拟在线聊天

    1.前提知识 需要知道简单的IO流操作,以及简单的UDP发送数据包的原理. 需要用到的类:DatagramSocket.DatagramPacket UDP数据包基于DatagramSocket发送和接收,DatagramPacket用于封装数据包 看下案例: 客户端发送消息: 正常情况下从控制台读信息,封装到DatagramPacket之中,再由DatagramSocket的send方法发出 读取到bye的时候退出聊天 public class UdpOnlineClient { public

  • PHP基于socket实现的简单客户端和服务端通讯功能示例

    本文实例讲述了PHP基于socket实现的简单客户端和服务端通讯功能.分享给大家供大家参考,具体如下: 服务器端: <?php set_time_limit(0); $host="localhost"; $port=1001; //创建一个连接 $socket=socket_create(AF_INET,SOCK_STREAM,SOL_TCP)or die("cannot create socket\n"); //绑定socket到端口 $result=soc

  • C++中Socket网络编程实例详解

    C++中Socket网络编程实例详解 现在几乎所有C/C++的后台程序都需要进行网络通讯,其实现方法无非有两种:使用系统底层socket或者使用已有的封装好的网络库.本文对两种方式进行总结,并介绍一个轻量级的网络通讯库ZeroMQ.  1.基本的Scoket编程 关于基本的scoket编程网络上已有很多资料,作者在这里引用一篇文章中的内容进行简要说明. 基于socket编程,基本上就是以下6个步骤: 1.socket()函数 2.bind()函数 3.listen().connect()函数 4

  • C#串口编程实例代码

    由于工作需要,第一次接触串口编程.所以不得不在网上查阅各种编程实例.最后结合自己的理解与实践,最终有了如下代码. 本代码只经过了简单的软件测试,与简单的硬件测试. using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Runtime.InteropServices; using System.Threading; using System.IO.Port

  • Java thrift服务器和客户端创建实例代码

    Thrift是一个软件框架,用来进行可扩展且跨语言的服务的开发.它结合了功能强大的软件堆栈和代码生成引擎,以构建在 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 等等编程语言间无缝结合的.高效的服务. Thrift最初由facebook开发,07年四月开放源码,08年5月进入apache孵化器.thrift允许你定义一个简单的定义文

  • C++ 中 socket编程实例详解

    C++ 中 socket编程实例详解 sockets(套接字)编程有三种,流式套接字(SOCK_STREAM),数据报套接字(SOCK_DGRAM),原始套接字(SOCK_RAW):基于TCP的socket编程是采用的流式套接字.在这个程序中,将两个工程添加到一个工作区.要链接一个ws2_32.lib的库文件. 服务器端编程的步骤: 1:加载套接字库,创建套接字(WSAStartup()/socket()): 2:绑定套接字到一个IP地址和一个端口上(bind()): 3:将套接字设置为监听模式

  • websocket++简单使用及实例分析

    前言 html5支持使用websocket协议与服务器保持一个长连接,方便双方互相传输数据,而且服务器也能主动发送信息给客户端,而在这之前使用HTTP是很难做到的.下面介绍使用C++实现的websocket++的简单使用.websocket++更详细介绍点此. websocket++需要boost的支持,所以工程中需要包含boost的头文件和库.boost在VS中的如何使用参考此文章. C++代码 #include "stdafx.h" #include <iostream>

  • java fastdfs客户端使用实例代码

    本文研究的主要是java fastdfs客户端使用实例的相关内容,具体实现如下. 什么是FastDFS? FastDFS是用c语言编写的一款开源的分布式文件系统.FastDFS为互联网量身定制,充分考虑了冗余备份.负载均衡.线性扩容等机制,并注重高可用.高性能等指标,使用FastDFS很容易搭建一套高性能的文件服务器集群提供文件上传.下载等服务. FastDFS架构 FastDFS架构包括 Tracker server和Storage server.客户端请求Tracker server进行文件

随机推荐