FLink如何编写实时安全检测的示例代码?
- 内容介绍
- 文章标签
- 相关推荐
本文共计2718个文字,预计阅读时间需要11分钟。
目录+研发背景+场景描述+组件版本+日志结构+技术方案+关键代码+主入口类+mapper算子+filter算子+keyBy算子+窗口函数(核心代码)+最后一次map算子+ElasticSearch工具类+事件实体类+消息实体类
目录
- 研发背景
- 场景描述
- 组件版本
- 日志结构
- 技术方案
- 关键代码
- 主入口类
- mapper算子
- filter算子
- keyBy算子
- 窗口函数(核心代码)
- 最后一次map算子
- ElasticSearch工具类
- 事件实体类
- 消息实体类
研发背景
公司安全部目前针对内部系统的网络访问日志的安全审计,大部分都是T+1时效,每日当天,启动Python编写的定时任务,完成昨日的日志审计和检测,定时任务运行完成后,统一进行企业微信告警推送。这种方案在目前的网络环境和人员规模下,呈现两个痛点,一是面对日益频繁的网络攻击、钓鱼链接,T+1的定时任务,难以及时进行告警,因此也难以有效避免如关键信息泄露等问题,二是目前以Python为主的单机定时任务,针对不同场景的处理时效,从一小时到十几小时不等,效率低下。为解决以上问题,本人协助公司安全部同时对告警采集平台进行改造,由之前的python单机任务处理,切换到基于Flink集群的并行处理,且告警推送时效,由之前的T+1天,提升到秒级实时告警。本次改造涉及网络日志审计的多个常见场景,如端口扫描、黑名单统计、异常流量、连续恶意登录等。本次以一段时间内连续登录失败20次后,下一次登录成功场景来进行介绍。
场景描述
针对一个内部系统,如邮件系统,公司员工的访问行为日志,存放于kafka,我们希望对于一个用户账号在同一个IP下,任意的3分钟时间内,连续登录邮件系统20次失败,下一次登录成功,这种场景能够及时获取并推送到企业微信某个指定的安全接口人。kafka中的数据,能够通过某个关键字,区分当前网络访问是否一次登录事件,且有访问时间(也就是事件时间)。在解析到符合需求的用户账号之后,第一时间进行企业微信告警推送,并将其这段时间内的访问行为,写入下游ElasticSearch。
组件版本
- Flink-1.14.4
- Java8
- ElasticSearch-7.3.2
- Kafka-2.12_2.8.1
日志结构
IP和账号皆为测试使用。
{ "user": "wangxm", "client_ip": "110.68.6.182", "source": "login", "loginname": "wangxm@test.com", "IP": "110.8.148.58", "timestamp": "17:58:12", "@timestamp": "2022-04-20T09:58:13.647Z", "ip": "110.7.231.25", "clienttype": "POP3", "result": "success", "@version": "1" }
技术方案
上述场景,可考虑使用FlinkCEP及Flink的滑动窗口进行实现。由于本人在采用FlinkCEP的方案进行代码编写调试后,发现并不能满足,因此改用滑动窗口进行实现。
关键代码
主入口类
主入口类,创建了flink环境、设置了基础参数,创建了kafkaSource,接入消息后,进行了映射、过滤,并设置了水位线,进行了分组,之后设置了滑动窗口,在窗口内进行了事件统计,将复合条件的事件收集返回并写入ElasticSearch。
针对map、filter、keyBy、window等算子,都单独进行了编写,后面会一一列出来。
package com.data.dev.flink.mailTopic.main; import com.data.dev.common.javabean.BaseBean; import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm; import com.data.dev.elasticsearch.ElasticSearchInfo; import com.data.dev.elasticsearch.SinkToEs; import com.data.dev.flink.FlinkEnv; import com.data.dev.flink.mailTopic.OperationForLoginFailCheck.*; import com.data.dev.kafka.KafkaSourceBuilder; import com.data.dev.key.ConfigurationKey; import com.data.dev.utils.TimeUtils; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import java.time.Duration; /** * Flink处理在3分钟内连续登录失败20次后登录成功的场景 * 采用滑动窗口来实现 * @author wangxiaomin 2022-06-01 */ @Slf4j public class MailMsg extends BaseBean { /** * Flink作业名称 */ public static final String JobName = "告警采集平台——连续登录失败后登录成功告警"; /** * Kafka消息名 */ public static final String KafkaSourceName = "Kafka Source for AlarmPlatform About Mail Topic"; public MailMsg(){ log.info("初始化滑动窗口场景告警程序"); } /** * 执行逻辑统计场景,实现告警推送 */ public static void execute(){ //① 创建Flink执行环境并设置checkpoint等必要的参数 StreamExecutionEnvironment env = FlinkEnv.getFlinkEnv(); KafkaSource<String> kafkaSource = KafkaSourceBuilder.getKafkaSource(ConfigurationKey.KAFKA_MAIL_TOPIC_NAME,ConfigurationKey.KAFKA_MAIL_CONSUMER_GROUP_ID) ; DataStreamSource<String> kafkaMailMsg = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(10)), KafkaSourceName); //② 筛选登录消息,创建初始登录事件流 SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginMapDs = kafkaMailMsg.map(new MsgToBeanMapper()).name("Map算子加工"); SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginFilterDs = loginMapDs.filter(new MailMsgForLoginFilter()).name("Filter算子加工"); //③ 设置水位线 WatermarkStrategy<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> watermarkStrategy = WatermarkStrategy.<com.data.dev.common.javabean.kafkaMailTopic.MailMsg>forBoundedOutOfOrderness(Duration.ofMinutes(1)) .withTimestampAssigner((mailMsg, timestamp) -> TimeUtils.switchUTCToBeijingTimestamp(mailMsg.getTimestamp_datetime())); SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginWmDs = loginFilterDs.assignTimestampsAndWatermarks(watermarkStrategy.withIdleness(Duration.ofMinutes(3))).name("增加水位线"); //④ 设置主键 KeyedStream<com.data.dev.common.javabean.kafkaMailTopic.MailMsg, String> loginKeyedDs = loginWmDs.keyBy(new LoginKeySelector()); //⑥ 转化为滑动窗口 WindowedStream<com.data.dev.common.javabean.kafkaMailTopic.MailMsg, String, TimeWindow> loginWindowDs = loginKeyedDs.window(SlidingEventTimeWindows.of(Time.seconds(180L),Time.seconds(90L))); //⑦ 在窗口内进行逻辑统计 SingleOutputStreamOperator<MailMsgAlarm> loginWindowsDealDs = loginWindowDs.process(new WindowProcessFuncImpl()).name("窗口处理逻辑"); //⑧ 将结果转化为通用DataStream<String>格式 SingleOutputStreamOperator<String> resultDs = loginWindowsDealDs.map(new AlarmMsgToStringMapper()).name("窗口结果转化为标准格式"); //⑨ 将最终结果写入ES resultDs.addSink(SinkToEs.getEsSinkBuilder(ElasticSearchInfo.ES_LOGIN_FAIL_INDEX_NAME,ElasticSearchInfo.ES_INDEX_TYPE_DEFAULT).build()); //⑩ 提交Flink集群进行执行 FlinkEnv.envExec(env,JobName); } }
mapper算子
package com.data.dev.flink.mailTopic.OperationForLoginFailCheck; import com.alibaba.fastjson.JSON; import com.data.dev.common.javabean.BaseBean; import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.MapFunction; /** * 逻辑统计场景告警推送ES消息体 * @author wangxiaoming-ghq 2022-06-01 */ @Slf4j public class AlarmMsgToStringMapper extends BaseBean implements MapFunction<MailMsgAlarm, String> { @Override public String map(MailMsgAlarm mailMsgAlarm) throws Exception { return JSON.toJSONString(mailMsgAlarm); } }
filter算子
package com.data.dev.flink.mailTopic.OperationForLoginFailCheck; import com.data.dev.common.javabean.BaseBean; import com.data.dev.common.javabean.kafkaMailTopic.MailMsg; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.FilterFunction; /** * ② 消费mail主题的消息,过滤其中login的事件 * @author wangxiaoming-ghq 2022-06-01 */ @Slf4j public class MailMsgForLoginFilter extends BaseBean implements FilterFunction<MailMsg> { @Override public boolean filter(MailMsg mailMsg) { if("login".equals(mailMsg.getSource())) { log.info("筛选原始的login事件:"); } return "login".equals(mailMsg.getSource()); } }
keyBy算子
package com.data.dev.flink.mailTopic.OperationForLoginFailCheck; import com.data.dev.common.javabean.BaseBean; import com.data.dev.common.javabean.kafkaMailTopic.MailMsg; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.java.functions.KeySelector; /** * CEP 编程,需要进行key选取 */ @Slf4j public class LoginKeySelector extends BaseBean implements KeySelector<MailMsg, String> { @Override public String getKey(MailMsg mailMsg) { return mailMsg.getUser() + "@" + mailMsg.getClient_ip(); } }
窗口函数(核心代码)
这里我们主要考虑使用一个事件列表,用来存储每一个窗口期内得到的连续登录,当检测到登陆失败的事件,即存入事件列表中,之后判断下一次登录失败事件,如果检测到登录成功事件,但此时登录失败的次数不足20次,则清空loginEventList,等待下一次检测。一旦符合窗口内连续登录失败超过20次且下一次登录成功这个事件,则清空此时的loginEventList并将当前登录成功的事件进行告警推送。
package com.data.dev.flink.mailTopic.OperationForLoginFailCheck; import com.data.dev.common.javabean.kafkaMailTopic.MailMsg; import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm; import com.data.dev.utils.HttpUtils; import com.data.dev.utils.IPUtils; import lombok.extern.slf4j.Slf4j; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.io.Serializable; import java.util.ArrayList; import java.util.List; /** * 滑动窗口内复杂事件解析逻辑实现 * @author wangxiaoming-ghq 2022-06-01 */ @Slf4j public class WindowProcessFuncImpl extends ProcessWindowFunction<MailMsg, MailMsgAlarm, String, TimeWindow> implements Serializable { @Override public void process(String key, ProcessWindowFunction<MailMsg, MailMsgAlarm, String, TimeWindow>.Context context, Iterable<MailMsg> iterable, Collector<MailMsgAlarm> collector) { List<MailMsg> loginEventList = new ArrayList<>(); MailMsgAlarm mailMsgAlarm; for (MailMsg mailMsg : iterable) { log.info("收集到的登录事件"); if (mailMsg.getResult().equals("fail")) { //开始检测当前窗口内的事件,并将失败的事件收集到loginEventList log.info("开始检测当前窗口内的事件,并将失败的事件收集到loginEventList"); loginEventList.add(mailMsg); } else if (mailMsg.getResult().equals("success") && loginEventList.size() < 20) {//如果检测到登录成功事件,但此时登录失败的次数不足20次,则清空loginEventList,等待下一次检测 log.info("检测到登录成功事件,但此时登录失败的次数为不足20次,清空loginEventList,等待下一次检测"); loginEventList.clear(); } else if (mailMsg.getResult().equals("success") && loginEventList.size() >= 20) { mailMsgAlarm = getMailMsgAlarm(loginEventList,mailMsg); log.info("检测到登录成功的事件,此时窗口内连续登录失败的次数为"); //一旦符合窗口内连续登录失败超过20次且下一次登录成功这个事件,则清空此时的loginEventList并将当前登录成功的事件进行告警推送; loginEventList.clear(); doAlarmPush(mailMsgAlarm); collector.collect(mailMsgAlarm);//将当前登录成功的事件进行收集上报 } else { log.info(mailMsg.getUser() + "当前已连续: 次登录失败"); } } } /** * 2022年6月17日15:03:06 * @param eventList:当前窗口内的事件列表 * @param eventCurrent:当前登录成功的事件 * @return mailMsgAlarm:告警消息体 */ public static MailMsgAlarm getMailMsgAlarm(List<MailMsg> eventList,MailMsg eventCurrent){ String alarmKey = eventCurrent.getUser() + "@" + eventCurrent.getClient_ip(); String loginFailStartTime = eventList.get(0).getTimestamp_datetime(); String loginSuccessTime = eventCurrent.getTimestamp_datetime(); int loginFailTimes = eventList.size(); MailMsgAlarm mailMsgAlarm = new MailMsgAlarm(); mailMsgAlarm.setMailMsg(eventCurrent); mailMsgAlarm.setAlarmKey(alarmKey); mailMsgAlarm.setStartTime(loginFailStartTime); mailMsgAlarm.setEndTime(loginSuccessTime); mailMsgAlarm.setFailTimes(loginFailTimes); return mailMsgAlarm; } /** * 2022年6月17日14:47:53 * @param mailMsgAlarm :当前构建的需要告警的事件 */ public void doAlarmPush(MailMsgAlarm mailMsgAlarm){ String userKey = mailMsgAlarm.getAlarmKey(); String clientIp = mailMsgAlarm.mailMsg.getClient_ip(); boolean isWhiteListIp = IPUtils.isWhiteListIp(clientIp); if(isWhiteListIp){//如果是白名单IP,不告警 log.info("当前登录用户属于白名单IP"); }else { //IP归属查询结果、企业微信推送告警 String user = HttpUtils.getUserByClientIp(clientIp); HttpUtils.pushAlarmMsgToWechatWork(user,mailMsgAlarm.toString()); } } }
最后一次map算子
package com.data.dev.flink.mailTopic.OperationForLoginFailCheck; import com.alibaba.fastjson.JSON; import com.data.dev.common.javabean.BaseBean; import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.MapFunction; /** * 逻辑统计场景告警推送ES消息体 * @author wangxiaoming-ghq 2022-06-01 */ @Slf4j public class AlarmMsgToStringMapper extends BaseBean implements MapFunction<MailMsgAlarm, String> { @Override public String map(MailMsgAlarm mailMsgAlarm) throws Exception { return JSON.toJSONString(mailMsgAlarm); } }
ElasticSearch工具类
package com.data.dev.elasticsearch; import com.data.dev.common.javabean.BaseBean; import com.data.dev.key.ConfigurationKey; import com.data.dev.key.ElasticSearchKey; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory; import org.apache.gitee.com/wangxm-2270/alarmCollectByFlink.git
到此这篇关于基于FLink实现实时安全检测的示例代码的文章就介绍到这了,更多相关FLink实时安全检测内容请搜索自由互联以前的文章或继续浏览下面的相关文章希望大家以后多多支持自由互联!
本文共计2718个文字,预计阅读时间需要11分钟。
目录+研发背景+场景描述+组件版本+日志结构+技术方案+关键代码+主入口类+mapper算子+filter算子+keyBy算子+窗口函数(核心代码)+最后一次map算子+ElasticSearch工具类+事件实体类+消息实体类
目录
- 研发背景
- 场景描述
- 组件版本
- 日志结构
- 技术方案
- 关键代码
- 主入口类
- mapper算子
- filter算子
- keyBy算子
- 窗口函数(核心代码)
- 最后一次map算子
- ElasticSearch工具类
- 事件实体类
- 消息实体类
研发背景
公司安全部目前针对内部系统的网络访问日志的安全审计,大部分都是T+1时效,每日当天,启动Python编写的定时任务,完成昨日的日志审计和检测,定时任务运行完成后,统一进行企业微信告警推送。这种方案在目前的网络环境和人员规模下,呈现两个痛点,一是面对日益频繁的网络攻击、钓鱼链接,T+1的定时任务,难以及时进行告警,因此也难以有效避免如关键信息泄露等问题,二是目前以Python为主的单机定时任务,针对不同场景的处理时效,从一小时到十几小时不等,效率低下。为解决以上问题,本人协助公司安全部同时对告警采集平台进行改造,由之前的python单机任务处理,切换到基于Flink集群的并行处理,且告警推送时效,由之前的T+1天,提升到秒级实时告警。本次改造涉及网络日志审计的多个常见场景,如端口扫描、黑名单统计、异常流量、连续恶意登录等。本次以一段时间内连续登录失败20次后,下一次登录成功场景来进行介绍。
场景描述
针对一个内部系统,如邮件系统,公司员工的访问行为日志,存放于kafka,我们希望对于一个用户账号在同一个IP下,任意的3分钟时间内,连续登录邮件系统20次失败,下一次登录成功,这种场景能够及时获取并推送到企业微信某个指定的安全接口人。kafka中的数据,能够通过某个关键字,区分当前网络访问是否一次登录事件,且有访问时间(也就是事件时间)。在解析到符合需求的用户账号之后,第一时间进行企业微信告警推送,并将其这段时间内的访问行为,写入下游ElasticSearch。
组件版本
- Flink-1.14.4
- Java8
- ElasticSearch-7.3.2
- Kafka-2.12_2.8.1
日志结构
IP和账号皆为测试使用。
{ "user": "wangxm", "client_ip": "110.68.6.182", "source": "login", "loginname": "wangxm@test.com", "IP": "110.8.148.58", "timestamp": "17:58:12", "@timestamp": "2022-04-20T09:58:13.647Z", "ip": "110.7.231.25", "clienttype": "POP3", "result": "success", "@version": "1" }
技术方案
上述场景,可考虑使用FlinkCEP及Flink的滑动窗口进行实现。由于本人在采用FlinkCEP的方案进行代码编写调试后,发现并不能满足,因此改用滑动窗口进行实现。
关键代码
主入口类
主入口类,创建了flink环境、设置了基础参数,创建了kafkaSource,接入消息后,进行了映射、过滤,并设置了水位线,进行了分组,之后设置了滑动窗口,在窗口内进行了事件统计,将复合条件的事件收集返回并写入ElasticSearch。
针对map、filter、keyBy、window等算子,都单独进行了编写,后面会一一列出来。
package com.data.dev.flink.mailTopic.main; import com.data.dev.common.javabean.BaseBean; import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm; import com.data.dev.elasticsearch.ElasticSearchInfo; import com.data.dev.elasticsearch.SinkToEs; import com.data.dev.flink.FlinkEnv; import com.data.dev.flink.mailTopic.OperationForLoginFailCheck.*; import com.data.dev.kafka.KafkaSourceBuilder; import com.data.dev.key.ConfigurationKey; import com.data.dev.utils.TimeUtils; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import java.time.Duration; /** * Flink处理在3分钟内连续登录失败20次后登录成功的场景 * 采用滑动窗口来实现 * @author wangxiaomin 2022-06-01 */ @Slf4j public class MailMsg extends BaseBean { /** * Flink作业名称 */ public static final String JobName = "告警采集平台——连续登录失败后登录成功告警"; /** * Kafka消息名 */ public static final String KafkaSourceName = "Kafka Source for AlarmPlatform About Mail Topic"; public MailMsg(){ log.info("初始化滑动窗口场景告警程序"); } /** * 执行逻辑统计场景,实现告警推送 */ public static void execute(){ //① 创建Flink执行环境并设置checkpoint等必要的参数 StreamExecutionEnvironment env = FlinkEnv.getFlinkEnv(); KafkaSource<String> kafkaSource = KafkaSourceBuilder.getKafkaSource(ConfigurationKey.KAFKA_MAIL_TOPIC_NAME,ConfigurationKey.KAFKA_MAIL_CONSUMER_GROUP_ID) ; DataStreamSource<String> kafkaMailMsg = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(10)), KafkaSourceName); //② 筛选登录消息,创建初始登录事件流 SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginMapDs = kafkaMailMsg.map(new MsgToBeanMapper()).name("Map算子加工"); SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginFilterDs = loginMapDs.filter(new MailMsgForLoginFilter()).name("Filter算子加工"); //③ 设置水位线 WatermarkStrategy<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> watermarkStrategy = WatermarkStrategy.<com.data.dev.common.javabean.kafkaMailTopic.MailMsg>forBoundedOutOfOrderness(Duration.ofMinutes(1)) .withTimestampAssigner((mailMsg, timestamp) -> TimeUtils.switchUTCToBeijingTimestamp(mailMsg.getTimestamp_datetime())); SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginWmDs = loginFilterDs.assignTimestampsAndWatermarks(watermarkStrategy.withIdleness(Duration.ofMinutes(3))).name("增加水位线"); //④ 设置主键 KeyedStream<com.data.dev.common.javabean.kafkaMailTopic.MailMsg, String> loginKeyedDs = loginWmDs.keyBy(new LoginKeySelector()); //⑥ 转化为滑动窗口 WindowedStream<com.data.dev.common.javabean.kafkaMailTopic.MailMsg, String, TimeWindow> loginWindowDs = loginKeyedDs.window(SlidingEventTimeWindows.of(Time.seconds(180L),Time.seconds(90L))); //⑦ 在窗口内进行逻辑统计 SingleOutputStreamOperator<MailMsgAlarm> loginWindowsDealDs = loginWindowDs.process(new WindowProcessFuncImpl()).name("窗口处理逻辑"); //⑧ 将结果转化为通用DataStream<String>格式 SingleOutputStreamOperator<String> resultDs = loginWindowsDealDs.map(new AlarmMsgToStringMapper()).name("窗口结果转化为标准格式"); //⑨ 将最终结果写入ES resultDs.addSink(SinkToEs.getEsSinkBuilder(ElasticSearchInfo.ES_LOGIN_FAIL_INDEX_NAME,ElasticSearchInfo.ES_INDEX_TYPE_DEFAULT).build()); //⑩ 提交Flink集群进行执行 FlinkEnv.envExec(env,JobName); } }
mapper算子
package com.data.dev.flink.mailTopic.OperationForLoginFailCheck; import com.alibaba.fastjson.JSON; import com.data.dev.common.javabean.BaseBean; import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.MapFunction; /** * 逻辑统计场景告警推送ES消息体 * @author wangxiaoming-ghq 2022-06-01 */ @Slf4j public class AlarmMsgToStringMapper extends BaseBean implements MapFunction<MailMsgAlarm, String> { @Override public String map(MailMsgAlarm mailMsgAlarm) throws Exception { return JSON.toJSONString(mailMsgAlarm); } }
filter算子
package com.data.dev.flink.mailTopic.OperationForLoginFailCheck; import com.data.dev.common.javabean.BaseBean; import com.data.dev.common.javabean.kafkaMailTopic.MailMsg; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.FilterFunction; /** * ② 消费mail主题的消息,过滤其中login的事件 * @author wangxiaoming-ghq 2022-06-01 */ @Slf4j public class MailMsgForLoginFilter extends BaseBean implements FilterFunction<MailMsg> { @Override public boolean filter(MailMsg mailMsg) { if("login".equals(mailMsg.getSource())) { log.info("筛选原始的login事件:"); } return "login".equals(mailMsg.getSource()); } }
keyBy算子
package com.data.dev.flink.mailTopic.OperationForLoginFailCheck; import com.data.dev.common.javabean.BaseBean; import com.data.dev.common.javabean.kafkaMailTopic.MailMsg; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.java.functions.KeySelector; /** * CEP 编程,需要进行key选取 */ @Slf4j public class LoginKeySelector extends BaseBean implements KeySelector<MailMsg, String> { @Override public String getKey(MailMsg mailMsg) { return mailMsg.getUser() + "@" + mailMsg.getClient_ip(); } }
窗口函数(核心代码)
这里我们主要考虑使用一个事件列表,用来存储每一个窗口期内得到的连续登录,当检测到登陆失败的事件,即存入事件列表中,之后判断下一次登录失败事件,如果检测到登录成功事件,但此时登录失败的次数不足20次,则清空loginEventList,等待下一次检测。一旦符合窗口内连续登录失败超过20次且下一次登录成功这个事件,则清空此时的loginEventList并将当前登录成功的事件进行告警推送。
package com.data.dev.flink.mailTopic.OperationForLoginFailCheck; import com.data.dev.common.javabean.kafkaMailTopic.MailMsg; import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm; import com.data.dev.utils.HttpUtils; import com.data.dev.utils.IPUtils; import lombok.extern.slf4j.Slf4j; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.io.Serializable; import java.util.ArrayList; import java.util.List; /** * 滑动窗口内复杂事件解析逻辑实现 * @author wangxiaoming-ghq 2022-06-01 */ @Slf4j public class WindowProcessFuncImpl extends ProcessWindowFunction<MailMsg, MailMsgAlarm, String, TimeWindow> implements Serializable { @Override public void process(String key, ProcessWindowFunction<MailMsg, MailMsgAlarm, String, TimeWindow>.Context context, Iterable<MailMsg> iterable, Collector<MailMsgAlarm> collector) { List<MailMsg> loginEventList = new ArrayList<>(); MailMsgAlarm mailMsgAlarm; for (MailMsg mailMsg : iterable) { log.info("收集到的登录事件"); if (mailMsg.getResult().equals("fail")) { //开始检测当前窗口内的事件,并将失败的事件收集到loginEventList log.info("开始检测当前窗口内的事件,并将失败的事件收集到loginEventList"); loginEventList.add(mailMsg); } else if (mailMsg.getResult().equals("success") && loginEventList.size() < 20) {//如果检测到登录成功事件,但此时登录失败的次数不足20次,则清空loginEventList,等待下一次检测 log.info("检测到登录成功事件,但此时登录失败的次数为不足20次,清空loginEventList,等待下一次检测"); loginEventList.clear(); } else if (mailMsg.getResult().equals("success") && loginEventList.size() >= 20) { mailMsgAlarm = getMailMsgAlarm(loginEventList,mailMsg); log.info("检测到登录成功的事件,此时窗口内连续登录失败的次数为"); //一旦符合窗口内连续登录失败超过20次且下一次登录成功这个事件,则清空此时的loginEventList并将当前登录成功的事件进行告警推送; loginEventList.clear(); doAlarmPush(mailMsgAlarm); collector.collect(mailMsgAlarm);//将当前登录成功的事件进行收集上报 } else { log.info(mailMsg.getUser() + "当前已连续: 次登录失败"); } } } /** * 2022年6月17日15:03:06 * @param eventList:当前窗口内的事件列表 * @param eventCurrent:当前登录成功的事件 * @return mailMsgAlarm:告警消息体 */ public static MailMsgAlarm getMailMsgAlarm(List<MailMsg> eventList,MailMsg eventCurrent){ String alarmKey = eventCurrent.getUser() + "@" + eventCurrent.getClient_ip(); String loginFailStartTime = eventList.get(0).getTimestamp_datetime(); String loginSuccessTime = eventCurrent.getTimestamp_datetime(); int loginFailTimes = eventList.size(); MailMsgAlarm mailMsgAlarm = new MailMsgAlarm(); mailMsgAlarm.setMailMsg(eventCurrent); mailMsgAlarm.setAlarmKey(alarmKey); mailMsgAlarm.setStartTime(loginFailStartTime); mailMsgAlarm.setEndTime(loginSuccessTime); mailMsgAlarm.setFailTimes(loginFailTimes); return mailMsgAlarm; } /** * 2022年6月17日14:47:53 * @param mailMsgAlarm :当前构建的需要告警的事件 */ public void doAlarmPush(MailMsgAlarm mailMsgAlarm){ String userKey = mailMsgAlarm.getAlarmKey(); String clientIp = mailMsgAlarm.mailMsg.getClient_ip(); boolean isWhiteListIp = IPUtils.isWhiteListIp(clientIp); if(isWhiteListIp){//如果是白名单IP,不告警 log.info("当前登录用户属于白名单IP"); }else { //IP归属查询结果、企业微信推送告警 String user = HttpUtils.getUserByClientIp(clientIp); HttpUtils.pushAlarmMsgToWechatWork(user,mailMsgAlarm.toString()); } } }
最后一次map算子
package com.data.dev.flink.mailTopic.OperationForLoginFailCheck; import com.alibaba.fastjson.JSON; import com.data.dev.common.javabean.BaseBean; import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.MapFunction; /** * 逻辑统计场景告警推送ES消息体 * @author wangxiaoming-ghq 2022-06-01 */ @Slf4j public class AlarmMsgToStringMapper extends BaseBean implements MapFunction<MailMsgAlarm, String> { @Override public String map(MailMsgAlarm mailMsgAlarm) throws Exception { return JSON.toJSONString(mailMsgAlarm); } }
ElasticSearch工具类
package com.data.dev.elasticsearch; import com.data.dev.common.javabean.BaseBean; import com.data.dev.key.ConfigurationKey; import com.data.dev.key.ElasticSearchKey; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory; import org.apache.gitee.com/wangxm-2270/alarmCollectByFlink.git
到此这篇关于基于FLink实现实时安全检测的示例代码的文章就介绍到这了,更多相关FLink实时安全检测内容请搜索自由互联以前的文章或继续浏览下面的相关文章希望大家以后多多支持自由互联!

