基于FLink实现实时安全检测的示例代码

目录
  • 研发背景
  • 场景描述
  • 组件版本
  • 日志结构
  • 技术方案
  • 关键代码
    • 主入口类
    • mapper算子
    • filter算子
    • keyBy算子
    • 窗口函数(核心代码)
    • 最后一次map算子
    • ElasticSearch工具类
    • 事件实体类
    • 消息实体类

研发背景

公司安全部目前针对内部系统的网络访问日志的安全审计,大部分都是T+1时效,每日当天,启动Python编写的定时任务,完成昨日的日志审计和检测,定时任务运行完成后,统一进行企业微信告警推送。这种方案在目前的网络环境和人员规模下,呈现两个痛点,一是面对日益频繁的网络攻击、钓鱼链接,T+1的定时任务,难以及时进行告警,因此也难以有效避免如关键信息泄露等问题,二是目前以Python为主的单机定时任务,针对不同场景的处理时效,从一小时到十几小时不等,效率低下。为解决以上问题,本人协助公司安全部同时对告警采集平台进行改造,由之前的python单机任务处理,切换到基于Flink集群的并行处理,且告警推送时效,由之前的T+1天,提升到秒级实时告警。本次改造涉及网络日志审计的多个常见场景,如端口扫描、黑名单统计、异常流量、连续恶意登录等。本次以一段时间内连续登录失败20次后,下一次登录成功场景来进行介绍。

场景描述

针对一个内部系统,如邮件系统,公司员工的访问行为日志,存放于kafka,我们希望对于一个用户账号在同一个IP下,任意的3分钟时间内,连续登录邮件系统20次失败,下一次登录成功,这种场景能够及时获取并推送到企业微信某个指定的安全接口人。kafka中的数据,能够通过某个关键字,区分当前网络访问是否一次登录事件,且有访问时间(也就是事件时间)。在解析到符合需求的用户账号之后,第一时间进行企业微信告警推送,并将其这段时间内的访问行为,写入下游ElasticSearch。

组件版本

  • Flink-1.14.4
  • Java8
  • ElasticSearch-7.3.2
  • Kafka-2.12_2.8.1

日志结构

IP和账号皆为测试使用。

{
   "user": "wangxm",
   "client_ip": "110.68.6.182",
   "source": "login",
   "loginname": "wangxm@test.com",
   "IP": "110.8.148.58",
   "timestamp": "17:58:12",
   "@timestamp": "2022-04-20T09:58:13.647Z",
   "ip": "110.7.231.25",
   "clienttype": "POP3",
   "result": "success",
   "@version": "1"
 }

技术方案

上述场景,可考虑使用FlinkCEP及Flink的滑动窗口进行实现。由于本人在采用FlinkCEP的方案进行代码编写调试后,发现并不能满足,因此改用滑动窗口进行实现。

关键代码

主入口类

主入口类,创建了flink环境、设置了基础参数,创建了kafkaSource,接入消息后,进行了映射、过滤,并设置了水位线,进行了分组,之后设置了滑动窗口,在窗口内进行了事件统计,将复合条件的事件收集返回并写入ElasticSearch。

针对map、filter、keyBy、window等算子,都单独进行了编写,后面会一一列出来。

package com.data.dev.flink.mailTopic.main;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import com.data.dev.elasticsearch.ElasticSearchInfo;
import com.data.dev.elasticsearch.SinkToEs;
import com.data.dev.flink.FlinkEnv;
import com.data.dev.flink.mailTopic.OperationForLoginFailCheck.*;
import com.data.dev.kafka.KafkaSourceBuilder;
import com.data.dev.key.ConfigurationKey;
import com.data.dev.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.time.Duration;

/**
 * Flink处理在3分钟内连续登录失败20次后登录成功的场景
 * 采用滑动窗口来实现
 * @author wangxiaomin 2022-06-01
 */

@Slf4j
public class MailMsg extends BaseBean {

    /**
     * Flink作业名称
     */
    public static final  String JobName = "告警采集平台——连续登录失败后登录成功告警";
    /**
     * Kafka消息名
     */
    public static final  String KafkaSourceName = "Kafka Source for AlarmPlatform About Mail Topic";

    public MailMsg(){
        log.info("初始化滑动窗口场景告警程序");
    }

