Spring boot集成Kafka+Storm的示例代码

前言

由于业务需求需要把Strom与kafka整合到spring boot项目里,实现其他服务输出日志至kafka订阅话题,storm实时处理该话题完成数据监控及其他数据统计,但是网上教程较少,今天想写的就是如何整合storm+kafka 到spring boot,顺带说一说我遇到的坑。

使用工具及环境配置

​ 1. java 版本jdk-1.8

​ 2. 编译工具使用IDEA-2017

​ 3. maven作为项目管理

​ 4.spring boot-1.5.8.RELEASE

需求体现

1.为什么需要整合到spring boot

为了使用spring boot 统一管理各种微服务,及同时避免多个分散配置

2.具体思路及整合原因

​ 使用spring boot统一管理kafka、storm、redis等所需要的bean,通过其他服务日志收集至Kafka,KafKa实时发送日志至storm,在strom bolt时进行相应的处理操作

遇到的问题

​ 1.使用spring boot并没有相关整合storm

​ 2.以spring boot启动方式不知道如何触发提交Topolgy

​ 3.提交Topology时遇到numbis not client localhost 问题

​ 4.Storm bolt中无法通过注解获得实例化bean进行相应的操作

解决思路

在整合之前我们需要知道相应的spring boot 的启动方式及配置(如果你在阅读本文时,默认你已经对storm,kafka及spring boot有相关了解及使用)

spring boot 对storm进行整合的例子在网上很少,但是因为有相应的需求,因此我们还是需要整合.

首先导入所需要jar包:

<dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>0.10.1.1</version>
 </dependency>

 <dependency>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-starter-stream-kafka</artifactId>
 <exclusions>
 <exclusion>
  <artifactId>zookeeper</artifactId>
  <groupId>org.apache.zookeeper</groupId>
 </exclusion>
 <exclusion>
  <artifactId>spring-boot-actuator</artifactId>
  <groupId>org.springframework.boot</groupId>
 </exclusion>
 <exclusion>
  <artifactId>kafka-clients</artifactId>
  <groupId>org.apache.kafka</groupId>
 </exclusion>
 </exclusions>
 </dependency>

 <dependency>
 <groupId>org.springframework.kafka</groupId>
 <artifactId>spring-kafka</artifactId>
 <exclusions>
 <exclusion>
  <artifactId>kafka-clients</artifactId>
  <groupId>org.apache.kafka</groupId>
 </exclusion>
 </exclusions>
 </dependency>

 <dependency>
 <groupId>org.springframework.data</groupId>
 <artifactId>spring-data-hadoop</artifactId>
 <version>2.5.0.RELEASE</version>
 <exclusions>
 <exclusion>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-log4j12</artifactId>
 </exclusion>
 <exclusion>
  <artifactId>commons-logging</artifactId>
  <groupId>commons-logging</groupId>
 </exclusion>
 <exclusion>
  <artifactId>netty</artifactId>
  <groupId>io.netty</groupId>
 </exclusion>
 <exclusion>
  <artifactId>jackson-core-asl</artifactId>
  <groupId>org.codehaus.jackson</groupId>
 </exclusion>
 <exclusion>
  <artifactId>curator-client</artifactId>
  <groupId>org.apache.curator</groupId>
 </exclusion>
 <exclusion>
  <artifactId>jettison</artifactId>
  <groupId>org.codehaus.jettison</groupId>
 </exclusion>
 <exclusion>
  <artifactId>jackson-mapper-asl</artifactId>
  <groupId>org.codehaus.jackson</groupId>
 </exclusion>
 <exclusion>
  <artifactId>jackson-jaxrs</artifactId>
  <groupId>org.codehaus.jackson</groupId>
 </exclusion>
 <exclusion>
  <artifactId>snappy-java</artifactId>
  <groupId>org.xerial.snappy</groupId>
 </exclusion>
 <exclusion>
  <artifactId>jackson-xc</artifactId>
  <groupId>org.codehaus.jackson</groupId>
 </exclusion>
 <exclusion>
  <artifactId>guava</artifactId>
  <groupId>com.google.guava</groupId>
 </exclusion>
 <exclusion>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <groupId>org.apache.hadoop</groupId>
 </exclusion>
 <exclusion>
  <artifactId>zookeeper</artifactId>
  <groupId>org.apache.zookeeper</groupId>
 </exclusion>
 <exclusion>
  <artifactId>servlet-api</artifactId>
  <groupId>javax.servlet</groupId>
 </exclusion>

 </exclusions>
 </dependency>
 <dependency>
 <groupId>org.apache.zookeeper</groupId>
 <artifactId>zookeeper</artifactId>
 <version>3.4.10</version>
 <exclusions>
 <exclusion>
  <artifactId>slf4j-log4j12</artifactId>
  <groupId>org.slf4j</groupId>
 </exclusion>
 </exclusions>
 </dependency>
 <dependency>
 <groupId>org.apache.hbase</groupId>
 <artifactId>hbase-client</artifactId>
 <version>1.2.4</version>
 <exclusions>
 <exclusion>
  <artifactId>log4j</artifactId>
  <groupId>log4j</groupId>
 </exclusion>
 <exclusion>
  <artifactId>zookeeper</artifactId>
  <groupId>org.apache.zookeeper</groupId>
 </exclusion>
 <exclusion>
  <artifactId>netty</artifactId>
  <groupId>io.netty</groupId>
 </exclusion>
 <exclusion>
  <artifactId>hadoop-common</artifactId>
  <groupId>org.apache.hadoop</groupId>
 </exclusion>
 <exclusion>
  <artifactId>guava</artifactId>
  <groupId>com.google.guava</groupId>
 </exclusion>
 <exclusion>
  <artifactId>hadoop-annotations</artifactId>
  <groupId>org.apache.hadoop</groupId>
 </exclusion>
 <exclusion>
  <artifactId>hadoop-yarn-common</artifactId>
  <groupId>org.apache.hadoop</groupId>
 </exclusion>
 <exclusion>
  <artifactId>slf4j-log4j12</artifactId>
  <groupId>org.slf4j</groupId>
 </exclusion>
 </exclusions>
 </dependency>
 <dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-common</artifactId>
 <version>2.7.3</version>
 <exclusions>
 <exclusion>
  <artifactId>commons-logging</artifactId>
  <groupId>commons-logging</groupId>
 </exclusion>
 <exclusion>
  <artifactId>curator-client</artifactId>
  <groupId>org.apache.curator</groupId>
 </exclusion>
 <exclusion>
  <artifactId>jackson-mapper-asl</artifactId>
  <groupId>org.codehaus.jackson</groupId>
 </exclusion>
 <exclusion>
  <artifactId>jackson-core-asl</artifactId>
  <groupId>org.codehaus.jackson</groupId>
 </exclusion>
 <exclusion>
  <artifactId>log4j</artifactId>
  <groupId>log4j</groupId>
 </exclusion>
 <exclusion>
  <artifactId>snappy-java</artifactId>
  <groupId>org.xerial.snappy</groupId>
 </exclusion>
 <exclusion>
  <artifactId>zookeeper</artifactId>
  <groupId>org.apache.zookeeper</groupId>
 </exclusion>
 <exclusion>
  <artifactId>guava</artifactId>
  <groupId>com.google.guava</groupId>
 </exclusion>
 <exclusion>
  <artifactId>hadoop-auth</artifactId>
  <groupId>org.apache.hadoop</groupId>
 </exclusion>
 <exclusion>
  <artifactId>commons-lang</artifactId>
  <groupId>commons-lang</groupId>
 </exclusion>
 <exclusion>
  <artifactId>slf4j-log4j12</artifactId>
  <groupId>org.slf4j</groupId>
 </exclusion>
 <exclusion>
  <artifactId>servlet-api</artifactId>
  <groupId>javax.servlet</groupId>
 </exclusion>
 </exclusions>
 </dependency>
 <dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-mapreduce-examples</artifactId>
 <version>2.7.3</version>
 <exclusions>
 <exclusion>
  <artifactId>commons-logging</artifactId>
  <groupId>commons-logging</groupId>
 </exclusion>
 <exclusion>
  <artifactId>netty</artifactId>
  <groupId>io.netty</groupId>
 </exclusion>
 <exclusion>
  <artifactId>guava</artifactId>
  <groupId>com.google.guava</groupId>
 </exclusion>
 <exclusion>
  <artifactId>log4j</artifactId>
  <groupId>log4j</groupId>
 </exclusion>
 <exclusion>
  <artifactId>servlet-api</artifactId>
  <groupId>javax.servlet</groupId>
 </exclusion>
 </exclusions>
 </dependency>

 <!--storm-->
 <dependency>
 <groupId>org.apache.storm</groupId>
 <artifactId>storm-core</artifactId>
 <version>${storm.version}</version>
 <scope>${provided.scope}</scope>
 <exclusions>
 <exclusion>
  <groupId>org.apache.logging.log4j</groupId>
  <artifactId>log4j-slf4j-impl</artifactId>
 </exclusion>
 <exclusion>
  <artifactId>servlet-api</artifactId>
  <groupId>javax.servlet</groupId>
 </exclusion>
 </exclusions>
 </dependency>

 <dependency>
 <groupId>org.apache.storm</groupId>
 <artifactId>storm-kafka</artifactId>
 <version>1.1.1</version>
 <exclusions>
 <exclusion>
  <artifactId>kafka-clients</artifactId>
  <groupId>org.apache.kafka</groupId>
 </exclusion>
 </exclusions>
 </dependency>

