Java Flink与kafka实现实时告警功能过程

目录
  • 引出问题
  • demo设计
  • 环境搭建
  • flink程序代码
  • 项目演示
  • 告警系统架构

引出问题

项目使用告警系统的逻辑是将实时数据保存到本地数据库再使用定时任务做判断,然后产生告警数据。这种方式存在告警的延时实在是太高了。数据从产生到保存,从保存到判断都会存在时间差,按照保存数据定时5分钟一次,定时任务5分钟一次。最高会产生10分钟的误差,这种告警就没什么意义了。

demo设计

为了简单的还原业务场景,做了简单的demo假设

实现一个对于学生成绩评价的实时处理程序

数学成绩,基准范围是90-140,超出告警

物理成绩,基准范围是60-95,超出告警

环境搭建

使用windows环境演示

准备工作

1、安装jdk

2、安装zookeeper

解压压缩包

zoo_sample.cfg将它重命名为zoo.cfg

修改配置 dataDir=D://tools//apache-zookeeper-3.5.10-bin//data

配置环境变量

3、安装kafka

解压压缩包

修改config/server.properties

log.dirs=D://tools//kafka_2.11-2.1.0//log

flink程序代码

pom

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.12</version>
    <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.62</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

主程序

public class StreamAlertDemo {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);
		Properties properties = new Properties();
		properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
		DataStreamSource<String> inputDataStream = env.addSource(kafkaConsumer);
		DataStream<String> resultStream = inputDataStream.flatMap(new AlertFlatMapper());
		resultStream.print().setParallelism(4);
		resultStream.addSink(new FlinkKafkaProducer<>("demo",new SimpleStringSchema(),properties));
		env.execute();
	}
}
主程序,配置告警规则后期可以使用推送或者拉去方式获取数据
public class RuleMap {
	private RuleMap(){}
	public final static Map<String,List<AlertRule>> initialRuleMap;
	private static List<AlertRule> ruleList = new ArrayList<>();
	private static List<String> ruleStringList = new ArrayList<>(Arrays.asList(
			"{\"target\":\"MathVal\",\"type\":\"0\",\"criticalVal\":90,\"descInfo\":\"You Math score is too low\"}",
			"{\"target\":\"MathVal\",\"type\":\"2\",\"criticalVal\":140,\"descInfo\":\"You Math score is too high\"}",
			"{\"target\":\"PhysicsVal\",\"type\":\"0\",\"criticalVal\":60,\"descInfo\":\"You Physics score is too low\"}",
			"{\"target\":\"PhysicsVal\",\"type\":\"2\",\"criticalVal\":95,\"descInfo\":\"You Physics score is too high\"}"));
	static {
		for (String i : ruleStringList) {
			ruleList.add(JSON.parseObject(i, AlertRule.class));
		}
		initialRuleMap = ruleList.stream().collect(Collectors.groupingBy(AlertRule::getTarget));
	}
}

AlertFlatMapper,处理告警逻辑

public class AlertFlatMapper implements FlatMapFunction<String, String> {
	@Override
	public void flatMap(String inVal, Collector<String> out) throws Exception {
		Achievement user = JSON.parseObject(inVal, Achievement.class);
		Map<String, List<AlertRule>> initialRuleMap = RuleMap.initialRuleMap;
		List<AlertInfo> resList = new ArrayList<>();
		List<AlertRule> mathRule = initialRuleMap.get("MathVal");
		for (AlertRule rule : mathRule) {
			if (checkVal(user.getMathVal(), rule.getCriticalVal(), rule.getType())) {
				resList.add(new AlertInfo(user.getName(), rule.getDescInfo()));
			}
		}
		List<AlertRule> physicsRule = initialRuleMap.get("PhysicsVal");
		for (AlertRule rule : physicsRule) {
			if (checkVal(user.getPhysicsVal(), rule.getCriticalVal(), rule.getType())) {
				resList.add(new AlertInfo(user.getName(), rule.getDescInfo()));
			}
		}
		String result = JSON.toJSONString(resList);
		out.collect(result);
	}
	private static boolean checkVal(Integer actVal, Integer targetVal, Integer type) {
		switch (type) {
			case 0:
				return actVal < targetVal;
			case 1:
				return actVal.equals(targetVal);
			case 2:
				return actVal > targetVal;
			default:
				return false;
		}
	}
}

三个实体类

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class Achievement implements Serializable {
    private static final long serialVersionUID = -1L;
    private String name;
    private Integer mathVal;
    private Integer physicsVal;
}
@Data
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class AlertInfo implements Serializable {
    private static final long serialVersionUID = -1L;
    private String name;
    private String descInfo;
}
@Data
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class AlertRule implements Serializable {
	private static final long serialVersionUID = -1L;
	private String target;
	//0小于 1等于 2大于
	private Integer type;
	private Integer criticalVal;
	private String descInfo;
}

项目演示

创建kafka生产者 test

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

创建kafka消费者 demo

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo --from-beginning

启动flink应用

给topic test发送消息

{"name":"liu","MathVal":45,"PhysicsVal":76}

消费topic demo

告警系统架构

