[Solved] routine load not working


['default_cluster:xkf_rw'@9030][14:48:49][xkf]> SHOW ROUTINE LOAD for xkf.job_rpt_xxx_test\G
*************************** 1. row ***************************
Id: 249167
Name: job_rpt_xxx_test
CreateTime: 2024-03-21 14:48:48
PauseTime: 2024-03-21 14:48:49
EndTime: NULL
DbName: default_cluster:xkf
TableName: rpt_xxx_test
IsMultiTable: false
State: PAUSED
DataSourceType: KAFKA
CurrentTaskNum: 0
JobProperties: {"max_batch_rows":"200000","timezone":"Asia/Shanghai","send_batch_parallelism":"1","load_to_single_tablet":"false","current_concurrent_number":"3","delete":"","partial_columns":"false","merge_type":"APPEND","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","max_batch_interval":"5","max_batch_size":"209715200","fuzzy_parse":"false","partitions":"","columnToColumnExpr":"callid,workstarttime,tenantid","whereExpr":"","desired_concurrent_number":"3","precedingFilter":"","format":"json","max_error_number":"10000","max_filter_ratio":"1.0","json_root":"","strip_outer_array":"false","num_as_string":"false"}
DataSourceProperties: {"topic":"rpt_xxx_detail","currentKafkaPartitions":"0,1,2","brokerList":"xxxx:32020,xxx:32021,xxx:32188"}
CustomProperties: {"security.protocol":"SASL_PLAINTEXT","sasl.username":"xkf-cc-test","sasl.mechanism":"SCRAM-SHA-256","group.id":"group1","sasl.password":"xxx","client.id":"group1"}
Statistic: {"receivedBytes":0,"runningTxns":[40665,40666],"errorRows":0,"committedTaskNum":0,"loadedRows":0,"loadRowsRate":0,"abortedTaskNum":1,"errorRowsAfterResumed":0,"totalRows":0,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":1}
Progress: {"0":"OFFSET_BEGINNING","1":"OFFSET_BEGINNING","2":"OFFSET_BEGINNING"}
Lag: {"0":1017,"1":1016,"2":1016}
ReasonOfStateChanged: ErrorReason{code=errCode = 104, msg='be 10072 abort task, task id: 9a21465c-23a4-4355-b59f-e954e82c5fc3 job id: 249167 with reason: [INTERNAL_ERROR]fetch failed due to requested offset not available on the broker: Broker: Offset out of range (broker 1)

    0#  doris::KafkaDataConsumer::group_consume(doris::BlockingQueue<RdKafka::Message*>*, long) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:356
    1#  doris::KafkaDataConsumerGroup::actual_consume(std::shared_ptr<doris::DataConsumer>, doris::BlockingQueue<RdKafka::Message*>*, long, std::function<void (doris::Status const&)>) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:701
    2#  std::_Function_handler<void (), std::_Bind_result<void, void (doris::KafkaDataConsumerGroup::*(doris::KafkaDataConsumerGroup*, std::shared_ptr<doris::DataConsumer>, doris::BlockingQueue<RdKafka::Message*>*, long, doris::KafkaDataConsumerGroup::start_all(std::shared_ptr<doris::StreamLoadContext>, std::shared_ptr<doris::io::KafkaConsumerPipe>)::$_0))(std::shared_ptr<doris::DataConsumer>, doris::BlockingQueue<RdKafka::Message*>*, long, std::function<void (doris::Status const&)>)> >::_M_invoke(std::_Any_data const&) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:244
    3#  doris::WorkThreadPool<true>::work_thread(int) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/atomic_base.h:646
    4#  execute_native_thread_routine at /data/gcc-11.1.0/build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/unique_ptr.h:85
    5#  ?
    6#  clone

the offset used by job does not exist in kafka, please check the offset, using the Alter ROUTINE LOAD command to modify it, and resume the job'}
ErrorLogUrls:
OtherMsg:
User: default_cluster:xkf_rw
Comment: test on March 6, 2024
1 row in set (0.00 sec)

2 Answers

Answer referenced from the comments: specifying a start time with ALTER ROUTINE LOAD solves it.

A Doris routine load job cannot be given a start time when it is first created; a time can only be specified afterwards via ALTER ROUTINE LOAD.

See also Q11 in the Doris FAQ.

This problem occurs because Kafka's default log retention is 7 days. If a routine load job is paused for some reason and not resumed for a long time, the job still remembers the offset it had consumed up to; by the time the job is resumed, Kafka's retention policy has already deleted the data at that offset, and this error appears.
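To confirm this, you can ask the broker for the earliest offset still retained in each partition. A sketch using Kafka's bundled GetOffsetShell tool, with the broker list and topic taken from the job's DataSourceProperties above; `client-sasl.properties` is a hypothetical file holding the SASL settings shown in CustomProperties, and `--command-config` requires a reasonably recent Kafka release:

```shell
# --time -2 asks for the earliest available offset of each partition.
# Compare these against the offsets the paused job recorded: if the
# job's offset is smaller, that data has already been purged.
kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list xxxx:32020,xxx:32021,xxx:32188 \
  --topic rpt_xxx_detail \
  --time -2 \
  --command-config client-sasl.properties
```

The output is one `topic:partition:offset` line per partition; those offsets are the smallest values the job can safely be pointed at.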

So the problem can be fixed with ALTER ROUTINE LOAD:

Look up Kafka's smallest available offset, use the ALTER ROUTINE LOAD command to update the job's offset accordingly, then resume the job.
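The fix can be sketched as SQL against the job shown above. The numeric offsets are placeholders; substitute the earliest offsets the broker actually reports (or use OFFSET_BEGINNING to start from whatever data Kafka still retains):

```sql
-- The job must be PAUSED before it can be altered (it already is here).
-- Point each partition at an offset that still exists on the broker.
ALTER ROUTINE LOAD FOR xkf.job_rpt_xxx_test
FROM KAFKA
(
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "OFFSET_BEGINNING,OFFSET_BEGINNING,OFFSET_BEGINNING"
);

-- Bring the job back to the RUNNING state.
RESUME ROUTINE LOAD FOR xkf.job_rpt_xxx_test;
```

After resuming, check `SHOW ROUTINE LOAD` again: the State should return to RUNNING and the Lag values should start decreasing as the backlog is consumed.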