    /**
     * 执行逻辑统计场景,实现告警推送
     */
    public static void execute(){

        //① 创建Flink执行环境并设置checkpoint等必要的参数
        StreamExecutionEnvironment env = FlinkEnv.getFlinkEnv();
        KafkaSource<String> kafkaSource = KafkaSourceBuilder.getKafkaSource(ConfigurationKey.KAFKA_MAIL_TOPIC_NAME,ConfigurationKey.KAFKA_MAIL_CONSUMER_GROUP_ID) ;
        DataStreamSource<String> kafkaMailMsg = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(10)), KafkaSourceName);

        //② 筛选登录消息,创建初始登录事件流
        SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginMapDs = kafkaMailMsg.map(new MsgToBeanMapper()).name("Map算子加工");
        SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginFilterDs = loginMapDs.filter(new MailMsgForLoginFilter()).name("Filter算子加工");

        //③ 设置水位线
        WatermarkStrategy<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> watermarkStrategy = WatermarkStrategy.<com.data.dev.common.javabean.kafkaMailTopic.MailMsg>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                        .withTimestampAssigner((mailMsg, timestamp) -> TimeUtils.switchUTCToBeijingTimestamp(mailMsg.getTimestamp_datetime()));
        SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginWmDs = loginFilterDs.assignTimestampsAndWatermarks(watermarkStrategy.withIdleness(Duration.ofMinutes(3))).name("增加水位线");

        //④ 设置主键
        KeyedStream<com.data.dev.common.javabean.kafkaMailTopic.MailMsg, String> loginKeyedDs = loginWmDs.keyBy(new LoginKeySelector());

        //⑥ 转化为滑动窗口
        WindowedStream<com.data.dev.common.javabean.kafkaMailTopic.MailMsg, String, TimeWindow> loginWindowDs = loginKeyedDs.window(SlidingEventTimeWindows.of(Time.seconds(180L),Time.seconds(90L)));

        //⑦ 在窗口内进行逻辑统计
        SingleOutputStreamOperator<MailMsgAlarm> loginWindowsDealDs  = loginWindowDs.process(new WindowProcessFuncImpl()).name("窗口处理逻辑");

        //⑧ 将结果转化为通用DataStream<String>格式
        SingleOutputStreamOperator<String> resultDs  = loginWindowsDealDs.map(new AlarmMsgToStringMapper()).name("窗口结果转化为标准格式");

        //⑨ 将最终结果写入ES
        resultDs.addSink(SinkToEs.getEsSinkBuilder(ElasticSearchInfo.ES_LOGIN_FAIL_INDEX_NAME,ElasticSearchInfo.ES_INDEX_TYPE_DEFAULT).build());

        //⑩ 提交Flink集群进行执行
        FlinkEnv.envExec(env,JobName);

    }
}

mapper算子

package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.alibaba.fastjson.JSON;
import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;

/**
 *  逻辑统计场景告警推送ES消息体
 *  @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public   class AlarmMsgToStringMapper extends BaseBean implements MapFunction<MailMsgAlarm, String> {

    @Override
    public String map(MailMsgAlarm mailMsgAlarm) throws Exception {
        return JSON.toJSONString(mailMsgAlarm);
    }
}

filter算子

package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FilterFunction;

/**
 * ② 消费mail主题的消息,过滤其中login的事件
 * @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public class MailMsgForLoginFilter extends BaseBean implements FilterFunction<MailMsg> {
    @Override
    public boolean filter(MailMsg mailMsg) {
        if("login".equals(mailMsg.getSource())) {
            log.info("筛选原始的login事件:【" + mailMsg + "】");
        }
        return "login".equals(mailMsg.getSource());
    }
}

keyBy算子

package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.functions.KeySelector;

/**
 * CEP 编程,需要进行key选取
 */
@Slf4j
public class LoginKeySelector extends BaseBean implements KeySelector<MailMsg, String> {
    @Override
    public String getKey(MailMsg mailMsg) {
        return mailMsg.getUser() + "@" + mailMsg.getClient_ip();
    }
}

窗口函数(核心代码)

