【已记录】利用定时调度实时分钟级实时统计的想法和遇到的瓶颈

Viewed 186

需求和背景

在大数据量在实现分钟级的实时统计。
利用异步物化视图实现分钟级的实时统计的帖子: 异步物化视图分组级实时统计的问题
本帖子将提供另一种利用定时调度器实现的思路。 但遇到了性能瓶颈。

实现

先建两张表

CREATE TABLE `source_table` (
  `ge_time` DATETIME NOT NULL COMMENT '事件发生时间',
  `app_id` VARCHAR(192) NOT NULL COMMENT '业务标识'
) ENGINE=OLAP
DUPLICATE KEY(`ge_time`, `app_id`)
COMMENT '源表, 假设会一直的输入数据'
AUTO PARTITION BY RANGE date_trunc(ge_time, 'day')()
DISTRIBUTED BY HASH(`ge_time`, `app_id`) BUCKETS AUTO
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);



CREATE TABLE `target_table` (
  `ge_time` DATETIME NOT NULL COMMENT '事件发生时间',
  `biz_count` int NOT NULL COMMENT '业务统计count,只为演示用'
) ENGINE=OLAP
unique KEY(`ge_time`)
COMMENT '目标表, 源表通过定时调度的数据会输出到这张表'
AUTO PARTITION BY RANGE date_trunc(ge_time, 'day')()
DISTRIBUTED BY HASH(`ge_time`) BUCKETS AUTO
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);

创建定时调度任务

-- 一个基于定时调度实现分钟级实时报表的想法.  基于增量的实时统计.
CREATE JOB inct_job ON SCHEDULE EVERY 1 MINUTE STARTS '2024-03-01 00:00:00' ENDS '2030-01-01 00:10:00' 
DO 
INSERT INTO target_table
select
date_trunc(ge_time,'minute') as ge_time,
COUNT(*) as biz_count
from
source_table
where 
-- 减5分钟是考虑到可能有迟到数据. 当然, 5分钟前的迟到数据, 就永远统计不到了. 业务可以接受就可以用这种方法. 
-- 外层的date_trunc是将窗口时间的最前端扩展到所在时间格的最前. 防止最前面的时间格,统计了不到1分钟,并把该时间格的1分钟的正确统计覆盖成了不到1分钟的错误统计. 
-- 目标表target_table只能是unqiue模型. 这样可以做到对历史数据的正确覆盖
ge_time > (select date_trunc(minutes_add(max(ge_time),-5),'minute') from target_table)
group by date_trunc(ge_time,'minute');

说明

1: 此种方法相对于异步物化视图(最小刷新粒度是分区),刷新的数据会更少,只需要考虑迟到数据的范围。
2:此种方法只适合【增量】和【追加】型的日志。或者只在窗口时间范围内有数据修改的场景。
3: 窗口时间外的数据, 可以【小时】或【天】为间隔进行一次更大时间范围的调度。 使数据最终一致。

该思路遇到的性能瓶颈

上述的思路依赖于一个点:目标表的最大时间作为一个值.用这个值用作基表增量锚点.通过分钟级调度聚合锚点之后的增量数据.

简化过程

先建三张表

CREATE TABLE `source_interface_report` (
  `ge_time` DATETIME NOT NULL COMMENT '事件发生时间',
  `app_id` VARCHAR(192) NOT NULL COMMENT '应用key',
  `interface_code` VARCHAR(192) NOT NULL COMMENT '接口key',
  `result_status` TINYINT NOT NULL COMMENT '查询结果通知,  0:失败;  1:成功',
  `application_name` VARCHAR(192) NOT NULL COMMENT '应用名称'
) ENGINE=OLAP
DUPLICATE KEY(`ge_time`, `app_id`, `interface_code`)
COMMENT '源表'
AUTO PARTITION BY RANGE date_trunc(ge_time, 'day')()
DISTRIBUTED BY HASH(`ge_time`, `app_id`, `interface_code`) BUCKETS AUTO
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);




CREATE TABLE `target_interface_report` (
  `ge_time` DATETIME NOT NULL COMMENT '事件发生时间',
  `app_id` VARCHAR(192) NOT NULL COMMENT '应用key',
  `interface_code` VARCHAR(192) NOT NULL COMMENT '接口key',
  `result_status` TINYINT NOT NULL COMMENT '查询结果通知,  0:失败;  1:成功',
  `application_name` VARCHAR(192) NOT NULL COMMENT '应用名称'
) ENGINE=OLAP
DUPLICATE KEY(`ge_time`, `app_id`, `interface_code`)
COMMENT '目标表'
AUTO PARTITION BY RANGE date_trunc(ge_time, 'day')()
DISTRIBUTED BY HASH(`ge_time`, `app_id`, `interface_code`) BUCKETS AUTO
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);






CREATE TABLE `other_interface_report` (
  `ge_time` DATETIME NOT NULL COMMENT '事件发生时间',
  `app_id` VARCHAR(192) NOT NULL COMMENT '应用key',
  `interface_code` VARCHAR(192) NOT NULL COMMENT '接口key',
  `result_status` TINYINT NOT NULL COMMENT '查询结果通知,  0:失败;  1:成功',
  `application_name` VARCHAR(192) NOT NULL COMMENT '应用名称'
) ENGINE=OLAP
DUPLICATE KEY(`ge_time`, `app_id`, `interface_code`)
COMMENT '目标表'
AUTO PARTITION BY RANGE date_trunc(ge_time, 'day')()
DISTRIBUTED BY HASH(`ge_time`, `app_id`, `interface_code`) BUCKETS AUTO
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);

上面的表, 三张表的表结构一样。
source表和target表有6亿数据, 大量的数据有利用发现性能瓶颈。

数据操作

-- 1:毫秒级出结果, 符合预期, 这种查询是表数据结构友好型
select max(ge_time) from target_interface_report;

-- 2:将上一步的结果,作为常量条件。 同样也是毫秒级出结果。
-- 符合预期,这种查询是表数据结构友好型。
select * from source_interface_report 
where
ge_time >= "2024-02-05 23:29:47.0";


-- 3: 
-- 但是: 将上述两步变成子查询.  运行了足足有5秒出结果.
-- 问题: 内外两个查询都是数据结构友好型的, 运行时间不尽人意.
-- 根据前两步的运行时间.  我认为有优化空间. 
select * from source_interface_report where
ge_time >= 
(select max(ge_time) from target_interface_report)


-- 4:
-- 把第3步的逻辑转成insert into select, 941条数据, 插入时间足足有40多秒, 优化空间更大.
insert into other_interface_report
select * from source_interface_report where
ge_time >= 
(select max(ge_time) from target_interface_report)

总结

基于调度器的实时统计。利用目标表的最大时间作为源表的增量数据锚点。 出现了性能问题。为了实时统计,调度器的时候间隔很短, 这就要求每次调度的代价不能太大。

调度器的其他要求

调度器调度任务的成功与否很重要, 因此, 需投递prometheus指标。 方便后续加入监控。

1 Answers

感谢提帖;约一个时间,线上细聊下这块需求。