Created a ROUTINE LOAD job to ingest Kafka data, and found that some records are being lost.

  • Doris version

  • Git : git://VM-0-117-ubuntu@22edcabbb007e8878649f071b67b3cc1f5b010a2
    Version : doris-2.0.11
    BuildInfo : VM-0-117-ubuntu
    BuildTime : Sun, 02 Jun 2024 10:11:07 CST

  • ROUTINE LOAD job creation statement

CREATE ROUTINE LOAD phv_order_create ON rt_order
WITH APPEND
COLUMNS(
    temp_sys_order_time, ser_id, order_id, agg_ser_id, agg_order_id, unit_id, agg_unit_id,
    is_agg_order, is_agg_upload=FALSE, row_key, temp_create_order_time, order_type,
    create_dep_address, create_det_address,
    sys_order_time=from_millisecond(`temp_sys_order_time`),
    create_order_time=from_millisecond(`temp_create_order_time`),
    is_create=TRUE
),
WHERE `sys_order_time` > '2023-07-01 00:00:00.000'
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_error_number" = "0",
"max_filter_ratio" = "1.0",
"max_batch_interval" = "20",
"max_batch_rows" = "2000000",
"max_batch_size" = "536870912",
"format" = "json",
"partial_columns" = "true",
"jsonpaths" = "[\"$.sysOrderTime\",\"$.serId\",\"$.orderId\",\"$.aggSerId\",\"$.aggOrderId\",\"$.unitId\",\"$.aggUnitId\",\"$.isAggOrder\",\"$.rowKey\",\"$.createOrderTime\",\"$.orderType\",\"$.createDepAddress\",\"$.createDetAddress\"]",
"strip_outer_array" = "false",
"num_as_string" = "false",
"fuzzy_parse" = "false",
"strict_mode" = "false",
"timezone" = "Asia/Shanghai",
"exec_mem_limit" = "2147483648"
)
FROM KAFKA
(
"kafka_broker_list" = "192.168.42.11:9092",
"kafka_topic" = "cal_rt_order_create",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.group.id" = "doris_job",
"kafka_partitions" = "0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11",
"kafka_offsets" = "2292766694, 2209791592, 2202989032, 2284481319, 2199431053, 2202433210, 331643455, 331527026, 332185064, 330962972, 332266436, 332041637"
);
  • The job runs normally, but SHOW ROUTINE LOAD shows a warning in the OtherMsg field (screenshot below; a status-query sketch follows it).

(screenshot: SHOW ROUTINE LOAD output with the warning in OtherMsg)
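
As a first check (a sketch using standard Doris statements; the exact output fields depend on the version), pull the full job status and look at the error-related fields, not only OtherMsg:

-- OtherMsg carries the latest warning, ErrorLogUrls points to samples of
-- rejected rows, and Statistic reports loadedRows vs. errorRows/unselectedRows.
SHOW ROUTINE LOAD FOR phv_order_create\G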

  • What we ingest from Kafka via ROUTINE LOAD is order data, and we do partial-column updates keyed by order id. An order goes through several states: created, cancelled, paid, and so on. There is now one order for which every other state has values, but the created state is missing, i.e. that record from the create topic never made it into Doris and appears to be lost. Yet the create-state record for the same order id does exist in HBase, which is fed by our kafka-streams application. How should I troubleshoot this? (A verification query sketch follows this item.)
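
To confirm what actually landed in Doris for that order, a verification sketch (column names come from the COLUMNS mapping above; '123456' is a hypothetical order id):

-- With partial-column updates, a cancel/pay record can create the row while
-- leaving the create-related columns (is_create, create_order_time) at their
-- defaults or NULL, which then looks like a lost create record.
SELECT order_id, is_create, create_order_time, sys_order_time
FROM rt_order
WHERE order_id = '123456';   -- hypothetical order id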

  • Some suspicious log entries on the BE node: cat be.INFO | grep 'kafka consume timeout' turns up a large number of timeout lines, even though our kafka-streams application's connection to the same cluster has always been fine. (A note and a status-query sketch follow the log excerpt.)

I1224 09:50:55.937489 3216216 data_consumer.cpp:238] kafka consume timeout: c346f561a95b3bc2-2ee04d1491dae491
I1224 09:50:56.511955 3216026 data_consumer.cpp:238] kafka consume timeout: ff479e6b802ab3f3-ef2bc82b6a70bfb3
I1224 09:50:56.514799 3216083 data_consumer.cpp:238] kafka consume timeout: 68450aada94fd49a-78b3393480c576af
I1224 09:50:56.516703 3216082 data_consumer.cpp:238] kafka consume timeout: c743218046466fb2-d69c65bd5c215a93
I1224 09:50:56.545652 3216159 data_consumer.cpp:238] kafka consume timeout: 2345f860382c4637-47d1e18d862ad288
I1224 09:50:56.545842 3216160 data_consumer.cpp:238] kafka consume timeout: 304449d6a6734f4e-016453b83426bc8b
I1224 09:50:56.840989 3216027 data_consumer.cpp:238] kafka consume timeout: d74b827420f326c3-632de9dd5d120490
I1224 09:50:57.014087 3216122 data_consumer.cpp:238] kafka consume timeout: a94618eb623f71a1-1468fedd62f5c5ad
I1224 09:50:57.592698 3216216 data_consumer.cpp:238] kafka consume timeout: c346f561a95b3bc2-2ee04d1491dae491
I1224 09:50:57.618567 3216217 data_consumer.cpp:238] kafka consume timeout: 5542d171ef4981c0-ec0384917a2199ac
I1224 09:50:57.703253 3216121 data_consumer.cpp:238] kafka consume timeout: 394db4aaaf1997bd-712de0937980d384
I1224 09:50:58.592820 3216216 data_consumer.cpp:238] kafka consume timeout: c346f561a95b3bc2-2ee04d1491dae491
I1224 09:50:58.618686 3216217 data_consumer.cpp:238] kafka consume timeout: 5542d171ef4981c0-ec0384917a2199ac
I1224 09:50:59.214872 3216026 data_consumer.cpp:238] kafka consume timeout: ff479e6b802ab3f3-ef2bc82b6a70bfb3
I1224 09:50:59.252770 3216082 data_consumer.cpp:238] kafka consume timeout: c743218046466fb2-d69c65bd5c215a93
I1224 09:51:00.280136 3216083 data_consumer.cpp:238] kafka consume timeout: 68450aada94fd49a-78b3393480c576af
I1224 09:51:00.781975 3216027 data_consumer.cpp:238] kafka consume timeout: d74b827420f326c3-632de9dd5d120490
I1224 09:51:00.852222 3216082 data_consumer.cpp:238] kafka consume timeout: c743218046466fb2-d69c65bd5c215a93
I1224 09:51:01.481241 3216217 data_consumer.cpp:238] kafka consume timeout: 5542d171ef4981c0-ec0384917a2199ac
I1224 09:51:01.488682 3216026 data_consumer.cpp:238] kafka consume timeout: ff479e6b802ab3f3-ef2bc82b6a70bfb3
I1224 09:51:01.489279 3216216 data_consumer.cpp:238] kafka consume timeout: c346f561a95b3bc2-2ee04d1491dae491
I1224 09:51:01.509366 3216083 data_consumer.cpp:238] kafka consume timeout: 68450aada94fd49a-78b3393480c576af
I1224 09:51:01.785630 3216027 data_consumer.cpp:238] kafka consume timeout: d74b827420f326c3-632de9dd5d120490
I1224 09:51:01.854940 3216082 data_consumer.cpp:238] kafka consume timeout: c743218046466fb2-d69c65bd5c215a93
I1224 09:51:02.509456 3216083 data_consumer.cpp:238] kafka consume timeout: 68450aada94fd49a-78b3393480c576af
I1224 09:51:02.659253 3216350 data_consumer.cpp:238] kafka consume timeout: 6541e05a1b7f97bb-688e2b032bb4faa7
I1224 09:51:02.698505 3216351 data_consumer.cpp:238] kafka consume timeout: 5d4cd12be046dd6a-de6bd1609d5a54be
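
These 'kafka consume timeout' lines are INFO-level (the leading I) and, in the BE data consumer, typically just mean a poll returned no message within its timeout window, so on their own they do not prove data loss; if consumption were really stalling, the Lag/Progress fields of SHOW ROUTINE LOAD (see above) would keep growing. To correlate the timeouts with a specific BE, a sketch (standard Doris statement; output columns may vary by version):

-- Lists the currently scheduled sub-tasks of the job, including BeId,
-- ExecuteStartTime and the partition/offset range each task is consuming.
SHOW ROUTINE LOAD TASK WHERE JobName = "phv_order_create";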
1 Answer

One direction to troubleshoot:
Create a duplicate-key table with the same structure and load the same data into it with Routine Load. If the record is missing from the duplicate table as well, the data was most likely filtered during the load, so check for data-quality problems that would cause it to be rejected. If the record does exist there, the rows probably arrived in an inconsistent order and this record was overwritten; a sequence column can solve that problem. (A sketch of both steps follows.)
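
A sketch of both suggestions. The shadow table is illustrative only (the real rt_order schema is not given, so columns, types, key, bucketing and replication are assumptions to adapt), and note that the "partial_columns" = "true" property of the original job applies to the unique-key merge-on-write model, so a test job loading into the duplicate table would drop it:

-- 1) Shadow table on the DUPLICATE KEY model: every ingested row is kept, so
--    if the create record is missing here too, it was rejected before write
--    (parse error, WHERE predicate, ...), not overwritten.
CREATE TABLE rt_order_dup_check
(
    order_id          VARCHAR(64),
    sys_order_time    DATETIME,
    create_order_time DATETIME,
    is_create         BOOLEAN
    -- ... remaining columns mirrored from rt_order
)
DUPLICATE KEY(order_id)
DISTRIBUTED BY HASH(order_id) BUCKETS 10
PROPERTIES ("replication_num" = "3");

-- 2) Sequence column on the UNIQUE KEY table: replacement is then decided by
--    the sequence value rather than by arrival order, so an out-of-order write
--    can no longer silently shadow the create record. For an existing table a
--    hidden sequence column can be added like this (the type is an assumption;
--    loads must then supply the sequence value, e.g. mapped from sysOrderTime):
ALTER TABLE rt_order ENABLE FEATURE "SEQUENCE_LOAD"
WITH PROPERTIES ("function_column.sequence_type" = "bigint");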