这里我们主要考虑使用一个事件列表,用来存储每一个窗口期内得到的连续登录,当检测到登陆失败的事件,即存入事件列表中,之后判断下一次登录失败事件,如果检测到登录成功事件,但此时登录失败的次数不足20次,则清空loginEventList,等待下一次检测。一旦符合窗口内连续登录失败超过20次且下一次登录成功这个事件,则清空此时的loginEventList并将当前登录成功的事件进行告警推送。

package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import com.data.dev.utils.HttpUtils;
import com.data.dev.utils.IPUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 *  滑动窗口内复杂事件解析逻辑实现
 *  @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public   class WindowProcessFuncImpl extends  ProcessWindowFunction<MailMsg, MailMsgAlarm, String, TimeWindow> implements Serializable {
    @Override
    public void process(String key, ProcessWindowFunction<MailMsg, MailMsgAlarm, String, TimeWindow>.Context context, Iterable<MailMsg> iterable, Collector<MailMsgAlarm> collector) {

        List<MailMsg> loginEventList = new ArrayList<>();
        MailMsgAlarm mailMsgAlarm;
        for (MailMsg mailMsg : iterable) {
            log.info("收集到的登录事件【" + mailMsg + "】");

            if (mailMsg.getResult().equals("fail")) { //开始检测当前窗口内的事件,并将失败的事件收集到loginEventList
                log.info("开始检测当前窗口内的事件,并将失败的事件收集到loginEventList");
                loginEventList.add(mailMsg);
            } else if (mailMsg.getResult().equals("success") && loginEventList.size() < 20) {//如果检测到登录成功事件,但此时登录失败的次数不足20次,则清空loginEventList,等待下一次检测
                log.info("检测到登录成功事件,但此时登录失败的次数为【" + loginEventList.size() + "】不足20次,清空loginEventList,等待下一次检测");
                loginEventList.clear();
            } else if (mailMsg.getResult().equals("success") && loginEventList.size() >= 20) {
                mailMsgAlarm = getMailMsgAlarm(loginEventList,mailMsg);
                log.info("检测到登录成功的事件,此时窗口内连续登录失败的次数为【" + mailMsgAlarm.getFailTimes() + "】");

                //一旦符合窗口内连续登录失败超过20次且下一次登录成功这个事件,则清空此时的loginEventList并将当前登录成功的事件进行告警推送;
                loginEventList.clear();
                doAlarmPush(mailMsgAlarm);

                collector.collect(mailMsgAlarm);//将当前登录成功的事件进行收集上报
            } else {
                log.info(mailMsg.getUser() + "当前已连续:【" + loginEventList.size() + "】 次登录失败");
            }
        }
    }

    /**
     * 2022年6月17日15:03:06
     * @param eventList:当前窗口内的事件列表
     * @param eventCurrent:当前登录成功的事件
     * @return mailMsgAlarm:告警消息体
     */
    public static MailMsgAlarm getMailMsgAlarm(List<MailMsg> eventList,MailMsg eventCurrent){

        String alarmKey = eventCurrent.getUser() + "@" + eventCurrent.getClient_ip();
        String loginFailStartTime = eventList.get(0).getTimestamp_datetime();
        String loginSuccessTime = eventCurrent.getTimestamp_datetime();
        int loginFailTimes = eventList.size();

        MailMsgAlarm mailMsgAlarm = new MailMsgAlarm();
        mailMsgAlarm.setMailMsg(eventCurrent);
        mailMsgAlarm.setAlarmKey(alarmKey);
        mailMsgAlarm.setStartTime(loginFailStartTime);
        mailMsgAlarm.setEndTime(loginSuccessTime);
        mailMsgAlarm.setFailTimes(loginFailTimes);

        return mailMsgAlarm;
    }

    /**
     * 2022年6月17日14:47:53
     * @param mailMsgAlarm :当前构建的需要告警的事件
     */
    public void doAlarmPush(MailMsgAlarm mailMsgAlarm){
        String userKey = mailMsgAlarm.getAlarmKey();
        String clientIp = mailMsgAlarm.mailMsg.getClient_ip();
        boolean isWhiteListIp = IPUtils.isWhiteListIp(clientIp);
        if(isWhiteListIp){//如果是白名单IP,不告警
            log.info("当前登录用户【" + userKey + "】属于白名单IP");
        }else {
            //IP归属查询结果、企业微信推送告警
            String user = HttpUtils.getUserByClientIp(clientIp);
            HttpUtils.pushAlarmMsgToWechatWork(user,mailMsgAlarm.toString());
        }
    }
}

