Flink job writing to a UNIQUE KEY table model fails to start with a checkpoint error

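With the same configuration, reading from Kafka and writing to Doris with Flink succeeds for a DUPLICATE table model but fails for a UNIQUE table model. Details are below.
Doris version: 3.0.2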

Flink jar dependency:
flink-doris-connector-1.17-24.0.1.jar

Flink configuration:
state.checkpoints.dir: hdfs://192.168.28.46:9820/flink-checkpoints
state.savepoints.dir: hdfs://192.168.28.46:9820/flink-savepoints

Error:

2024-11-20 17:32:17,388 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1732095137386
2024-11-20 17:32:17,389 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Starting the KafkaSourceEnumerator for consumer group doris_group_id_p without periodic partition discovery.
2024-11-20 17:32:17,507 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: market_mo_track_event_game_recharge_source[1] -> Calc[2] -> market_mo_track_event_game_recharge_all[3]: Writer -> market_mo_track_event_game_recharge_all[3]: Committer (1/1) (397055d1a1d4bf74f9f51e11edfdee97_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from SCHEDULED to DEPLOYING.
2024-11-20 17:32:17,507 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Deploying Source: market_mo_track_event_game_recharge_source[1] -> Calc[2] -> market_mo_track_event_game_recharge_all[3]: Writer -> market_mo_track_event_game_recharge_all[3]: Committer (1/1) (attempt #0) with attempt id 397055d1a1d4bf74f9f51e11edfdee97_cbc357ccb763df2852fee8c4fc7d55f2_0_0 and vertex id cbc357ccb763df2852fee8c4fc7d55f2_0 to 192.168.28.38:22355-b004fd @ cmagent028038.xiyouapi.com (dataPort=19129) with allocation id 4de95194ae1bd95774da888f778460d0
2024-11-20 17:32:17,833 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Discovered new partitions: [MoTrackEventGameRecharge-0]
2024-11-20 17:32:18,198 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: market_mo_track_event_game_recharge_source[1] -> Calc[2] -> market_mo_track_event_game_recharge_all[3]: Writer -> market_mo_track_event_game_recharge_all[3]: Committer (1/1) (397055d1a1d4bf74f9f51e11edfdee97_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from DEPLOYING to INITIALIZING.
2024-11-20 17:32:19,668 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source Source: market_mo_track_event_game_recharge_source[1] registering reader for parallel task 0 (#0) @ 192.168.28.38
2024-11-20 17:32:19,668 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: market_mo_track_event_game_recharge_source[1] -> Calc[2] -> market_mo_track_event_game_recharge_all[3]: Writer -> market_mo_track_event_game_recharge_all[3]: Committer (1/1) (397055d1a1d4bf74f9f51e11edfdee97_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from INITIALIZING to RUNNING.
2024-11-20 17:32:19,672 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Assigning splits to readers {0=[[Partition: MoTrackEventGameRecharge-0, StartingOffset: -3, StoppingOffset: -9223372036854775808]]}
2024-11-20 17:32:23,714 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 1 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1732095143690 for job 059ee0951cba095e2399ad7baec0451e.


2024-11-20 17:37:17,873 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient [] - [AdminClient clientId=doris_group_id_p-enumerator-admin-client] Node -3 disconnected.
2024-11-20 17:37:23,715 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint 1 of job 059ee0951cba095e2399ad7baec0451e expired before completing.
2024-11-20 17:37:23,722 WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 1 for job 059ee0951cba095e2399ad7baec0451e. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2216) [flink-dist-1.17.1.jar:1.17.1]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_162]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_162]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_162]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_162]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_162]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_162]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_162]
2024-11-20 17:37:23,742 INFO  org.apache.flink.runtime.checkpoint.CheckpointRequestDecider [] - checkpoint request time in queue: 290053
2024-11-20 17:37:23,751 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. The latest checkpoint failed due to Checkpoint expired before completing., view the Checkpoint History tab or the Job Manager log to find out why continuous checkpoints failed.
	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:212) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:169) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:122) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2155) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2134) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$700(CheckpointCoordinator.java:101) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2216) ~[flink-dist-1.17.1.jar:1.17.1]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_162]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_162]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_162]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_162]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_162]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_162]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_162]
2024-11-20 17:37:23,759 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - 1 tasks will be restarted to recover from a global failure.
2024-11-20 17:37:23,759 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job market_mo_track_event_game_recharge_all (059ee0951cba095e2399ad7baec0451e) switched from state RUNNING to RESTARTING.
2024-11-20 17:37:23,771 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Removing registered reader after failure for subtask 0 (#0) of source Source: market_mo_track_event_game_recharge_source[1].
2024-11-20 17:37:23,771 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: market_mo_track_event_game_recharge_source[1] -> Calc[2] -> market_mo_track_event_game_recharge_all[3]: Writer -> market_mo_track_event_game_recharge_all[3]: Committer (1/1) (397055d1a1d4bf74f9f51e11edfdee97_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to CANCELING.
2024-11-20 17:37:23,794 INFO  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger checkpoint for job 059ee0951cba095e2399ad7baec0451e since Checkpoint triggering task Source: market_mo_track_event_game_recharge_source[1] -> Calc[2] -> market_mo_track_event_game_recharge_all[3]: Writer -> market_mo_track_event_game_recharge_all[3]: Committer (1/1) of job 059ee0951cba095e2399ad7baec0451e is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running..

2 Answers

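Thanks. The problem was in the Flink sink configuration: label-prefix must not use a value that changes. After I switched it to a static value, the job worked.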

The label-prefix that caused the problem:

  'sink.properties.format' = 'json',                                  \n \
  'sink.properties.read_json_by_line' = 'true',                       \n \
  'sink.properties.partial_columns' = 'true',     -- enable partial column updates      \n \
  'sink.label-prefix' = '${flink.task.name}_${yyyyMMddHHmmss}'        \n \

The label-prefix after the fix:

  'sink.properties.format' = 'json',                                  \n \
  'sink.properties.read_json_by_line' = 'true',                       \n \
  'sink.properties.partial_columns' = 'true',     -- enable partial column updates      \n \
  'sink.label-prefix' = '${flink.task.name}'        \n \

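After this change the job can write to Doris. Note that the ${} values are placeholders filled in by our own business code; Flink SQL itself does not recognize them.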
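
To illustrate the difference between the two prefixes, here is a minimal Python sketch. The poster's actual substitution code is not shown in the thread, so `render_label_prefix` is a hypothetical stand-in, and using the job name from the log as the value of `${flink.task.name}` is an assumption. It only shows that the timestamped template yields a different prefix on each submission while the static one stays the same.

```python
from datetime import datetime


def render_label_prefix(template: str, task_name: str, now: datetime) -> str:
    """Hypothetical stand-in for the business-side ${...} substitution."""
    return (
        template
        .replace("${flink.task.name}", task_name)
        .replace("${yyyyMMddHHmmss}", now.strftime("%Y%m%d%H%M%S"))
    )


# Assumed task name (taken from the job name in the log) and two submission times.
task = "market_mo_track_event_game_recharge_all"
first_submit = datetime(2024, 11, 20, 17, 32, 17)
resubmit = datetime(2024, 11, 20, 17, 37, 23)

dynamic = "${flink.task.name}_${yyyyMMddHHmmss}"
static = "${flink.task.name}"

# Collect the distinct prefixes produced across the two submissions.
dynamic_prefixes = {render_label_prefix(dynamic, task, t) for t in (first_submit, resubmit)}
static_prefixes = {render_label_prefix(static, task, t) for t in (first_submit, resubmit)}

print(len(dynamic_prefixes), len(static_prefixes))
```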

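This looks like a checkpoint failure. How long is the checkpoint timeout set to? Check whether the checkpoint timed out, that is, whether taking the checkpoint snapshot took longer than the configured execution.checkpointing.timeout.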
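
As a quick way to check this against the log in the question, the sketch below computes the time between the "Triggering checkpoint 1" line and the "expired before completing" line. The two timestamps are copied from the log. Reading the result as the effective checkpoint timeout is an inference, since the job's actual execution.checkpointing.timeout setting is not shown in the thread.

```python
from datetime import datetime

# Timestamp layout used by the Flink log lines (milliseconds after the comma).
FMT = "%Y-%m-%d %H:%M:%S,%f"

# Copied from the log: checkpoint 1 triggered, then reported as expired.
triggered = datetime.strptime("2024-11-20 17:32:23,714", FMT)
expired = datetime.strptime("2024-11-20 17:37:23,715", FMT)

elapsed = (expired - triggered).total_seconds()
print(f"checkpoint 1 expired after {elapsed:.3f} s")
```

The gap is almost exactly five minutes, which suggests the checkpoint never completed within a timeout of about 5 minutes rather than failing early for some other reason.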