流式图表拒绝增删改查之kafka核心消费逻辑下篇
目录
- 前篇回顾
- kafka消费者线程
- 任务提交
前篇回顾
- 流式图表框架搭建
- kafka核心消费逻辑线程池搭建
kafka消费者线程
突击检查八股文,实现线程的方法有哪些?嗯?没复习是吧,行没关系,那感谢参加本次面试哈。
常用的几种方式分别是:
- 继承Thread类,重写run方法
- 实现Runbale接口,重写run方法
- 实现Callable接口,重写call方法
这里我们直接创捷出一个任务类实现Runable方法,重写run方法,一个线程当作一个kafka client,所以要在任务类中声明一个KafkaConsumer的成员变量,另外创建任务需要指定当前任务的名称也就是线程名,还有要监听的topic主题。
private KafkaConsumer<String, String> consumer; private String topic; private String threadName;
name和topic通过构造方法传进来,同时在构造方法里完成对client的初始化操作。
/** * 封装必要信息 * @param bootServer 生产者ip * @param groupId 分组信息 * @param topic 订阅主题 */ public KafkaConsumerRunnable(String bootServer, String groupId, String topic) { this.topic = topic; Properties props = new Properties(); props.put("bootstrap.servers", bootServer); props.put("group.id", groupId); props.put("enable.auto.commit", "false"); props.put("auto.offset.reset", "latest"); props.put("max.poll.records", 5); props.put("session.timeout.ms", "60000"); props.put("max.poll.interval.ms", 300000); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //键反序列化方式 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); this.consumer = new KafkaConsumer<>(props); }
这里封装kafka client的必要信息,入参bootServer为kafka集群ip,groupId为threadName,我们规定一个线程为一个kafka消费链接,消费一个topic。
上一篇线程池保证了任务不会轻易挂掉,就算挂掉了也会重新提交,所以为了节省资源不做所谓的同groupId的负载操作。session.timeout.ms和max.poll.interval.ms可以根据当前的kafka资源灵活配置,不然可能会引发一些reblance。
enable.auto.commit设置为false,手动提交offset,auto.offset.reset这块由于业务特殊,本来就是流式图表瞬时的展示,如果真的出现了数据丢失那就丢了吧,从最新的数据读取。
接下来只需要处理下消费逻辑,consumer.subscribe(Collections.singletonList(this.topic))开始订阅监听kafka数据,搞一个while true不断的消费数据,try catch只需要对WakeupException做处理,kafka客户端会在关闭的时候抛出WakeupException异常。
finally里提交offset,无论这条offset对应的数据消费成功还是失败都是消费过了,失败了就过去了。
@Override public void run() { consumer.subscribe(Collections.singletonList(this.topic)); String key = "stream_chart:" + this.name; Thread.currentThread().setName(key); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); // 如果队列中没有消息 等待KAFKA_TIME_OUT后调用poll,如果有消息立即消费 for (ConsumerRecord<String, String> record : records) { String value = record.value(); log.info("线程 {} 消费kafka数据 -> {} \n", Thread.currentThread().getName(), value); RedisConfig.getRedisTemplate().opsForZSet().add(key, value, Instant.now().getEpochSecond() * 1000); } } } catch (WakeupException e) { log.info("ignore for shutdown", e); } finally { consumer.commitAsync(); } }
我们消费到数据直接放到redis的zset结构里,当前的时间戳作为score,最后留一个关闭客户端的后门
// 退出后关掉客户端 public void shutDown() { consumer.wakeup(); }
任务提交
任务提交这块只需要在业务service中注入线程池,创建对应的KafkaRunable任务封装对应的信息,执行execute即可。
这里有个坑需要注意下,第二次突击检查八股文,线程池提交方法submit与execute的区别说一下。不知道的立刻去熟读并背诵。
public class TestTheadPool { public static void main(String[] args) { ExecutorService executorService= Executors.newFixedThreadPool(1); executorService.submit(new task("submit")); executorService.execute(new task("execute")); } } class task implements Runnable{ private String name; public task(String name) { this.name = name; } @Override public void run() { System.out.println(this.name + " start task"); int i=1/0; } }
熟悉的同学通过示例代码可以看出来,submit提交的线程不会抛出异常代码,只有获取Future返回值并执行get方法才会捕获到异常。这块涉及到异步的东西不再赘述
try { Future<?> submit = executorService.submit(new task("submit")); submit.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
所以我们要使用execute执行,不然kafka消费线程里消费失败了拦截不到就不会被重新提交,导致线程挂掉。
以上就是流式图表拒绝增删改查之kafka核心消费逻辑下篇的详细内容,更多关于kafka消费流式图表的资料请关注我们其它相关文章!