最后一次map算子

package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.alibaba.fastjson.JSON;
import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;

/**
 *  逻辑统计场景告警推送ES消息体
 *  @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public   class AlarmMsgToStringMapper extends BaseBean implements MapFunction<MailMsgAlarm, String> {

    @Override
    public String map(MailMsgAlarm mailMsgAlarm) throws Exception {
        return JSON.toJSONString(mailMsgAlarm);
    }
}

ElasticSearch工具类

package com.data.dev.elasticsearch;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.key.ConfigurationKey;
import com.data.dev.key.ElasticSearchKey;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 2022年6月17日15:15:06
 * @author wangxiaoming-ghq
 * Flink流计算结果写入ES公共方法
 */
@Slf4j
public class SinkToEs extends BaseBean {
    public static final long serialVersionUID = 2L;
    private static final HashMap<String,String> ES_PROPS_MAP = ConfigurationKey.getApplicationProps();
    private static final String HOST = ES_PROPS_MAP.get(ConfigurationKey.ES_HOST);
    private static final String PASSWORD = ES_PROPS_MAP.get(ConfigurationKey.ES_PASSWORD);
    private static final String USERNAME = ES_PROPS_MAP.get(ConfigurationKey.ES_USERNAME);
    private static final String PORT = ES_PROPS_MAP.get(ConfigurationKey.ES_PORT);

    /**
     * 2022年6月17日15:17:55
     * 获取ES连接信息
     * @return esInfoMap:ES连接信息持久化
     */
    public static HashMap<String,String > getElasticSearchInfo(){
        log.info("获取ES连接信息:【 " + "HOST="+HOST + "PORT="+PORT+"USERNAME="+USERNAME+"PASSWORD=********" + " 】");
        HashMap<String,String> esInfoMap = new HashMap<>();
        esInfoMap.put(ElasticSearchKey.HOST,HOST);
        esInfoMap.put(ElasticSearchKey.PASSWORD,PASSWORD);
        esInfoMap.put(ElasticSearchKey.USERNAME,USERNAME);
        esInfoMap.put(ElasticSearchKey.PORT,PORT);

        return esInfoMap;
    }

    /**
     * @param esIndexName:写入索引名称
     * @param esType:写入索引类型
     * @return ElasticsearchSink.Builder<String>:构建器
     */
    public static ElasticsearchSink.Builder<String> getEsSinkBuilder(String esIndexName,String esType){
        HashMap<String, String> esInfoMap = getElasticSearchInfo();
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost(String.valueOf(esInfoMap.get(ElasticSearchKey.HOST)), Integer.parseInt(esInfoMap.get(ElasticSearchKey.PORT)), "http"));

        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<String>() {

                    public IndexRequest createIndexRequest() {
                        Map<String, String> json = new HashMap<>();
                        //log.info("写入ES的data:【"+json+"】");
                        IndexRequest index  = Requests.indexRequest()
                                .index(esIndexName)
                                .type(esType)
                                .source(json);
                        return index;
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest());
                    }
                }
        );

        //定义es的连接配置  带用户名密码
        RestClientFactory restClientFactory = restClientBuilder -> {
            CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(
                    AuthScope.ANY,
                    new UsernamePasswordCredentials(
                            String.valueOf(esInfoMap.get(ElasticSearchKey.USERNAME)),
                            String.valueOf(esInfoMap.get(ElasticSearchKey.PASSWORD))
                    )
            );
            restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
                httpAsyncClientBuilder.disableAuthCaching();
                return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            });
        };

        esSinkBuilder.setRestClientFactory(restClientFactory);
        return esSinkBuilder;
    }

}

事件实体类

package com.data.dev.common.javabean.kafkaMailTopic;

import com.data.dev.common.javabean.BaseBean;
import lombok.Data;

import java.util.Objects;

/**
 * @author wangxiaoming-ghq 2022-05-15
 * 逻辑统计场景告警事件
 */
@Data
public class MailMsgAlarm extends BaseBean {

    /**
     * 当前登录成功的事件
     */
   public  MailMsg mailMsg;

    /**
     * 当前捕获的告警主键:username@client_ip
     */
   public  String alarmKey;

