Doris 2.1.7: Routine Load fails to get the offset when consuming from Kafka 3.2.0


Versions

Doris version: 2.1.7
Kafka version: 3.2.0 (kafka_2.13-3.2.0)

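Problem description

Problem 1:
Submitting the Routine Load job fails with the exception: [INTERNAL_ERROR]failed to get latest offset for partition: 0, err: Local: Unknown partition.
be.WARNING error log (screenshot):
image.png
fe.warn.log error log (screenshot):
image.png
The CREATE statement is as follows: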
CREATE ROUTINE LOAD __internal_schema.ods_farm_device_data_moisture_r_test_routine_load_json2
ON ods_farm_device_data_moisture_r_test
COLUMNS(id,device_code,report_time,soil_moisture_one,soil_moisture_two,soil_moisture_three,
soil_ph,soil_temperature_one,soil_temperature_two,soil_temperature_three,temperature,
dew_temperature,soil_electrical_conductivity_one,soil_electrical_conductivity_two,
soil_electrical_conductivity_three,soil_nitrogen,soil_phosphorus,soil_potassium)
PROPERTIES(
"format"="json",
-- Double quotes inside the jsonpaths value must be escaped so the string literal is not terminated early.
"jsonpaths"="[\"$.uid\",\"$.deviceCode\",\"$.reportTime\",\"$.soilMoistureOne\",\"$.soilMoistureTwo\",\"$.soilMoistureThree\",
\"$.soilPh\",\"$.soilTemperatureOne\",\"$.soilTemperatureTwo\",\"$.soilTemperatureThree\",\"$.temperature\",
\"$.dewTemperature\",\"$.soilElectricalConductivityOne\",\"$.soilElectricalConductivityTwo\",
\"$.soilElectricalConductivityThree\",\"$.soilNitrogen\",\"$.soilPhosphorus\",\"$.soilPotassium\"]"
)
FROM KAFKA(
"kafka_broker_list" = "172.19.10.13:9092",
"kafka_topic" = "HW_PUSH_BIGDATA_TOPTIC_PLANT",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

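Problem 2:
Doris version: 2.1.7
Kafka version: 0.10.0.0 (kafka_2.11-0.10.0.0)
Description:
With this Kafka version Doris connects to Kafka without errors, but consumption falls behind. The job state stays RUNNING, and neither the BE nor the FE logs show any exception.
be.INFO log (screenshot):
image.png
fe.log (screenshots):
image.png
image.png
Running SHOW ROUTINE LOAD FOR __internal_schema.ods_farm_device_data_moisture_r_test_routine_load_json2; returns the following: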
ID: 11002
Name: ods_farm_device_data_moisture_r_test_routine_load_json2
CreateTime: 2025-01-24 10:14:57
PauseTime:
EndTime:
DbName: __internal_schema
TableName: ods_farm_device_data_moisture_r_test
IsMultiTable: false
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"max_batch_rows":"20000000","timezone":"Asia/Shanghai","send_batch_parallelism":"1","load_to_single_tablet":"false","current_concurrent_number":"1","delete":"","partial_columns":"false","merge_type":"APPEND","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"["$.uid","$.deviceCode","$.reportTime","$.soilMoistureOne","$.soilMoistureTwo","$.soilMoistureThree",\n "$.soilPh","$.soilTemperatureOne","$.soilTemperatureTwo","$.soilTemperatureThree","$.temperature",\n "$.dewTemperature","$.soilElectricalConductivityOne","$.soilElectricalConductivityTwo",\n "$.soilElectricalConductivityThree","$.soilNitrogen","$.soilPhosphorus","$.soilPotassium"]","max_batch_interval":"10","max_batch_size":"1073741824","fuzzy_parse":"false","escape":"0","enclose":"0","partitions":"","columnToColumnExpr":"id,device_code,report_time,soil_moisture_one,soil_moisture_two,soil_moisture_three,soil_ph,soil_temperature_one,soil_temperature_two,soil_temperature_three,temperature,dew_temperature,soil_electrical_conductivity_one,soil_electrical_conductivity_two,soil_electrical_conductivity_three,soil_nitrogen,soil_phosphorus,soil_potassium","whereExpr":"","desired_concurrent_number":"256","precedingFilter":"","format":"json","max_error_number":"0","max_filter_ratio":"1.0","json_root":"","strip_outer_array":"false","num_as_string":"false"}
DataSourceProperties: {"topic":"HW_PUSH_BIGDATA_TOPTIC_PLANT","currentKafkaPartitions":"0","brokerList":"172.19.10.13:9092"}
CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"ods_farm_device_data_moisture_r_test_routine_load_json2_b052689e-f80f-49ca-bd50-0e001dddd005"}
Statistic: {"receivedBytes":0,"runningTxns":[],"errorRows":0,"committedTaskNum":0,"loadedRows":0,"loadRowsRate":0,"abortedTaskNum":0,"errorRowsAfterResumed":0,"totalRows":0,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":1}
Progress: {"0":"OFFSET_ZERO"}
Lag: {"0":1}
ReasonOfStateChanged:
ErrorLogUrls:
OtherMsg:
User: root
Comment:
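I do not know why, but if I switch Doris to version 2.0.3, the job consumes from Kafka and writes into Doris normally. However, loading one stream into multiple tables then runs into a TOO_MANY_TASK problem.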

1 Answer

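1. "failed to get latest offset" may mean the offset has been lost on the topic side. Try adjusting the Routine Load offset properties so they line up with the topic again, then rerun the job.
2. If consumption is backing up and Doris resources are fairly idle, you can increase the max_routine_load_task-related Routine Load settings; see the Routine Load import parameter reference (Routine Load导入参数说明).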
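
The two suggestions above could look roughly like the following. This is only a sketch under some assumptions: the job name, partition 0 and OFFSET_BEGINNING are taken from the question; resetting to OFFSET_BEGINNING is just one possible choice and may not be the right offset for your topic; and the config name max_routine_load_task_num_per_be is assumed to be the setting meant by "max_routine_load_task", so check it against the output of the SHOW statement and the parameter reference for your version before changing anything.

```sql
-- 1. Realign the offset of the consumed partition.
--    A job has to be PAUSED before ALTER ROUTINE LOAD is accepted.
PAUSE ROUTINE LOAD FOR __internal_schema.ods_farm_device_data_moisture_r_test_routine_load_json2;

-- Assumption: partition 0 is the only partition (see currentKafkaPartitions in the
-- SHOW ROUTINE LOAD output); replace OFFSET_BEGINNING with a concrete offset if needed.
ALTER ROUTINE LOAD FOR __internal_schema.ods_farm_device_data_moisture_r_test_routine_load_json2
FROM KAFKA(
"kafka_partitions" = "0",
"kafka_offsets" = "OFFSET_BEGINNING"
);

RESUME ROUTINE LOAD FOR __internal_schema.ods_farm_device_data_moisture_r_test_routine_load_json2;

-- 2. Inspect the current Routine Load limits on the FE before raising any of them.
ADMIN SHOW FRONTEND CONFIG LIKE '%routine_load%';

-- Illustrative only: the value below is a placeholder, not a recommendation.
-- ADMIN SET FRONTEND CONFIG ("max_routine_load_task_num_per_be" = "<new value>");
```

After resuming, run SHOW ROUTINE LOAD for the job again and check whether Progress and Lag change; if they stay at the values shown in the question, the offset setting is probably not the cause.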