Error:
The job runs along fine for a few hours, then disappears. The exception log is as follows:
"WriteDataTimeMs": 817,
"CommitAndPublishTimeMs": 0
}
2024-12-06 21:26:18,824 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
2024-12-06 21:26:18,825 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (3/3)#0 (b949322ac8f7eba479b605c3bf9c5c3c) switched from RUNNING to FAILED with failure cause: java.lang.Exception: Could not perform checkpoint 11651 for operator Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (3/3)#0.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1175)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$12(StreamTask.java:1122)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.doris.flink.exception.DorisRuntimeException: stream load error: [ANALYSIS_ERROR]errCode = 2, detailMessage = get tableList write lock timeout, tableList=(Table [id=33371714, name=ods_userlog_formal, type=OLAP]), see more in null
at org.apache.doris.flink.sink.writer.DorisWriter.prepareCommit(DorisWriter.java:142)
at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.prepareSnapshotPreBarrier(SinkOperator.java:170)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:92)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:295)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1329)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1315)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1163)
... 13 more
An additional excerpt:
"Message": "[ANALYSIS_ERROR]errCode = 2, detailMessage = get tableList write lock timeout, tableList=(Table [id=33371714, name=ods_userlog_formal, type=OLAP])",
"NumberTotalRows": 0,
"NumberLoadedRows": 0,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 0,
"LoadTimeMs": 34628,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 0,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 977,
"CommitAndPublishTimeMs": 0
}
2024-12-06 21:27:28,134 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
2024-12-06 21:27:28,134 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (3/3)#1 (666347ee546caafdd30dd372ae75ce73) switched from RUNNING to FAILED with failure cause: java.lang.Exception: Could not perform checkpoint 11654 for operator Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (3/3)#1.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1175)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$12(StreamTask.java:1122)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.doris.flink.exception.DorisRuntimeException: stream load error: [ANALYSIS_ERROR]errCode = 2, detailMessage = get tableList write lock timeout, tableList=(Table [id=33371714, name=ods_userlog_formal, type=OLAP]), see more in null
at org.apache.doris.flink.sink.writer.DorisWriter.prepareCommit(DorisWriter.java:142)
at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.prepareSnapshotPreBarrier(SinkOperator.java:170)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:92)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:295)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1329)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1315)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1163)
... 13 more
2024-12-06 21:27:28,134 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (3/3)#1 (666347ee546caafdd30dd372ae75ce73).
2024-12-06 21:27:28,135 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (3/3)#1 666347ee546caafdd30dd372ae75ce73.
2024-12-06 21:27:28,149 INFO org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (1/3)#0 (33516d2f53d60515664bc5bcad5d8790).
2024-12-06 21:27:28,149 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (1/3)#0 (33516d2f53d60515664bc5bcad5d8790) switched from RUNNING to CANCELING.
2024-12-06 21:27:28,149 INFO org.apache.flink.runtime.taskmanager.Task [] - Triggering cancellation of task code Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (1/3)#0 (33516d2f53d60515664bc5bcad5d8790).
2024-12-06 21:27:28,151 INFO org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (2/3)#2 (751f694dc263646e6545304c814c1ec5).
2024-12-06 21:27:28,151 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (2/3)#2 (751f694dc263646e6545304c814c1ec5) switched from RUNNING to CANCELING.
2024-12-06 21:27:28,151 INFO org.apache.flink.runtime.taskmanager.Task [] - Triggering cancellation of task code Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (2/3)#2 (751f694dc263646e6545304c814c1ec5).
2024-12-06 21:27:28,151 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
2024-12-06 21:27:28,151 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (1/3)#0 (33516d2f53d60515664bc5bcad5d8790) switched from CANCELING to CANCELED.
2024-12-06 21:27:28,151 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (1/3)#0 (33516d2f53d60515664bc5bcad5d8790).
2024-12-06 21:27:28,152 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state CANCELED to JobManager for task Source: KafkaSource-default_catalog.default_database.kafka_source -> MiniBatchAssigner(interval=[5000ms], mode=[ProcTime]) -> Calc(select=[ITEM(data, 1).logid AS logid, ITEM(data, 1).app_name AS app_name, ITEM(data, 1).deviceId AS device_id, ITEM(data, 1).fncode AS fncode, ITEM(data, 1).fnname AS fnname, ITEM(data, 1).fngroup AS fngroup, ITEM(data, 1).ver AS ver, ITEM(data, 1).debug AS debug, ITEM(data, 1).trigertime AS trigertime, ITEM(data, 1).receivetime AS receivetime, ITEM(data, 1).ip AS ip, ITEM(data, 1).env AS env, ITEM(data, 1).tenantid AS tenant_id, ITEM(data, 1).user_id AS user_id, ITEM(data, 1).query AS query, LEFT(ITEM(data, 1).fncode, 6) AS pcode, DATE_FORMAT(FROM_UNIXTIME((CAST(ITEM(data, 1).receivetime) / 1000)), _UTF-16LE'yyyy-MM-dd') AS pdate], where=[(LEFT(ITEM(data, 1).fncode, 6) = _UTF-16LE'148072':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> Sink Sink(table=[default_catalog.default_database.table_sink], fields=[logid, app_name, device_id, fncode, fnname, fngroup, ver, debug, trigertime, receivetime, ip, env, tenant_id, user_id, query, pcode, pdate]) (1/3)#0 33516d2f53d60515664bc5bcad5d8790.
2024-12-06 21:27:28,154 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
Environment:
doris-version:doris-1.2.4-1-Unknown
flink-version:1.14.4
connector:flink-doris-connector-1.14_2.12-1.1.1.jar
sql:
source:
WITH (
'connector' = 'kafka',
'topic' = '$TOPIC',
'scan.startup.mode' = '$SCAN_MODE',
'properties.bootstrap.servers' = '$KAFKA_BROKER',
'properties.group.id' = '$SINKTABLE',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)
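For completeness, a sketch of the full source DDL. The column list is inferred from the Calc projection in the log above (accessing ITEM(data, 1).logid etc. implies data is an ARRAY<ROW<...>>); the field types are assumptions, not the actual DDL:
CREATE TABLE kafka_source (
    data ARRAY<ROW<
        logid STRING,
        app_name STRING,
        deviceId STRING,
        fncode STRING,
        fnname STRING,
        fngroup STRING,
        ver STRING,
        debug STRING,
        trigertime STRING,
        receivetime STRING, -- type assumed; the CAST in the plan suggests a non-numeric source type
        ip STRING,
        env STRING,
        tenantid STRING,
        user_id STRING,
        query STRING
    >>
) WITH (
    ... -- the Kafka options above
)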
sink:
WITH (
'connector' = 'doris',
'fenodes' = '$FENODES',
'table.identifier'= '$SINKDB.$SINKTABLE',
'username' = '$USER',
'password' = '$PASSWORD',
'sink.label-prefix' = '$SINKTABLE$RANNUMBER'
)
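Likewise, a sketch of the full sink DDL; the column list comes from the Sink fields in the log, while the types are assumptions:
CREATE TABLE table_sink (
    logid STRING,
    app_name STRING,
    device_id STRING,
    fncode STRING,
    fnname STRING,
    fngroup STRING,
    ver STRING,
    debug STRING,
    trigertime STRING,
    receivetime STRING,
    ip STRING,
    env STRING,
    tenant_id STRING,
    user_id STRING,
    query STRING,
    pcode STRING,
    pdate STRING
) WITH (
    ... -- the Doris options above
)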
insert:
FROM kafka_source where LEFT(data[1].fncode , 6) ='148072'
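The full statement can be reconstructed from the Calc node in the log; apart from the exact CAST target (assumed BIGINT below), it ran as:
INSERT INTO table_sink
SELECT
    data[1].logid,
    data[1].app_name,
    data[1].deviceId,
    data[1].fncode,
    data[1].fnname,
    data[1].fngroup,
    data[1].ver,
    data[1].debug,
    data[1].trigertime,
    data[1].receivetime,
    data[1].ip,
    data[1].env,
    data[1].tenantid,
    data[1].user_id,
    data[1].query,
    LEFT(data[1].fncode, 6),
    DATE_FORMAT(FROM_UNIXTIME(CAST(data[1].receivetime AS BIGINT) / 1000), 'yyyy-MM-dd')
FROM kafka_source
WHERE LEFT(data[1].fncode, 6) = '148072'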
config:
val tEnv = StreamTableEnvironment.create(env, settings)
// Configure the RocksDB state backend
val hdfsPath = "hdfs://node104.data:8020/rocksdb" // checkpoint path on HDFS
val stateBackend: StateBackend = new RocksDBStateBackend(hdfsPath, true) // second argument enables incremental checkpoints
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.setStateBackend(stateBackend) // use RocksDB as the state backend
env.enableCheckpointing(1000) // checkpoint every 1000 ms; the Doris sink commits a stream load per checkpoint (see the prepareCommit frames above)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)))
env.setParallelism(3)
env.setMaxParallelism(3)
val configuration = tEnv.getConfig().getConfiguration()
configuration.setString("table.exec.mini-batch.enabled", "true")
configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
configuration.setString("table.exec.mini-batch.size", "5000")
configuration.setString("table.exec.sink.not-null-enforcer", "drop")
configuration.setString("taskmanager.numberOfTaskSlots", "4")
configuration.setString("taskmanager.memory.task.heap.size", "2g") //每个任务管理器的堆内存大小
configuration.setString("taskmanager.memory.framework.off-heap.size", "2g") //每个任务管理器的堆外内存