    /**
     * 第一次登录失败的事件时间
     */
   public  String startTime;

    /**
     * 连续登录失败后下一次登录成功的事件时间
     */
   public  String endTime;

    /**
     * 连续登录失败的次数
     */
   public  int failTimes;

    @Override
    public String toString() {
        return "{" +
                "  'mailMsg_login_success':'" + mailMsg + "'" +
                ", 'alarmKey':'" + alarmKey + "'" +
                ", 'start_login_time_in3min':'"  +startTime + "'" +
                ", 'end_login_time_in3min':'"  +endTime + "'" +
                ", 'login_fail_times':'"  +failTimes +  "'" +
                "}";
    }

    public MailMsgAlarm() {
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MailMsgAlarm)) return false;
        MailMsgAlarm that = (MailMsgAlarm) o;
        return getFailTimes() == that.getFailTimes() && getMailMsg().equals(that.getMailMsg()) && getAlarmKey().equals(that.getAlarmKey()) && getStartTime().equals(that.getStartTime()) && getEndTime().equals(that.getEndTime());
    }

    @Override
    public int hashCode() {
        return Objects.hash(getMailMsg(), getAlarmKey(), getStartTime(), getEndTime(), getFailTimes());
    }
}

消息实体类

package com.data.dev.common.javabean.kafkaMailTopic;

import com.data.dev.common.javabean.BaseBean;
import lombok.Data;

import java.util.Objects;

/**
 * {
 *   "user": "wangxm",
 *   "client_ip": "110.68.6.182",
 *   "source": "login",
 *   "loginname": "wangxm@test.com",
 *   "IP": "110.8.148.58",
 *   "timestamp": "17:58:12",
 *   "@timestamp": "2022-04-20T09:58:13.647Z",
 *   "ip": "110.7.231.25",
 *   "clienttype": "POP3",
 *   "result": "success",
 *   "@version": "1"
 * }
 *
 * user登录用户
 * client_ip 来源ip
 * source 类型
 * loginname 登录用户邮箱地址
 * ip 目标前端ip
 * timestamp 发送时间
 * @timestamp  发送日期时间
 * IP 邮件日志发送来源IP
 * clienttype 客户端登录类型
 * result 登录状态
 */

@Data
public class MailMsg extends BaseBean {
    public String user;
    public String client_ip;
    public String source;
    public String loginName;
    public String mailSenderSourceIp;
    public String timestamp_time;
    public String timestamp_datetime;
    public String ip;
    public String clientType;
    public String result;
    public String version;

    public MailMsg() {
    }

    public MailMsg(String user, String client_ip, String source, String loginName, String mailSenderSourceIp, String timestamp_time, String timestamp_datetime, String ip, String clientType, String result, String version) {
        this.user = user;
        this.client_ip = client_ip;
        this.source = source;
        this.loginName = loginName;
        this.mailSenderSourceIp = mailSenderSourceIp;
        this.timestamp_time = timestamp_time;
        this.timestamp_datetime = timestamp_datetime;
        this.ip = ip;
        this.clientType = clientType;
        this.result = result;
        this.version = version;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MailMsg)) return false;
        MailMsg mailMsg = (MailMsg) o;
        return getUser().equals(mailMsg.getUser()) && getClient_ip().equals(mailMsg.getClient_ip()) && getSource().equals(mailMsg.getSource()) && getLoginName().equals(mailMsg.getLoginName()) && getMailSenderSourceIp().equals(mailMsg.getMailSenderSourceIp()) && getTimestamp_time().equals(mailMsg.getTimestamp_time()) && getTimestamp_datetime().equals(mailMsg.getTimestamp_datetime()) && getIp().equals(mailMsg.getIp()) && getClientType().equals(mailMsg.getClientType()) && getResult().equals(mailMsg.getResult()) && getVersion().equals(mailMsg.getVersion());
    }

    @Override
    public int hashCode() {
        return Objects.hash(getUser(), getClient_ip(), getSource(), getLoginName(), getMailSenderSourceIp(), getTimestamp_time(), getTimestamp_datetime(), getIp(), getClientType(), getResult(), getVersion());
    }

    @Override
    public String toString() {
        return "{" +
                "  'user':'" + user + "'" +
                ", 'client_ip':'" + client_ip  + "'" +
                ", 'source':'" + source  + "'" +
                ", 'loginName':'" + loginName  + "'" +
                ", 'IP':'" + mailSenderSourceIp + "'" +
                ", 'timestamp':'" + timestamp_time + "'" +
                ", '@timestamp':'" + timestamp_datetime + "'" +
                ", 'ip':'"  + "'" +
                ", 'clientType':'" + clientType  + "'" +
                ", 'result':'" + result  + "'" +
                ", 'version':'" + version + "'" +
                "}";
    }

}

