【已解决】使用flink1.18同步mysql-->doris时发生报错

Viewed 138

flink版本:1.18.1
flink cdc:3.0.1
doris:2.0.10
flink-doris-connector-1.18-1.6.1

flinksql如下:

-- Checkpointing configuration for the MySQL CDC -> Doris sync job.
SET execution.checkpointing.mode = 'EXACTLY_ONCE';
SET execution.checkpointing.externalized-checkpoint-retention = 'DELETE_ON_CANCELLATION';
SET execution.checkpointing.interval = '60s';
SET state.checkpoints.dir = 'hdfs://flinkcluster/checkpoints';
-- Set together with 'scan.incremental.close-idle-reader.enabled' on the CDC source below.
SET execution.checkpointing.checkpoints-after-tasks-finish.enabled = 'true';
-- No unit is given; presumably interpreted as milliseconds (i.e. 10 minutes) -- confirm.
SET execution.checkpointing.timeout = '600000';


-- Source: MySQL table test.wq_w_cnorder, read as a changelog through the mysql-cdc connector.
DROP TABLE IF EXISTS hpt_source_test_wq_w_cnorder;
CREATE TABLE IF NOT EXISTS hpt_source_test_wq_w_cnorder (
    `CNOrderID`      VARCHAR(36),
    `CNOrderCode`    VARCHAR(50),
    `ClassID`        VARCHAR(36),
    `BillingID`      VARCHAR(36),
    `MachineID`      VARCHAR(36),
    `ProjID`         VARCHAR(36),
    `KeyCardID`      VARCHAR(36),
    `MemberID`       VARCHAR(36),
    `SourceID`       VARCHAR(36),
    `OrderByNum`     BIGINT,
    `CustomerType`   VARCHAR(20),
    `ChildrenNum`    BIGINT,
    `OrderSource`    VARCHAR(40),
    `OrderStatus`    VARCHAR(50),
    `CNStatus`       VARCHAR(50),
    `OrderAmount`    DECIMAL(18, 2),
    `FeeAmount`      DECIMAL(18, 2),
    `PaidAmount`     DECIMAL(18, 2),
    `UnpaidAmount`   DECIMAL(18, 2),
    `EnterDate`      TIMESTAMP,
    `ExitDate`       TIMESTAMP,
    `CreateBy`       VARCHAR(50),
    `CreateID`       VARCHAR(36),
    `CreateDate`     TIMESTAMP,
    `CheckOutBy`     VARCHAR(50),
    `CheckOutID`     VARCHAR(36),
    `CheckOutDate`   TIMESTAMP,
    `Step`           BIGINT,
    `Remark`         VARCHAR(2000),
    `StayCNOrderID`  VARCHAR(36),
    `CheckRemark`    VARCHAR(2000),
    `TicketType`     VARCHAR(20),
    `PackTicketID`   VARCHAR(36),
    `CardID`         VARCHAR(36),
    `DiscountAmount` DECIMAL(18, 2),
    `IsLook`         BIGINT,
    `VoidRemark`     VARCHAR(2000),
    `IsGive`         BIGINT,
    -- Declared for the planner only; Flink does not enforce the key itself.
    PRIMARY KEY (`CNOrderID`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.99.75',
    'port' = '3306',
    'username' = 'root',
    'password' = 'password',
    'database-name' = 'test',
    'table-name' = 'wq_w_cnorder',
    -- Range of server IDs made available to the CDC readers.
    'server-id' = '8520-8524',
    'scan.incremental.close-idle-reader.enabled' = 'true'
);
 



-- Sink: Doris table test.wq_w_cnorder, written through the Doris connector (Stream Load).
--
-- The load format is switched to line-delimited JSON. The free-text columns
-- (`Remark`, `CheckRemark`, `VoidRemark`) may contain the column or line
-- delimiter of a delimiter-separated load, which makes Doris reject the rows
-- and abort with "too many filtered rows"; JSON maps fields by name and is
-- not affected by such characters.
-- NOTE(review): the exact rejection reason still has to be confirmed from the
-- ErrorURL printed in the TaskManager log; if it points at a type/length/NULL
-- mismatch, the Doris table definition must be fixed instead.
DROP TABLE IF EXISTS hpt_sink_test_wq_w_cnorder;
CREATE TABLE hpt_sink_test_wq_w_cnorder (
    `CNOrderID`      STRING,
    `CNOrderCode`    STRING,
    `ClassID`        STRING,
    `BillingID`      STRING,
    `MachineID`      STRING,
    `ProjID`         STRING,
    `KeyCardID`      STRING,
    `MemberID`       STRING,
    `SourceID`       STRING,
    `OrderByNum`     BIGINT,
    `CustomerType`   STRING,
    `ChildrenNum`    BIGINT,
    `OrderSource`    STRING,
    `OrderStatus`    STRING,
    `CNStatus`       STRING,
    `OrderAmount`    DECIMAL(18, 2),
    `FeeAmount`      DECIMAL(18, 2),
    `PaidAmount`     DECIMAL(18, 2),
    `UnpaidAmount`   DECIMAL(18, 2),
    `EnterDate`      TIMESTAMP,
    `ExitDate`       TIMESTAMP,
    `CreateBy`       STRING,
    `CreateID`       STRING,
    `CreateDate`     TIMESTAMP,
    `CheckOutBy`     STRING,
    `CheckOutID`     STRING,
    `CheckOutDate`   TIMESTAMP,
    `Step`           BIGINT,
    `Remark`         STRING,
    `StayCNOrderID`  STRING,
    `CheckRemark`    STRING,
    `TicketType`     STRING,
    `PackTicketID`   STRING,
    `CardID`         STRING,
    `DiscountAmount` DECIMAL(18, 2),
    `IsLook`         BIGINT,
    `VoidRemark`     STRING,
    `IsGive`         BIGINT
) WITH (
    'connector' = 'doris',
    'fenodes' = '192.168.98.70:8030',
    'table.identifier' = 'test.wq_w_cnorder',
    'username' = 'root',
    'password' = 'password',
    'sink.label-prefix' = 'test_wq_w_cnorder_1',
    -- Stream Load properties: send one JSON object per line instead of delimited text.
    'sink.properties.format' = 'json',
    'sink.properties.read_json_by_line' = 'true'
);


-- Continuously copy the CDC changelog into Doris.
-- Both column lists are spelled out so the mapping is explicit and does not
-- depend on the declaration order of either table (INSERT maps by position).
INSERT INTO hpt_sink_test_wq_w_cnorder (
    `CNOrderID`,
    `CNOrderCode`,
    `ClassID`,
    `BillingID`,
    `MachineID`,
    `ProjID`,
    `KeyCardID`,
    `MemberID`,
    `SourceID`,
    `OrderByNum`,
    `CustomerType`,
    `ChildrenNum`,
    `OrderSource`,
    `OrderStatus`,
    `CNStatus`,
    `OrderAmount`,
    `FeeAmount`,
    `PaidAmount`,
    `UnpaidAmount`,
    `EnterDate`,
    `ExitDate`,
    `CreateBy`,
    `CreateID`,
    `CreateDate`,
    `CheckOutBy`,
    `CheckOutID`,
    `CheckOutDate`,
    `Step`,
    `Remark`,
    `StayCNOrderID`,
    `CheckRemark`,
    `TicketType`,
    `PackTicketID`,
    `CardID`,
    `DiscountAmount`,
    `IsLook`,
    `VoidRemark`,
    `IsGive`
)
SELECT
    `CNOrderID`,
    `CNOrderCode`,
    `ClassID`,
    `BillingID`,
    `MachineID`,
    `ProjID`,
    `KeyCardID`,
    `MemberID`,
    `SourceID`,
    `OrderByNum`,
    `CustomerType`,
    `ChildrenNum`,
    `OrderSource`,
    `OrderStatus`,
    `CNStatus`,
    `OrderAmount`,
    `FeeAmount`,
    `PaidAmount`,
    `UnpaidAmount`,
    `EnterDate`,
    `ExitDate`,
    `CreateBy`,
    `CreateID`,
    `CreateDate`,
    `CheckOutBy`,
    `CheckOutID`,
    `CheckOutDate`,
    `Step`,
    `Remark`,
    `StayCNOrderID`,
    `CheckRemark`,
    `TicketType`,
    `PackTicketID`,
    `CardID`,
    `DiscountAmount`,
    `IsLook`,
    `VoidRemark`,
    `IsGive`
FROM hpt_source_test_wq_w_cnorder;

报错日志如下:

2024-06-04 15:52:24
org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=4, backoffTimeMS=10000)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
	at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
	at sun.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
	at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.lang.Exception: Could not perform checkpoint 3 for operator Source: hpt_source_test_wq_w_cnorder[55] -> DropUpdateBefore[56] -> ConstraintEnforcer[57] -> hpt_sink_test_wq_w_cnorder[57]: Writer -> hpt_sink_test_wq_w_cnorder[57]: Committer (1/2)#2.
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$14(StreamTask.java:1150)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.doris.flink.exception.DorisRuntimeException: tabel {} stream load error: zhwq_dw.wq_w_cnorder, see more in [INTERNAL_ERROR]cancelled: [INTERNAL_ERROR]too many filtered rows

	0#  std::_Function_handler<void (doris::RuntimeState*, doris::Status*), doris::StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<doris::StreamLoadContext>)::$_0>::_M_invoke(std::_Any_data const&, doris::RuntimeState*&&, doris::Status*&&) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:357
	1#  doris::FragmentMgr::_exec_actual(std::shared_ptr<doris::FragmentExecState>, std::function<void (doris::RuntimeState*, doris::Status*)> const&) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:360
	2#  std::_Function_handler<void (), doris::FragmentMgr::exec_plan_fragment(doris::TExecPlanFragmentParams const&, std::function<void (doris::RuntimeState*, doris::Status*)> const&)::$_0>::_M_invoke(std::_Any_data const&) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:701
	3#  doris::ThreadPool::dispatch_thread() at /home/zcp/repo_center/doris_release/doris/be/src/util/threadpool.cpp:0
	4#  doris::Thread::supervise_thread(void*) at /var/local/ldb_toolchain/bin/../usr/include/pthread.h:562
	5#  start_thread
	6#  __GI___clone


	0#  doris::io::StreamLoadPipe::_append(std::shared_ptr<doris::ByteBuffer> const&, unsigned long) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:0
	1#  doris::io::StreamLoadPipe::append(std::shared_ptr<doris::ByteBuffer> const&) at /home/zcp/repo_center/doris_release/doris/be/src/io/fs/stream_load_pipe.cpp:162
	2#  doris::StreamLoadAction::on_chunk_data(doris::HttpRequest*) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:446
	3#  ?
	4#  bufferevent_run_readcb_
	5#  ?
	6#  ?
	7#  ?
	8#  ?
	9#  std::_Function_handler<void (), doris::EvHttpServer::start()::$_0>::_M_invoke(std::_Any_data const&) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/ext/atomicity.h:98
	10# doris::ThreadPool::dispatch_thread() at /home/zcp/repo_center/doris_release/doris/be/src/util/threadpool.cpp:0
	11# doris::Thread::supervise_thread(void*) at /var/local/ldb_toolchain/bin/../usr/include/pthread.h:562
	12# start_thread
	13# __GI___clone

	at org.apache.doris.flink.sink.writer.DorisWriter.prepareCommit(DorisWriter.java:205)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:199)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:169)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:322)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$15(StreamTask.java:1318)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1306)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1191)
	... 14 more

