IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink 实时告警系统开发 -> 正文阅读

[大数据]Flink 实时告警系统开发

一、背景需求介绍

实时监控系统需要满足对多种来源的数据进行告警,为提升系统的可扩展行和灵活性,采用动态规则配置来实现多种数据源、多种告警规则的实时告警

1、数据来源

车机应用端数据源:

数据源内容hostporttopic
Sentry 中的异常、奔溃tsp3dev07.para.com9092events
车机端埋点数据tsp3dev01.para.com,tsp3dev01.para.com,tsp3dev03.para.com9092data-monitor-ivi

云端数据源:

数据源内容hostporttopic
云端埋点数据tsp3dev01.para.com,tsp3dev01.para.com,tsp3dev03.para.com9092cloud_cp_data
云端微服务日志tsp3dev01.para.com,tsp3dev01.para.com,tsp3dev03.para.com9092data-monitor-cloud

二、系统架构设计

1、系统分层架构设计

本着高内聚低耦合的原则,实时告警系统采用分层设计的思想对整体的功能模块进行组合,其中:

1、Flink DataStream 层的功能是数据流在Flink内部的整体流向DAG图,如addSourceconnectprocessaddSink

2、Flink Function 层的功能是对function的具体实现,如AlertManagerSinkFunctionCustomMysqlSourceFunctionRuleMatchBroadCastProcessFunction等;

3、Service 层是业务的处理过程,如负责向AlertManager传输数据的AlertManagerService、负责规则同步、更新、维护、转化、匹配的 RulesService

2、业务模块设计

说明:业务上,需要告警的数据源目前有4中数据来源,分别是远端日志、云端微服务日志、车机端埋点、Sentry异常奔溃,其中Sentry 中的数据需要通过告警规则的筛选后发送到kafka中用于实时监控。设计上首先通过Driver中的class 路由到通用JSON告警模块或者Sentry异常奔溃业务处理模块,其次通过app.type 选择kafka中的数据源。

3、Flink DataStream 处理流程图

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7VDDELwQ-1627199664538)(images/Flink DataStream 处理流程图.png)]

说明:DataStream 处理流程图展示的是数据从Kafka消费后再Flink Function 中的流向关系,Driver 负责Flink程序的启动,通过class筛选路由到通用JSON告警或者Sentry异常崩溃模块,其中内部的逻辑比较相似:

1、首先Mysql中的配置通过自定义数据源模块会被解析成配置流

2、其次kafka topic 会被解析成数据流,通过广播连接,配置流会被广播到每个数据流的TaskManager;

3、通过规则匹配模块对数据流和规则流进行匹配;

4、匹配到数据筛选出非Sentry中的数据分别发送到AlertManager实时告警、MySQL告警统计、kafka 实时监控

4、规则匹配模块设计

规则匹配模块核心使用的是Avaitor规则引擎表达式进行规则匹配,匹配的内容来源于:

1、数据流的JSON通过flattenAsMap转成map;

2、规则流中有效的Rule中获取得到的规则表达式。

5、规则设计

规则存储在MySQL中便于管理和修改,表结构如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CgpZeA6H-1627199664546)(images/规则表结构.png)]

其中,各字段解释如下:

id:为唯一的规则id,一个程序中id被认为是同一条规则;

name:规则的名称,用于规则的解释和告警输出;

exp:规则表达式,用于和数据流匹配;

update_time:规则变化时的更新时间;

create_time:规则的创建时间;

is_valid:是否有效规则,匹配的时候只使用有效的规则;

app_type:应用类型,只有当数据流中的ap_type和程序启动时的app.type 相同时才有可能匹配成功。

建表语句为:

