Error when syncing an entire PostgreSQL database to Doris with Flink CDC

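Each of these tables in PostgreSQL holds about 14 million rows. Syncing them with the command below does not succeed, and the job's checkpoints fail with the following exception: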

org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. The latest checkpoint failed due to Checkpoint expired before completing., view the Checkpoint History tab or the Job Manager log to find out why continuous checkpoints failed.

2024-05-28 15:39:07,367 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for aia_t_icc_jjdb: Writer -> aia_t_icc_jjdb: Committer (1/1)#0 (dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0).
2024-05-28 15:39:07,370 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state CANCELED to JobManager for task aia_t_vcs_fkdb: Writer -> aia_t_vcs_fkdb: Committer (1/1)#0 dbec67a546f03f19bae3a56726a02174_1992c3287130e2f49268dbcc909a7d1d_0_0.
2024-05-28 15:39:07,370 INFO org.apache.http.impl.execchain.RetryExec [] - I/O exception (java.net.SocketException) caught when processing request to {}->http://192.168.0.168:8040: Socket closed
2024-05-28 15:39:07,378 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state CANCELED to JobManager for task aia_t_icc_jjdb: Writer -> aia_t_icc_jjdb: Committer (1/1)#0 dbec67a546f03f19bae3a56726a02174_788573959fc6fd87fb6bfd0ffc27d896_0_0.
2024-05-28 15:39:37,363 WARN org.apache.flink.runtime.taskmanager.Task [] - Task 'Source: Postgres Source -> Process (1/1)#0' did not react to cancelling signal - interrupting; it is stuck for 30 seconds in method:
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
org.apache.flink.runtime.taskmanager.Task$$Lambda$813/1723769838.run(Unknown Source)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
java.lang.Thread.run(Thread.java:750)

2024-05-28 15:39:37,372 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping the embedded engine
2024-05-28 15:39:37,373 ERROR com.ververica.cdc.debezium.internal.Handover [] - Reporting error:
java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
at java.lang.Object.wait(Object.java:502) ~[?:1.8.0_381]
at com.ververica.cdc.debezium.internal.Handover.produce(Handover.java:115) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at com.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:54) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:83) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:822) [flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192) [flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_381]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_381]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
2024-05-28 15:39:37,375 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping the task and engine
2024-05-28 15:39:37,375 INFO io.debezium.connector.common.BaseSourceTask [] - Stopping down connector
2024-05-28 15:41:07,376 WARN io.debezium.pipeline.ChangeEventSourceCoordinator [] - Coordinator didn't stop in the expected time, shutting down executor now
2024-05-28 15:41:07,377 WARN io.debezium.pipeline.source.AbstractSnapshotChangeEventSource [] - Snapshot was interrupted before completion
2024-05-28 15:41:07,377 INFO io.debezium.pipeline.source.AbstractSnapshotChangeEventSource [] - Snapshot - Final stage
2024-05-28 15:41:07,377 WARN io.debezium.pipeline.ChangeEventSourceCoordinator [] - Change event source executor was interrupted
java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method) ~[?:1.8.0_381]
at io.debezium.connector.base.ChangeEventQueue.doEnqueue(ChangeEventQueue.java:204) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.connector.base.ChangeEventQueue.enqueue(ChangeEventQueue.java:169) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.pipeline.EventDispatcher$BufferingSnapshotChangeRecordReceiver.changeRecord(EventDispatcher.java:440) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.pipeline.EventDispatcher$1.changeRecord(EventDispatcher.java:166) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:120) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:57) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:155) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEventsForTable(RelationalSnapshotChangeEventSource.java:407) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:316) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:132) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:155) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:137) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109) ~[flink-sql-connector-postgres-cdc-3.0.1.jar:3.0.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_381]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_381]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_381]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_381]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
2024-05-28 15:41:07,378 INFO io.debezium.pipeline.ChangeEventSourceCoordinator [] - Connected metrics set to 'false'
2024-05-28 15:41:07,380 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed

-Dexecution.checkpointing.interval=10s
-Dparallelism.default=1
--table-prefix
ods_
--database
cdc
--postgres-conf
hostname=192.168.0.1
--postgres-conf
port=5432
--postgres-conf
username=postgres
--postgres-conf
password="db123"
--postgres-conf
database-name=db
--postgres-conf
schema-name=public
--postgres-conf
slot.name=test2
--postgres-conf
decoding.plugin.name=pgoutput
--including-tables
"aia_t_icc_jjdb.*?"
--multi-to-one-origin
"aia_t_icc_jjdb.*?"
--multi-to-one-target
"aia_t_icc_jjdb"
--sink-conf
fenodes=10.115.6.215:8030,10.115.6.216:8030,192.168.0.168:8030
--sink-conf
username=admin
--sink-conf
password=password
--sink-conf
jdbc-url=jdbc:mysql://10.115.6.215:9030
--sink-conf
sink.label-prefix=label
--sink-conf
sink.enable.batch-mode=true
--table-conf
replication_num=3
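The `--multi-to-one-origin` / `--multi-to-one-target` pair above is meant to merge every table whose name matches `aia_t_icc_jjdb.*?` (for example, the partitions of a partitioned table) into a single Doris table. The sketch below shows which names such a rule would route, assuming the tool applies the pattern as a full-string regex match against the bare table name and ignoring `--table-prefix`. That matching behavior and the partition names are assumptions for illustration, not taken from the connector's source.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class MultiToOneDemo {
    // Same strings as --multi-to-one-origin / --multi-to-one-target in the command above.
    static final Pattern ORIGIN = Pattern.compile("aia_t_icc_jjdb.*?");
    static final String TARGET = "aia_t_icc_jjdb";

    // Assumption: a table is routed to TARGET only if its whole name matches ORIGIN;
    // otherwise it keeps its own name. The --table-prefix (ods_) is not applied here.
    static String targetFor(String sourceTable) {
        return ORIGIN.matcher(sourceTable).matches() ? TARGET : sourceTable;
    }

    public static void main(String[] args) {
        // Hypothetical partition names; the real ones are not given in the post.
        List<String> tables = Arrays.asList(
                "aia_t_icc_jjdb", "aia_t_icc_jjdb_p202401", "aia_t_icc_jjdb_p202402", "aia_t_vcs_fkdb");
        for (String t : tables) {
            System.out.println(t + " -> " + targetFor(t));
        }
    }
}
```

Because a full match is required, the lazy `.*?` still ends up consuming the rest of the name, so the rule effectively means "any table whose name starts with `aia_t_icc_jjdb`".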
3 Answers

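Found the problem: three of my PostgreSQL tables are partitioned tables, with more than 300 partitions in total. `PostgresDialect.queryTableSchema` queries the schema of every table, and it runs that query each time a CDC record comes in, which causes the connection to time out.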
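To make the cost concrete: if the schema is fetched per record, the number of metadata queries grows with the number of records, whereas fetching it once per table bounds it by the number of tables/partitions. The sketch below illustrates that idea with a per-table cache. It is not the Flink CDC code and does not describe what any particular Flink CDC version does; `SchemaCache`, `TableSchema` and `fetchSchemaFromDb` are hypothetical names, and the loader only counts calls instead of running a real JDBC query.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class SchemaCacheDemo {
    // Hypothetical stand-in for a table schema; the real connector has its own types.
    static final class TableSchema {
        final String tableId;
        TableSchema(String tableId) { this.tableId = tableId; }
    }

    // Hypothetical cache: the loader runs only the first time a table id is seen.
    static final class SchemaCache {
        private final Map<String, TableSchema> cache = new ConcurrentHashMap<>();
        private final Function<String, TableSchema> loader;

        SchemaCache(Function<String, TableSchema> loader) { this.loader = loader; }

        TableSchema get(String tableId) {
            return cache.computeIfAbsent(tableId, loader);
        }

        // Drop an entry when the table's schema is known to have changed (e.g. after DDL).
        void invalidate(String tableId) { cache.remove(tableId); }
    }

    // Counts how many times the (simulated) database is asked for a schema.
    static final AtomicInteger QUERIES = new AtomicInteger();

    // Hypothetical loader: in a real job this would be the JDBC metadata query.
    static TableSchema fetchSchemaFromDb(String tableId) {
        QUERIES.incrementAndGet();
        return new TableSchema(tableId);
    }

    public static void main(String[] args) {
        SchemaCache cache = new SchemaCache(SchemaCacheDemo::fetchSchemaFromDb);
        // Simulate 10,000 change records spread over 300 hypothetical partitions.
        for (int i = 0; i < 10_000; i++) {
            cache.get("public.aia_t_icc_jjdb_p" + (i % 300));
        }
        System.out.println("schema queries: " + QUERIES.get()); // prints "schema queries: 300"
    }
}
```

Without the cache, the same loop would issue 10,000 schema queries, one per record.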

Try switching the Flink CDC version to 2.4.2 (the logs above show `flink-sql-connector-postgres-cdc-3.0.1`).