json_unquote函数报错问题

Viewed 10

原版本:Version : doris-2.0.10
升级版本:Version : doris-2.1.5-rc02

版本升级后在表中新增了两个字段callercallid和calleecallid,varchar(64)
创建routine load任务,任务中通过json_unquote去除字符型数据中带的双引号
image.png

问题:
刚启动时,可以正常写库,但是一段时间后会自动停止消费,routine load任务正常,始终是running,停止消费后网卡流量激增,fe.log日志有报错
Invalid JSON text in argument 1 to function json_unquote: "21d848ea63616c6c00001810@47.100.xx.xx"""
"21d848ea63616c6c00001810@47.100.xx.xx",是新增的字段对应的值,看日志都是这个新增字段的
其他的之前加的字段,就不报这个问题。
image.png

附job:

CREATE ROUTINE LOAD vos.ROUTINE_vos5000_idc_e_cdr_vos_dwd ON e_cdr_vos_dwd
    COLUMNS TERMINATED BY ",",
    COLUMNS(
        callere164,
        calleraccesse164,
        calleee164,
        calleeaccesse164,
        callerip,
        callergatewayid,
        callertogatewaye164,
        calleeip,
        calleegatewayid,
        calleetogatewaye164,
        starttime,
        stoptime,
        callerpdd,
        calleepdd,
        holdtime,
        callerareacode,
        feetime,
        fee,
        customeraccount,
        customername,
        calleeareacode,
        agentfee,
        agentaccount,
        agentname,
				callercallid,
				calleecallid,
        enddirection,
        endreason,
        calleeaccesse164_11 = CASE 
            WHEN LENGTH(right(json_unquote(calleeaccesse164),11)) <= 11 
            THEN right(json_unquote(calleeaccesse164),11)
            ELSE left(json_unquote(calleeaccesse164),3) 
        END,
        callertogatewaye164_11 = CASE 
            WHEN LENGTH(right(json_unquote(callertogatewaye164),11)) <= 11 
            THEN right(json_unquote(callertogatewaye164),11)
            ELSE left(json_unquote(callertogatewaye164),3) 
        END,
        starttime_DATETIME = FROM_UNIXTIME(starttime/1000),
        stoptime_DATETIME = FROM_UNIXTIME(stoptime/1000),
        data_source = 'vos5000_idc',
				callere164=json_unquote(callere164),
				calleraccesse164=json_unquote(calleraccesse164),
				calleee164=json_unquote(calleee164),
				calleeaccesse164=json_unquote(calleeaccesse164),
				callerip=json_unquote(callerip),
				callergatewayid=json_unquote(callergatewayid),
				callertogatewaye164=json_unquote(callertogatewaye164),
				calleeip=json_unquote(calleeip),
				calleegatewayid=json_unquote(calleegatewayid),
				calleetogatewaye164=json_unquote(calleetogatewaye164),
				callerareacode=json_unquote(callerareacode),
				customeraccount=json_unquote(customeraccount),
				customername=json_unquote(customername),
				calleeareacode=json_unquote(calleeareacode),
				agentaccount=json_unquote(agentaccount),
				agentname=json_unquote(agentname),
				callercallid=json_unquote(callercallid),
				calleecallid=json_unquote(calleecallid)
    )
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "strict_mode" = "false"
		
)
FROM KAFKA
(
    "kafka_broker_list" = "mdw:9092,sdw1:9092,sdw2:9092",
    "kafka_topic" = "vos5000_idc2doris",
    "property.group.id" = "vos5000_idc2flume2kafka2doris",
    "property.csv.disable-quote-character" = "TRUE",
    "property.kafka_default_offsets" = "OFFSET_END",
    "property.enable.auto.commit" = "false"
);

1 Answers

已经和用户建联,内部跟进中