到此这篇关于Java Flink与kafka实现实时告警功能过程的文章就介绍到这了,更多相关Java Flink与kafka实时告警内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • ajax实现简单实时验证功能

    什么是ajax Ajax 即"Asynchronous Javascript And XML"(异步 JavaScript 和 XML),是指一种创建交互式网页应用的网页开发技术. Ajax = 异步 JavaScript 和 XML(标准通用标记语言的子集). Ajax 是一种用于创建快速动态网页的技术. Ajax 是一种在无需重新加载整个网页的情况下,能够更新部分网页的技术. 通过在后台与服务器进行少量数据交换,Ajax 可以使网页实现异步更新.这意味着可以在不重新加载整个网页的情

  • Spring Boot 2 整合 QuartJob 实现定时器实时管理功能

    一.QuartJob简介 1.一句话描述 Quartz是一个完全由java编写的开源作业调度框架,形式简易,功能强大. 2.核心API (1).Scheduler 代表一个 Quartz 的独立运行容器,Scheduler 将 Trigger 绑定到特定 JobDetail, 这样当 Trigger 触发时, 对应的 Job 就会被调度. (2).Trigger 描述 Job 执行的时间触发规则.主要有 SimpleTrigger 和 CronTrigger 两个子类,通过一个 TriggerK

  • Java 用Prometheus搭建实时监控系统过程详解

    上帝之火 本系列讲述的是开源实时监控告警解决方案Prometheus,这个单词很牛逼.每次我都能联想到带来上帝之火的希腊之神,普罗米修斯.而这个开源的logo也是火,个人挺喜欢这个logo的设计. 本系列着重介绍Prometheus以及如何用它和其周边的生态来搭建一套属于自己的实时监控告警平台. 本系列受众对象为初次接触Prometheus的用户,大神勿喷,偏重于操作和实战,但是重要的概念也会精炼出提及下.系列主要分为以下几块 Prometheus各个概念介绍和搭建,如何抓取数据(本次分享内容)

  • VUE + OPENLAYERS实现实时定位功能

    前言 本系列文章介绍一个简单的实时定位示例,示例的组成主要包括: 服务后端,使用 Java 语言编写,模拟生成 GeoJSON 数据. 前端展示,使用 Vue + OpenLayers ,负责定时向后端服务请求 GeoJSON 数据,并在以标签的形式展现定位数据. 实现的效果: 一.定义标签样式 var image = new CircleStyle({ radius: 5, fill: new Fill({ color: "rgba(255, 0, 0, 1)" }), stroke

  • 详解Java如何实现百万数据excel导出功能

    目录 前言 1.异步处理 1.1 使用job 1.2 使用mq 2.使用easyexcel 3.分页查询 4.多个sheet 5.计算limit的起始位置 6.文件上传到OSS 7.通过WebSocket推送通知 8.总条数可配置 9.order by商品编号 总结 前言 最近我做过一个MySQL百万级别数据的excel导出功能,已经正常上线使用了. 这个功能挺有意思的,里面需要注意的细节还真不少,现在拿出来跟大家分享一下,希望对你会有所帮助. 原始需求:用户在UI界面上点击全部导出按钮,就能导

  • AngularJs实现聊天列表实时刷新功能

    昨天在做app的聊天列表时,遇到了一个问题,消息监听器监听到了一个新的消息,但是如果这时已经处于消息列表的页面那么消息列表并不会及时更新. 我的想法是在service层中的监听器方法里,当监听到了一个新的消息,那么就在根作用域中发出一个广播,告诉controller层需要去获取最新的消息列表了. service层中发出广播: controller层中接听广播: rootScope是所有scope的父级 它的广播(broadcast)和监听(on) 可以在无交集的controller间使用 sco

  • Java使用Ajax实现跨域上传图片功能

    说明 : 图片服务器是用Nginx搭建的,用的是PHP语言 这个功能 需要 用到两个js文件: jquery.js和jQuery.form.js <script type="text/JavaScript" src="js/jquery.js"></script> <script type="text/javascript" src="js/jquery.form.js"></scri

  • java IO实现电脑搜索、删除功能的实例

    一.递归方法 1.递归就是自己调用本身的方法,前提是有方法. 2.递归使用 找出递归的规律 递归要有出口条件,也就是结束条件 3.注意事项 递归次数不能太多,否则会出现堆栈溢出现象 递归不能嵌套使用,否则出现死递归 二.IO介绍 1. i为Input输入,O为Output输出,API单独把输入和输出流封装成一个类,提供大量的方法供我们使用. 2. IO技术可以把数据写入到持久化设备(包括硬盘.光盘.U盘等),集合.数组虽然可以存储读取,但是仅仅暂存在内存中,当重新启动程序就不存在了. 3. 相对

  • Java 发送http请求上传文件功能实例

    废话不多说了,直接给大家贴代码了,具体代码如下所示: package wxapi.WxHelper; import java.io.BufferedReader; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputSt

  • Java基于正则实现的日期校验功能示例

    本文实例讲述了Java基于正则实现的日期校验功能.分享给大家供大家参考,具体如下: private void checkDate() throws IOException { // 4种分隔符 String sep = "[-\\./_]"; // 年份 String strPattern = "^(19[4-9]\\d|20\\d{2})" + sep; strPattern += "("; // 月(1,3,5,7,8,10,12) strP

随机推荐