Flink SQL writing to Doris keeps failing: stream load error


Error:
The job hums along for a few hours, then disappears. Exception log below:

    "WriteDataTimeMs": 817,
    "CommitAndPublishTimeMs": 0
}

2024-12-06 21:26:18,824 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
2024-12-06 21:26:18,825 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (3/3)#0 (b949322ac8f7eba479b605c3bf9c5c3c) switched from RUNNING to FAILED with failure cause: java.lang.Exception: Could not perform checkpoint 11651 for operator Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (3/3)#0.
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1175)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$12(StreamTask.java:1122)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.doris.flink.exception.DorisRuntimeException: stream load error: [ANALYSIS_ERROR]errCode = 2, detailMessage = get tableList write lock timeout, tableList=(Table [id=33371714, name=ods_userlog_formal, type=OLAP]), see more in null
	at org.apache.doris.flink.sink.writer.DorisWriter.prepareCommit(DorisWriter.java:142)
	at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.prepareSnapshotPreBarrier(SinkOperator.java:170)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:92)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:295)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1329)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1315)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1163)
	... 13 more

An additional excerpt:

 "Message": "[ANALYSIS_ERROR]errCode = 2, detailMessage = get tableList write lock timeout, tableList=(Table [id=33371714, name=ods_userlog_formal, type=OLAP])",
    "NumberTotalRows": 0,
    "NumberLoadedRows": 0,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 0,
    "LoadTimeMs": 34628,
    "BeginTxnTimeMs": 0,
    "StreamLoadPutTimeMs": 0,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 977,
    "CommitAndPublishTimeMs": 0
}

2024-12-06 21:27:28,134 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
2024-12-06 21:27:28,134 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (3/3)#1 (666347ee546caafdd30dd372ae75ce73) switched from RUNNING to FAILED with failure cause: java.lang.Exception: Could not perform checkpoint 11654 for operator Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (3/3)#1.
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1175)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$12(StreamTask.java:1122)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.doris.flink.exception.DorisRuntimeException: stream load error: [ANALYSIS_ERROR]errCode = 2, detailMessage = get tableList write lock timeout, tableList=(Table [id=33371714, name=ods_userlog_formal, type=OLAP]), see more in null
	at org.apache.doris.flink.sink.writer.DorisWriter.prepareCommit(DorisWriter.java:142)
	at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.prepareSnapshotPreBarrier(SinkOperator.java:170)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:92)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:295)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1329)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1315)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1163)
	... 13 more

2024-12-06 21:27:28,134 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (3/3)#1 (666347ee546caafdd30dd372ae75ce73).
2024-12-06 21:27:28,135 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (3/3)#1 666347ee546caafdd30dd372ae75ce73.
2024-12-06 21:27:28,149 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Attempting to cancel task Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (1/3)#0 (33516d2f53d60515664bc5bcad5d8790).
2024-12-06 21:27:28,149 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (1/3)#0 (33516d2f53d60515664bc5bcad5d8790) switched from RUNNING to CANCELING.
2024-12-06 21:27:28,149 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Triggering cancellation of task code Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (1/3)#0 (33516d2f53d60515664bc5bcad5d8790).
2024-12-06 21:27:28,151 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Attempting to cancel task Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (2/3)#2 (751f694dc263646e6545304c814c1ec5).
2024-12-06 21:27:28,151 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (2/3)#2 (751f694dc263646e6545304c814c1ec5) switched from RUNNING to CANCELING.
2024-12-06 21:27:28,151 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Triggering cancellation of task code Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (2/3)#2 (751f694dc263646e6545304c814c1ec5).
2024-12-06 21:27:28,151 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
2024-12-06 21:27:28,151 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (1/3)#0 (33516d2f53d60515664bc5bcad5d8790) switched from CANCELING to CANCELED.
2024-12-06 21:27:28,151 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (1/3)#0 (33516d2f53d60515664bc5bcad5d8790).
2024-12-06 21:27:28,152 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state CANCELED to JobManager for task Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (1/3)#0 33516d2f53d60515664bc5bcad5d8790.
2024-12-06 21:27:28,154 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.

Environment:
doris-version: doris-1.2.4-1-Unknown
flink-version: 1.14.4
connector: flink-doris-connector-1.14_2.12-1.1.1.jar
SQL:
Source:

WITH (
    'connector' = 'kafka',
    'topic' = '$TOPIC',
    'scan.startup.mode' = '$SCAN_MODE',
    'properties.bootstrap.servers' = '$KAFKA_BROKER',
    'properties.group.id' = '$SINKTABLE',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
)

Sink:

WITH (
  'connector' = 'doris',
  'fenodes' = '$FENODES',
  'table.identifier'= '$SINKDB.$SINKTABLE',
  'username' = '$USER',
  'password' = '$PASSWORD',
  'sink.label-prefix' = '$SINKTABLE$RANNUMBER'
)

Insert:

FROM kafka_source WHERE LEFT(data[1].fncode, 6) = '148072'

Config:

    val tEnv = StreamTableEnvironment.create(env, settings)
    // configure RocksDB as the state backend, checkpointing to HDFS
    val hdfsPath = "hdfs://node104.data:8020/rocksdb" // checkpoint path on HDFS
    val stateBackend: StateBackend = new RocksDBStateBackend(hdfsPath, true) // true = incremental checkpoints
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.setStateBackend(stateBackend) // install RocksDB as the state backend
    env.enableCheckpointing(1000)
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)))
    env.setParallelism(3)
    env.setMaxParallelism(3)
    val configuration = tEnv.getConfig().getConfiguration()
    configuration.setString("table.exec.mini-batch.enabled", "true")
    configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
    configuration.setString("table.exec.mini-batch.size", "5000")
    configuration.setString("table.exec.sink.not-null-enforcer", "drop")
    configuration.setString("taskmanager.numberOfTaskSlots", "4")
    configuration.setString("taskmanager.memory.task.heap.size", "2g") // task heap memory per TaskManager
    configuration.setString("taskmanager.memory.framework.off-heap.size", "2g") // framework off-heap memory per TaskManager

2 Answers

The writes are too frequent. Increase the flush interval and the amount of data batched per flush.

WITH (
  'connector' = 'doris',
  'fenodes' = '$FENODES',
  'table.identifier'= '$SINKDB.$SINKTABLE',
  'username' = '$USER',
  'password' = '$PASSWORD',
  'sink.enable.batch-mode' = 'true',
  'sink.flush.queue-size' = '2',
  'sink.buffer-flush.max-rows' = '500000',
  'sink.buffer-flush.max-bytes' = '100M',
  'sink.buffer-flush.interval' = '10s',
  'sink.label-prefix' = '$SINKTABLE$RANNUMBER'
)
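
One caveat: 'sink.enable.batch-mode' and the 'sink.buffer-flush.*' / 'sink.flush.queue-size' options belong to the batch-write mode that was added in later connector releases, so flink-doris-connector 1.1.1 will most likely reject them. On 1.1.x the load rate is driven by checkpointing instead: the stack trace shows DorisWriter.prepareCommit firing from prepareSnapshotPreBarrier, i.e. a stream load transaction is finalized on every checkpoint, and this job checkpoints every 1000 ms, so each sink subtask hits the FE (and the table write lock) roughly once per second. A minimal sketch of slowing that down by stretching the checkpoint interval, assuming the same StreamExecutionEnvironment as in the question; the 60 s / 30 s / 10 min values are illustrative, not tuned:

    // fewer checkpoints => fewer stream load commits contending for the table write lock
    env.enableCheckpointing(60000)                               // commit roughly once a minute (illustrative)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(30000) // breathing room between commits (illustrative)
    env.getCheckpointConfig.setCheckpointTimeout(600000)         // let a slow commit finish instead of failing the checkpoint
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)       // unchanged from the question's config

The trade-off is latency: with exactly-once two-phase commit, data only becomes visible in Doris when a checkpoint commits.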

The parameters above were not accepted. I added the few below instead, and the job is running now, but I don't know yet whether it's stable. Still watching...

[screenshot of the parameters that were added]
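
One more thing I noticed in my own config: RestartStrategies.fixedDelayRestart(3, Time.seconds(10)) gives up permanently after three failed restarts, which would explain why the job "disappears" instead of recovering once a burst of lock timeouts passes. Switching to a failure-rate strategy should let it ride out transient stream load failures; a sketch with illustrative, untested numbers:

    // tolerate occasional stream-load failures instead of dying after 3 restarts total;
    // the job only fails if 3 failures land inside any 5-minute window (illustrative values)
    env.setRestartStrategy(RestartStrategies.failureRateRestart(
      3,                // max failures per measuring interval
      Time.minutes(5),  // measuring interval
      Time.seconds(10)  // delay between restart attempts
    ))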