源代码已去掉敏感信息,地址:https://gitee.com/wangxm-2270/alarmCollectByFlink.git

到此这篇关于基于FLink实现实时安全检测的示例代码的文章就介绍到这了,更多相关FLink实时安全检测内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 解析Flink内核原理与实现核心抽象

    目录 一.环境对象 1.1 执行环境 StreamExecutionEnvironment LocalStreamEnvironment RemoteStreamEnvironment StreamContextEnvironment StreamPlanEnvironment ScalaShellStreamEnvironment 1.2 运行时环境 RuntimeEnvironment SavepointEnvironment 1.3 运行时上下文 StreamingRuntimeConte

  • Flink自定义Sink端实现过程讲解

    目录 Sink介绍 UML关系 Flink addSink 案例 Sink介绍 在Fink官网中sink端只是给出了常规的write api.在我们实际开发场景中需要将flink处理的数据写入kafka,hbase kudu等外部系统. UML关系 自定义Sink需要实现父类的接口和继承抽象类. 上面是Sink的继承关系 Flink addSink // 方法需要SinkFunction的对象 public DataStreamSink<T> addSink(SinkFunction<T

  • Flink实践Savepoint使用示例详解

    目录 一.背景 Snapshot 状态快照 分布式快照 Checkpoint & Savepoint 二.Flink on yarn 如何使用 savepoint 附录:一致性语义 确保精确一次(exactly once) 端到端精确一次 一.背景 什么是 savepoint,为什么要使用 savepoint ? 保障 flink 作业在 配置迭代.flink 版本升级.蓝绿部署中的数据一致性,提高容错.降低恢复时间: 在此之前引入几个概念: Snapshot 状态快照 Flink 通过状态快照

  • 基于FLink实现实时安全检测的示例代码

    目录 研发背景 场景描述 组件版本 日志结构 技术方案 关键代码 主入口类 mapper算子 filter算子 keyBy算子 窗口函数(核心代码) 最后一次map算子 ElasticSearch工具类 事件实体类 消息实体类 研发背景 公司安全部目前针对内部系统的网络访问日志的安全审计,大部分都是T+1时效,每日当天,启动Python编写的定时任务,完成昨日的日志审计和检测,定时任务运行完成后,统一进行企业微信告警推送.这种方案在目前的网络环境和人员规模下,呈现两个痛点,一是面对日益频繁的网络

  • Android中的人脸检测的示例代码(静态和动态)

    (1)背景. Google 于2006年8月收购Neven Vision 公司 (该公司拥有10多项应用于移动设备领域的图像识别的专利),以此获得了图像识别的技术,并加入到android中.Android 中的人脸识别技术,用到的底层库:android/external/neven/,framework 层:frameworks/base/media/java/android/media/FaceDetector.java. Java 层接口的限制:A,只能接受Bitmap 格式的数据:B,只能

  • 基于Java实现扫码登录的示例代码

    目录 基本介绍 原理解析 1. 身份认证机制 2. 流程概述 代码实现 1. 环境准备 2. 主要依赖 3. 生成二维码 4. 扫描二维码 5. 确认登录 6. PC 端轮询 7. 拦截器配置 效果演示 1. 工具准备 2. 数据准备 3. 扫码登录流程展示 结语 基本介绍 相信大家对二维码都不陌生,生活中到处充斥着扫码登录的场景,如登录网页版微信.支付宝等.最近学习了一下扫码登录的原理,感觉蛮有趣的,于是自己实现了一个简易版扫码登录的 Demo,以此记录一下学习过程. 实际上是面试的时候被问到

  • Python Opencv实现单目标检测的示例代码

    一 简介 目标检测即为在图像中找到自己感兴趣的部分,将其分割出来进行下一步操作,可避免背景的干扰.以下介绍几种基于opencv的单目标检测算法,算法总体思想先尽量将目标区域的像素值全置为1,背景区域全置为0,然后通过其它方法找到目标的外接矩形并分割,在此选择一张前景和背景相差较大的图片作为示例. 环境:python3.7 opencv4.4.0 二 背景前景分离 1 灰度+二值+形态学 轮廓特征和联通组件 根据图像前景和背景的差异进行二值化,例如有明显颜色差异的转换到HSV色彩空间进行分割. 1

  • python 实时调取摄像头的示例代码

    调取摄像头的实现 import numpy as np import cv2 cap = cv2.VideoCapture(0) #参数为0时调用本地摄像头:url连接调取网络摄像头:文件地址获取本地视频 while(True): ret,frame=cap.read() #灰度化 gray=cv2.cvtColor(frame,cv2.COLOR_BGR2GRAY) cv2.imshow('frame',gray) #普通图片 cv2.imshow('frame',frame) if cv2.

  • python+opencv3.4.0 实现HOG+SVM行人检测的示例代码

    参照opencv官网例程写了一个基于python的行人检测程序,实现了和自带检测器基本一致的检测效果. 网址 :https://docs.opencv.org/3.4.0/d5/d77/train_HOG_8cpp-example.html opencv版本:3.4.0 训练集和opencv官方用了同一个,可以从http://pascal.inrialpes.fr/data/human/下载,在网页的最下方"here(970MB处)",用迅雷下载比较快(500kB/s).训练集文件比较

  • 基于C语言实现迷宫游戏的示例代码

    目录 C语言迷宫游戏 定义地图 打印地图方法一 打印地图方法二 定义起点和终点位置 实现读取按键 实现小球下向下移动一步 总结小球移动规律 实现重新打印地图 实现连续移动 实现小球下向上下左右移动 实现小球走到终点就胜利 C语言迷宫游戏 这篇文章是给学完并学懂了C语言的分支(选择和循环)结构和二维数组的朋友看的. 要做一个游戏或者程序先要想好有那些要求,以下是我认为一个迷宫必带的要求: 迷宫要先打印出来(要设置墙.空气.小球的起点),是墙就不能,是空气就可以走. 每次输入'w'.'a'.'s'.

  • 基于JS实现飞机大战游戏的示例代码

    目录 演示 技术栈 源码 定义敌方战机 定义我方战机 碰撞检测 演示 技术栈 今天没有什么特别要讲的,要不我们提前介绍下次要做的技术吧.你不说话就是同意了.我们开始了. 下图是正则表达式的一些总结大家可以先看看哦 (function() { /** * 1. JavaScript使用正则式的函数 */ const str = "abchelloasdasdhelloasd"; // 1. 查找 console.log(str.search("h")); // 3 /

  • 基于Python编写一个点名器的示例代码

    目录 前言 主界面 添加姓名 查看花名册 使用指南 名字转动功能 完整代码 前言 想起小学的时候老师想点名找小伙伴回答问题的时候,老师竟斥巨资买了个点名器.今日无聊便敲了敲小时候老师斥巨资买的点名器. 本人姓白,就取名小白点名器啦,嘿嘿 代码包含:添加姓名.查看花名册.使用指南.随机抽取名字的功能(完整源码在最后) 主界面 定义主界面.使用“w+”模式创建test.txt文件(我添加了个背景图片,若不需要可省略) #打开时预加载储存在test.txt文件中的花名册 namelist = [] w

  • 基于Python实现24点游戏的示例代码

    目录 1.前言 2.思路 3.代码 1.前言 24数大家之前玩过没有? 规则:一副扑克牌抽走大王,小王,K,Q,J(有的规则里面会抽走10,本文一律不抽走),之后在牌堆里随机抽取四张牌,将这四张牌加减乘除得到24. 如果再高级一点,还会有根号.阶乘.幂之类的算法,别问为啥不能幂运算,问就是懒,自己看思路自己实现去(bushi. 知识点:随机数,列表,嵌套判断,循环,死循环,都是新手接触的东西. 由于不能进行像根号,阶乘高级的运算,改版之后完全可以了. 话不多说,上思路 2.思路 1.随机生成四个

随机推荐