【已记录】使用flinkcdc同步pgsql到doris,pgsql表如果分区了,flinkcdc就读不到

Viewed 85

image.png
image.png
root@mapuni-05:/home/flink-1.19.0# ./bin/flink run -Dexecution.checkpointing.interval=10s -Dparallelism.default=1 -c org.apache.doris.flink.tools.cdc.CdcTools ./lib/flink-doris-connector-1.19-1.6.0.jar postgres-sync-database --database yutu_datacenter --postgres-conf hostname=192.168.6.15 --postgres-conf port=5432 --postgres-conf username=postgres --postgres-conf password="yutu@615Security" --postgres-conf database-name=yutu_datacenter --postgres-conf schema-name=public --postgres-conf slot.name=test3 --postgres-conf scan.incremental.snapshot.enabled=true --postgres-conf decoding.plugin.name=pgoutput --including-tables "dc_dim_company_pollution" --sink-conf fenodes=192.168.6.11:8030 --sink-conf username=root --sink-conf password=yutu@611Security --sink-conf jdbc-url=jdbc:mysql://192.168.6.11:9030 --sink-conf sink.label-prefix=label --table-conf replication_num=1
Input args: [postgres-sync-database, --database, yutu_datacenter, --postgres-conf, hostname=192.168.6.15, --postgres-conf, port=5432, --postgres-conf, username=postgres, --postgres-conf, password=yutu@615Security, --postgres-conf, database-name=yutu_datacenter, --postgres-conf, schema-name=public, --postgres-conf, slot.name=test3, --postgres-conf, scan.incremental.snapshot.enabled=true, --postgres-conf, decoding.plugin.name=pgoutput, --including-tables, dc_dim_company_pollution, --sink-conf, fenodes=192.168.6.11:8030, --sink-conf, username=root, --sink-conf, password=yutu@611Security, --sink-conf, jdbc-url=jdbc:mysql://192.168.6.11:9030, --sink-conf, sink.label-prefix=label, --table-conf, replication_num=1].


The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No tables to be synchronized.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.IllegalStateException: No tables to be synchronized.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
at org.apache.doris.flink.tools.cdc.DatabaseSync.build(DatabaseSync.java:118)
at org.apache.doris.flink.tools.cdc.CdcTools.syncDatabase(CdcTools.java:144)
at org.apache.doris.flink.tools.cdc.CdcTools.createPostgresSyncDatabase(CdcTools.java:92)
at org.apache.doris.flink.tools.cdc.CdcTools.main(CdcTools.java:57)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 9 more

1 Answers