CREATE TABLE  IF NOT EXISTS `flink`.`flink-alert-rule123` (
     `id` int(16) NOT NULL AUTO_INCREMENT COMMENT '主键id',
     `name` varchar(255) DEFAULT NULL COMMENT '规则名称',
     `exp` varchar(1020) DEFAULT NULL COMMENT '规则表达式',
     `update_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新规则时间',
     `create_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '创建规则时间',
     `is_valid` int(1) NOT NULL COMMENT '规则是否有效,无效不会告警',
     `app_type` varchar(255) NOT NULL COMMENT '规则适用的应用类型,值必须是AppType的枚举值,忽略大小写',
     PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;

6、规则表达说明

1、规则表中的exp字段用来存储规则表达式,用aviator表达式引擎执行JSON是否满足表达式,返回boolean值,

2、规则的最简单格式为:key 比较运算符 value

  • key 是要匹配的JSON中的全路径key,不支持特殊字符,支持.
  • 比较运算符 可以是 ==!=<> 等参考:https://www.yuque.com/boyan-avfmj/aviatorscript/lvabnw
  • value 是数字或者字符串,包含特殊字符用''包裹

3、规则的例子:

  • JSON格式的数据源:
{
    "common_data":{
        "appPackage":"ltd.qisi.sotasupportapp",
        "appVersion":"3.03.01.000",
        "collectedTime":1625240289781,
        "behaviorId":"50026003",
        "qisiuiVersion":"0.2.02",
        "uid":"1924427992",
        "regionCode":"659001",
        "eventName":"mock",
        "vin":"MOCK1TELWMOMZRQAWO",
        "hardwareVersion":"3.03.01.000",
        "carseries":"E115",
        "pdsn":"47556519116431",
        "displayId":"0"
    },
    "gather_data":{
        "key1":"value5",
        "key2":"69",
        "key3":"0"
    }
}
  • 告警规则 common_data.appPackage == 'ltd.qisi.sotasupportapp' 表示common_data.appPackage字段等于’td.qisi.sotasupportapp

7、输出业务告警数据格式设计

1、车机端告警统计格式

CREATE TABLE `flink`.`flink-alert-data` (
     `app_package`          varchar(255)    comment 'app包名'
    ,`collected_time`       bigint(16)      comment '数据'
    ,`behavior_id`          varchar(255)    comment ''
    ,`qisiui_version`       varchar(255)    comment ''
    ,`uid`                  varchar(255)    comment '用户id'
    ,`region_code`          varchar(255)    comment ''
    ,`os_version`           varchar(255)    comment ''
    ,`event_name`           varchar(255)    comment ''
    ,`vin`                  varchar(255)    comment 'vin码'
    ,`hardware_version`     varchar(255)    comment ''
    ,`carseries`            varchar(255)    comment ''
    ,`pdsn`                 varchar(255)    comment ''
    ,`display_id`           varchar(255)    comment '屏幕id[ 0:主控屏;1:副驾屏;2:左后排屏;3:右后排屏;-1:未知]'
    ,`rule_name`            varchar(255)    comment ''
    ,`rule_id`              varchar(255)    comment ''
    ,`rule_exp`             varchar(255)    comment ''
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2、云端告警统计格式

CREATE TABLE `flink`.`flink-cloud-alert-data` (
     `microservice`         varchar(255)    comment ''
    ,`reqPath`              varchar(255)    comment ''
    ,`clicnetIP`            varchar(255)    comment ''
    ,`resultCode`           varchar(255)    comment ''
    ,`createDate`           varchar(255)    comment ''
    ,`ctime`                varchar(255)    comment ''
    ,`rule_name`            varchar(255)    comment ''
    ,`rule_id`              varchar(255)    comment ''
    ,`rule_exp`             varchar(255)    comment ''
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

8、AlertManager告警模块设计

对接AlertManager的模块为ISendService的实现类,通过调用void send(AlertManagerData data)方法把数据发送的出去,其其实类会调用AlertManager的post请求发送json数据,请求的基本格式为:

curl -XPOST http://localhost:9093/api/v1/alerts -d '
[
  {
    "labels": {
       "alertname": "DiskRunningFull",
       "dev": "sda1",
       "instance": "中文测试",
       "route": "WEBHOOK"
     },
     "annotations": {
        "info": "The disk sda1 is running full",
        "summary": "please check the instance example1"
      },
     "Source": {
        "link": "http://www.baidu.com"
      }
  }
]
'

三、提交 yarn 命令

flink任务的执行流程为:

1、mvn打包;

$ mvn clean package 

2、把打包好的jar包上传到hdfs,路径为 /flink/jobs/;

# 删除久的jar
$ hadoop fs -rm /flink/jobs/flink-AlertManager-v1.1.jar
# 上传jar包到hdfs
$ hadoop fs -put ./flink-AlertManager-v1.1.jar /flink/jobs/

3、提交yarn-applicationn

# 提交yarn-applicationn
$ flink run-application -t yarn-applicationn  -Dyarn.application.name="Flink Alert System" hdfs:///flink/jobs/flink-AlertManager-v0.1.jar MysqlAndAlertManagerSink --app.type CLIENT_EVENT

必填参数:

  • 执行的class类 , MysqlAndAlertManagerSink
  • 应用类型 --app.type AppType的枚举值,忽略大小写

执行任务可选的参数:

1、Mysql 可选参数
--mysql.conf.file alert/mysql.properties
--mysql.host sit.dbaas.private
--mysql.port 13500
--mysql.db flink
--mysql.table flink.flink-alert-rule
--mysql.username tsp
--mysql.passwd TspMysql2020!
2、AlertManager 可选参数
--am.route <WEBHOOK、MAIL>
--am.host 10.6.215.39
--am.port 9093
3、Kafka 可选参数
--kafka.sink.conf.file alert/producer.properties
--kafka.source.conf.file alert/kafka.properties
--kafka.sink.topic sentry-sink-topic-test

源代码:

https://gitee.com/zhangjian-eng/flink-real-time-rule-alert

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-26 12:08:54  更:2021-07-26 12:11:05 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/21 3:12:46-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码