把flinksql相关配置修改为如下后

-- Second attempt: the checkpoint mode, retention and directory settings are shown with empty values.
-- NOTE(review): unclear whether '' was really submitted or the values were elided when posting -- confirm.
set execution.checkpointing.mode ='' ;
set execution.checkpointing.externalized-checkpoint-retention='' ;
set state.checkpoints.dir='';
.......

报错日志为:

2024-06-04 16:09:35
org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=4, backoffTimeMS=10000)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
	at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
	at sun.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
	at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.lang.Exception: Could not perform checkpoint 3 for operator Source: hpt_source_test_wq_w_cnorder[61] -> DropUpdateBefore[62] -> ConstraintEnforcer[63] -> hpt_sink_test_wq_w_cnorder[63]: Writer -> hpt_sink_test_wq_w_cnorder[63]: Committer (2/2)#1.
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$14(StreamTask.java:1150)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.doris.flink.exception.DorisRuntimeException: tabel {} stream load error: zhwq_dw.wq_w_cnorder, see more in [INTERNAL_ERROR]cancelled: closed

	0#  doris::io::StreamLoadPipe::_append(std::shared_ptr<doris::ByteBuffer> const&, unsigned long) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:0
	1#  doris::io::StreamLoadPipe::append(std::shared_ptr<doris::ByteBuffer> const&) at /home/zcp/repo_center/doris_release/doris/be/src/io/fs/stream_load_pipe.cpp:162
	2#  doris::StreamLoadAction::on_chunk_data(doris::HttpRequest*) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:446
	3#  ?
	4#  bufferevent_run_readcb_
	5#  ?
	6#  ?
	7#  ?
	8#  ?
	9#  std::_Function_handler<void (), doris::EvHttpServer::start()::$_0>::_M_invoke(std::_Any_data const&) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/ext/atomicity.h:98
	10# doris::ThreadPool::dispatch_thread() at /home/zcp/repo_center/doris_release/doris/be/src/util/threadpool.cpp:0
	11# doris::Thread::supervise_thread(void*) at /var/local/ldb_toolchain/bin/../usr/include/pthread.h:562
	12# start_thread
	13# __clone

	at org.apache.doris.flink.sink.writer.DorisWriter.prepareCommit(DorisWriter.java:205)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:199)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:169)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:322)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$15(StreamTask.java:1318)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1306)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1191)
	... 14 more