其中去除jar包是因为需要相与项目构建依赖有多重依赖问题,storm版本为1.1.0  spring boot相关依赖为

```java

<!-- spring boot -->
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter</artifactId>
   <exclusions>
    <exclusion>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-logging</artifactId>
    </exclusion>
   </exclusions>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-aop</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-log4j2</artifactId>
  </dependency>
  <dependency>
   <groupId>org.mybatis.spring.boot</groupId>
   <artifactId>mybatis-spring-boot-starter</artifactId>
   <version>${mybatis-spring.version}</version>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-configuration-processor</artifactId>
   <optional>true</optional>
  </dependency>

ps:maven的jar包仅因为项目使用需求,不是最精简,仅供大家参考.

项目结构:

config-存储不同环境配置文件

存储构建spring boot 相关实现类 其他如构建名

启动spring boot的时候我们会发现

其实开始整合前,对storm了解的较少,属于刚开始没有接触过,后面参考发现整合到spring boot里面启动spring boot之后并没有相应的方式去触发提交Topolgy的函数,所以也造成了以为启动spring boot之后就完事了结果等了半个小时什么事情都没发生才发现没有实现触发提交函数.

为了解决这个问题我的想法是: 启动spring boot->创建kafka监听Topic然后启动Topolgy完成启动,可是这样的问题kafka监听这个主题会重复触发Topolgy,这明显不是我们想要的.看了一会后发现spring 有相关启动完成之后执行某个时间方法,这个对我来说简直是救星啊.所以现在触发Topolgy的思路变为:

启动spring boot ->执行触发方法->完成相应的触发条件

构建方法为:

/**
 * @author Leezer
 * @date 2017/12/28
 * spring加载完后自动自动提交Topology
 **/
@Configuration
@Component
public class AutoLoad implements ApplicationListener<ContextRefreshedEvent> {

 private static String BROKERZKSTR;
 private static String TOPIC;
 private static String HOST;
 private static String PORT;
 public AutoLoad(@Value("${storm.brokerZkstr}") String brokerZkstr,
     @Value("${zookeeper.host}") String host,
     @Value("${zookeeper.port}") String port,
     @Value("${kafka.default-topic}") String topic
 ){
  BROKERZKSTR = brokerZkstr;
  HOST= host;
  TOPIC= topic;
  PORT= port;
 }

 @Override
 public void onApplicationEvent(ContextRefreshedEvent event) {
  try {
   //实例化topologyBuilder类。
   TopologyBuilder topologyBuilder = new TopologyBuilder();
   //设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。
   BrokerHosts brokerHosts = new ZkHosts(BROKERZKSTR);
   // 配置Kafka订阅的Topic,以及zookeeper中数据节点目录和名字
   SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, TOPIC, "/storm", "s32");
   spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
   spoutConfig.zkServers = Collections.singletonList(HOST);
   spoutConfig.zkPort = Integer.parseInt(PORT);
   //从Kafka最新输出日志读取
   spoutConfig.startOffsetTime = OffsetRequest.LatestTime();
   KafkaSpout receiver = new KafkaSpout(spoutConfig);
   topologyBuilder.setSpout("kafka-spout", receiver, 1).setNumTasks(2);
   topologyBuilder.setBolt("alarm-bolt", new AlarmBolt(), 1).setNumTasks(2).shuffleGrouping("kafka-spout");
   Config config = new Config();
   config.setDebug(false);
   /*设置该topology在storm集群中要抢占的资源slot数,一个slot对应这supervisor节点上的以个worker进程,如果你分配的spot数超过了你的物理节点所拥有的worker数目的话,有可能提交不成功,加入你的集群上面已经有了一些topology而现在还剩下2个worker资源,如果你在代码里分配4个给你的topology的话,那么这个topology可以提交但是提交以后你会发现并没有运行。 而当你kill掉一些topology后释放了一些slot后你的这个topology就会恢复正常运行。
   */
   config.setNumWorkers(1);
   LocalCluster cluster = new LocalCluster();
   cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

​ 注:

启动项目时因为使用的是内嵌tomcat进行启动,可能会报如下错误

[Tomcat-startStop-1] ERROR o.a.c.c.ContainerBase - A child container failed during start
java.util.concurrent.ExecutionException: org.apache.catalina.LifecycleException: Failed to start component [StandardEngine[Tomcat].StandardHost[localhost].TomcatEmbeddedContext[]]
 at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_144]
 at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_144]
 at org.apache.catalina.core.ContainerBase.startInternal(ContainerBase.java:939) [tomcat-embed-core-8.5.23.jar:8.5.23]
 at org.apache.catalina.core.StandardHost.startInternal(StandardHost.java:872) [tomcat-embed-core-8.5.23.jar:8.5.23]
 at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150) [tomcat-embed-core-8.5.23.jar:8.5.23]
 at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1419) [tomcat-embed-core-8.5.23.jar:8.5.23]
 at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1409) [tomcat-embed-core-8.5.23.jar:8.5.23]
 at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_144]
 at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_144]
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_144]
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_144]
 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]

这是因为有相应导入的jar包引入了servlet-api版本低于内嵌版本,我们需要做的就是打开maven依赖把其去除

<exclusion>
 <artifactId>servlet-api</artifactId>
 <groupId>javax.servlet</groupId>
</exclusion>

然后重新启动就可以了.

启动过程中还有可能报:

代码如下:

org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:90

这个问题我思考了很久,发现网上的解释都是因为storm配置问题导致不对,可是我的storm是部署在服务器上的.并没有相关的配置,按理也应该去服务器上读取相关配置,可是结果并不是这样的。最后尝试了几个做法发现都不对,这里才发现,在构建集群的时候storm提供了相应的本地集群

LocalCluster cluster = new LocalCluster();

进行本地测试,如果在本地测试就使用其进行部署测试,如果部署到服务器上需要把:

cluster.submitTopology("kafka-spout", config, topologyBuilder.createTopology());
//修正为:
StormSubmitter.submitTopology("kafka-spout", config, topologyBuilder.createTopology());

进行任务提交;

以上解决了上面所述的问题1-3

问题4:是在bolt中使用相关bean实例,我发现我把其使用@Component加入spring中也无法获取到实例:我的猜想是在我们构建提交Topolgy的时候,它会在:

代码如下:

topologyBuilder.setBolt("alarm-bolt",new AlarmBolt(),1).setNumTasks(2).shuffleGrouping("kafka-spout");

执行bolt相关:

@Override
 public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
  this.collector = collector;
  StormLauncher stormLauncher = StormLauncher.getStormLauncher();
  dataRepositorys =(AlarmDataRepositorys)   stormLauncher.getBean("alarmdataRepositorys");
 }

而不会实例化bolt,导致线程不一而spring 获取不到.(这里我也不是太明白,如果有大佬知道可以分享一波)

而我们使用spring boot的意义就在于这些获取这些繁杂的对象,这个问题困扰了我很久.最终想到,我们可以通过上下文getbean获取实例不知道能不能行,然后我就开始了定义:

例如我需要在bolt中使用一个服务:

/**
 * @author Leezer
 * @date 2017/12/27
 * 存储操作失败时间
 **/
@Service("alarmdataRepositorys")
public class AlarmDataRepositorys extends RedisBase implements IAlarmDataRepositorys {
 private static final String ERRO = "erro";
 /**
  * @param type 类型
  * @param key key值
  * @return 错误次数
  **/
 @Override
 public String getErrNumFromRedis(String type,String key) {
  if(type==null || key == null){
   return null;
  }else {
   ValueOperations<String, String> valueOper = primaryStringRedisTemplate.opsForValue();
   return valueOper.get(String.format("%s:%s:%s",ERRO,type,key));
  }
 }

 /**
  * @param type 错误类型
  * @param key key值
  * @param value 存储值
  **/
 @Override
 public void setErrNumToRedis(String type, String key,String value) {
  try {
   ValueOperations<String, String> valueOper = primaryStringRedisTemplate.opsForValue();
   valueOper.set(String.format("%s:%s:%s", ERRO,type, key), value, Dictionaries.ApiKeyDayOfLifeCycle, TimeUnit.SECONDS);
  }catch (Exception e){
   logger.info(Dictionaries.REDIS_ERROR_PREFIX+String.format("key为%s存入redis失败",key));
  }
 }

这里我指定了该bean的名称,则在bolt执行prepare时:使用getbean方法获取了相关bean就能完成相应的操作.

然后kafka订阅主题发送至我bolt进行相关的处理.而这里getbean的方法是在启动bootmain函数定义:

@SpringBootApplication
@EnableTransactionManagement
@ComponentScan({"service","storm"})
@EnableMongoRepositories(basePackages = {"storm"})
@PropertySource(value = {"classpath:service.properties", "classpath:application.properties","classpath:storm.properties"})
@ImportResource(locations = {
 "classpath:/configs/spring-hadoop.xml",
 "classpath:/configs/spring-hbase.xml"})
public class StormLauncher extends SpringBootServletInitializer {
 //设置 安全线程launcher实例
 private volatile static StormLauncher stormLauncher;
 //设置上下文
 private ApplicationContext context;
 public static void main(String[] args) {
  SpringApplicationBuilder application = new SpringApplicationBuilder(StormLauncher.class);
  // application.web(false).run(args);该方式是spring boot不以web形式启动
 application.run(args);
 StormLauncher s = new StormLauncher();
 s.setApplicationContext(application.context());
 setStormLauncher(s);
 }

 private static void setStormLauncher(StormLauncher stormLauncher) {
 StormLauncher.stormLauncher = stormLauncher;
 }
 public static StormLauncher getStormLauncher() {
 return stormLauncher;
 }

 @Override
 protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
 return application.sources(StormLauncher.class);
 }

 /**
 * 获取上下文
 *
 * @return the application context
 */
 public ApplicationContext getApplicationContext() {
 return context;
 }

 /**
 * 设置上下文.
 *
 * @param appContext 上下文
 */
 private void setApplicationContext(ApplicationContext appContext) {
 this.context = appContext;
 }

 /**
 * 通过自定义name获取 实例 Bean.
 *
 * @param name the name
 * @return the bean
 */
 public Object getBean(String name) {
 return context.getBean(name);
 }

 /**
 * 通过class获取Bean.
 *
 * @param <T> the type parameter
 * @param clazz the clazz
 * @return the bean
 */
 public <T> T getBean(Class<T> clazz) {
 return context.getBean(clazz);
 }

 /**
 * 通过name,以及Clazz返回指定的Bean
 *
 * @param <T> the type parameter
 * @param name the name
 * @param clazz the clazz
 * @return the bean
 */
 public <T> T getBean(String name, Class<T> clazz) {
 return context.getBean(name, clazz);
 }

到此集成storm 和kafka至spring boot已经结束了,相关kafka及其他配置我会放入github上面

对了这里还有一个kafkaclient的坑:

Async loop died! java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.

项目会报kafka client 问题,这是因为storm-kafka中,kafka使用的是0.8版本,而NetworkSend是0.9以上的版本,这里集成需要与你集成的kafka相关版本一致.

虽然集成比较简单,但是参考都比较少,加之刚开始接触storm所以思考比较多,也在这记录一下.

项目地址 - github

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • spring boot与kafka集成的简单实例

    本文介绍了spring boot与kafka集成的简单实例,分享给大家,具体如下: 引入相关依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.

  • spring boot整合spring-kafka实现发送接收消息实例代码

    前言 由于我们的新项目使用的是spring-boot,而又要同步新项目中建的数据到老的系统当中.原来已经有一部分的同步代码,使用的是kafka. 其实只是做数据的同步,我觉得选MQ没必要使用kafka.首先数据量不大,其实搞kafka又要搞集群,ZK.只是用做一些简单数据同步的话,有点大材小用. 没办法,咱只是个打工的,领导让搞就搞吧.刚开始的时候发现有一个spring-integration-kafka,描述中说是基于spring-kafka做了一次重写.但是我看了官方文档.实在是搞的有点头大

  • springboot 1.5.2 集成kafka的简单例子

    本文介绍了springboot 1.5.2 集成kafka的简单例子 ,分享给大家,具体如下: 随着spring boot 1.5版本的发布,在spring项目中与kafka集成更为简便. 添加依赖 compile("org.springframework.kafka:spring-kafka:1.1.2.RELEASE") 添加application.properties #kafka # 指定kafka 代理地址,可以多个 spring.kafka.bootstrap-server

  • 在springboot中对kafka进行读写的示例代码

    springboot对kafka的client很好的实现了集成,使用非常方便,本文也实现了一个在springboot中实现操作kafka的demo. 1.POM配置 只需要在dependencies中增加 spring-kafka的配置即可.完整效果如下: <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifa

  • Spring boot集成Kafka+Storm的示例代码

    前言 由于业务需求需要把Strom与kafka整合到spring boot项目里,实现其他服务输出日志至kafka订阅话题,storm实时处理该话题完成数据监控及其他数据统计,但是网上教程较少,今天想写的就是如何整合storm+kafka 到spring boot,顺带说一说我遇到的坑. 使用工具及环境配置 ​ 1. java 版本jdk-1.8 ​ 2. 编译工具使用IDEA-2017 ​ 3. maven作为项目管理 ​ 4.spring boot-1.5.8.RELEASE 需求体现 1.

  • Spring Boot集成Kafka的示例代码

    本文介绍了Spring Boot集成Kafka的示例代码,分享给大家,也给自己留个笔记 系统环境 使用远程服务器上搭建的kafka服务 Ubuntu 16.04 LTS kafka_2.12-0.11.0.0.tgz zookeeper-3.5.2-alpha.tar.gz 集成过程 1.创建spring boot工程,添加相关依赖: <?xml version="1.0" encoding="UTF-8"?> <project xmlns=&qu

  • Spring Boot 集成 Kafkad的实现示例

    Spring Boot 作为主流微服务框架,拥有成熟的社区生态.市场应用广泛,为了方便大家,整理了一个基于spring boot的常用中间件快速集成入门系列手册,涉及RPC.缓存.消息队列.分库分表.注册中心.分布式配置等常用开源组件,大概有几十篇文章,陆续会开放出来,感兴趣同学请提前关注&收藏 消息通信有两种基本模型,即发布-订阅(Pub-Sub)模型和点对点(Point to Point)模型,发布-订阅支持生产者消费者之间的一对多关系,而点对点模型中有且仅有一个消费者. 前言 Kafka是

  • 深入研究spring boot集成kafka之spring-kafka底层原理

    目录 前言 简单集成 引入依赖 添加配置 测试发送和接收 Spring-kafka-test嵌入式KafkaServer 引入依赖 启动服务 创建新的Topic 程序启动时创建TOPIC 代码逻辑中创建 PS:其他的方式创建TOPIC 引入依赖 api方式创建 命令方式创建 消息发送之KafkaTemplate探秘 获取发送结果 异步获取 同步获取 KAFKA事务消息 REPLYINGKAFKATEMPLATE获得消息回复 Spring-kafka消息消费用法探秘 @KAFKALISTENER的

  • Spring Boot中使用RabbitMQ的示例代码

    很久没有写Spring Boot的内容了,正好最近在写Spring Cloud Bus的内容,因为内容会有一些相关性,所以先补一篇关于AMQP的整合. Message Broker与AMQP简介 Message Broker是一种消息验证.传输.路由的架构模式,其设计目标主要应用于下面这些场景: 消息路由到一个或多个目的地 消息转化为其他的表现方式 执行消息的聚集.消息的分解,并将结果发送到他们的目的地,然后重新组合相应返回给消息用户 调用Web服务来检索数据 响应事件或错误 使用发布-订阅模式

  • Spring boot 集成Dubbox的方法示例

    前言 因为工作原因,需要在项目中集成dubbo,所以去查询dubbo相关文档,发现dubbo目前已经不更新了,所以把目光投向了dubbox,dubbox是当当网基于dubbo二次开发的一个项目,dubbox,因为公司项目中一个是基于spring mvc 3.0的,一个是基于spring boot的,而spring boot相对来说文档少一点,所以此文记录下spring boot下如何继承dubbox 一.安装zookeeper 1.zookeeper简介 ZooKeeper是一个分布式的,开放源

  • Spring boot集成Kafka消息中间件代码实例

    一.创建Spring boot项目,添加如下依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <ar

  • Spring Boot集成Java DSL的实现代码

    Spring Integration Java DSL已经融合到Spring Integration Core 5.0,这是一个聪明而明显的举动,因为: 基于Java Config启动新Spring项目的每个人都使用它 SI Java DSL使您可以使用Lambdas等新的强大Java 8功能 您可以使用 基于IntegrationFlowBuilder的Builder模式构建流 让我们看看基于ActiveMQ JMS的示例如何使用它. Maven依赖: <dependencies> <

  • Spring Boot实现文件上传示例代码

    使用SpringBoot进行文件上传的方法和SpringMVC差不多,本文单独新建一个最简单的DEMO来说明一下. 主要步骤包括: 1.创建一个springboot项目工程,本例名称(demo-uploadfile). 2.配置 pom.xml 依赖. 3.创建和编写文件上传的 Controller(包含单文件上传和多文件上传). 4.创建和编写文件上传的 HTML 测试页面. 5.文件上传相关限制的配置(可选). 6.运行测试. 项目工程截图如下: 文件代码: <dependencies>

  • spring boot整合mybatis+mybatis-plus的示例代码

    Spring boot对于我来说是一个刚接触的新东西,学习过程中,发现这东西还是很容易上手的,Spring boot没配置时会默认使用Spring data jpa,这东西可以说一个极简洁的工具,可是我还是比较喜欢用mybatis,工具是没有最好的,只有这合适自己的. 说到mybatis,最近有一个很好用的工具--------mybatis-Plus(官网),现在更新的版本是2.1.2,这里使用的也是这个版本.我比较喜欢的功能是代码生成器,条件构造器,这样就可以更容易的去开发了. mybatis

随机推荐