补充日志:

2024-06-04 17:30:37,612 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline checkpoint 3 by task 140ecb65b7a48152a2cdeabf48a31b99_cbc357ccb763df2852fee8c4fc7d55f2_1_2 of job a0f727e754e6cc2e5c99afcaf46f2e2b at container_e06_1717380686578_0001_02_000101 @ real-time-computing-engine03-98-38 (dataPort=34327).
org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Task name with subtask : Source: hpt_source_test_wq_w_cnorderconsumption[73] -> DropUpdateBefore[74] -> ConstraintEnforcer[75] -> hpt_sink_test_wq_w_cnorderconsumption[75]: Writer -> hpt_sink_test_wq_w_cnorderconsumption[75]: Committer (2/2)#2 Failure reason: Task has failed.
	at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1396) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1339) ~[flink-dist-1.18.1.jar:1.18.1]
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_392]
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) ~[?:1.8.0_392]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_392]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_392]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$14(StreamTask.java:1159) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.18.1.jar:1.18.1]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_392]
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not perform checkpoint 3 for operator Source: hpt_source_test_wq_w_cnorderconsumption[73] -> DropUpdateBefore[74] -> ConstraintEnforcer[75] -> hpt_sink_test_wq_w_cnorderconsumption[75]: Writer -> hpt_sink_test_wq_w_cnorderconsumption[75]: Committer (2/2)#2.
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1203) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$14(StreamTask.java:1150) ~[flink-dist-1.18.1.jar:1.18.1]
	... 13 more

发现checkpoint失败
image.png

1 Answer

已经和用户联系过,在这里同步一下结论:
在 TaskManager 日志中找到 Stream Load 返回的 ErrorURL,打开后可以看到具体的报错信息;根据其中的提示修改后,任务恢复正常。