Flink Doris Connector 整库同步Mysql 到 Doris

Viewed 83
  • flink-doris-connector版本:flink-doris-connector-1.17-1.6.1.jar
  • mysqlCDC 版本:flink-sql-connector-mysql-cdc-2.4.2.jar
  • flink 版本:1.17.2
    通过使用mysql整库同步的命令提交的任务,通过savepoint 停止后,在Mysql库新增的表和Doris创建的对应的表结构,在通过savepoint恢复任务后,数据并没有同步,而是出现了:whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case. 错误。
    image.png
    但是在算子中可以查看到这张表:
    image.png
    • 这是之前的命令:
      bin/flink run
      -Dexecution.checkpointing.interval=30min
      -Dparallelism.default=1
      -c org.apache.doris.flink.tools.cdc.CdcTools
      lib/flink-doris-connector-1.17-1.6.1.jar
      mysql-sync-database
      --database ywdb
      --mysql-conf hostname==****
      --mysql-conf port==****
      --mysql-conf username=****
      --mysql-conf password=****
      --mysql-conf database-name==****
      --including-tables "t_.|ods_."
      --sink-conf fenodes==****
      --sink-conf username=******
      --sink-conf password=******
      --sink-conf jdbc-url==****
      --sink-conf sink.label-prefix=label
      --sink-conf sink.enable.batch-mode=true
      --sink-conf sink.flush.queue-size=2
      --sink-conf sink.buffer-flush.max-rows=50000
      --sink-conf sink.buffer-flush.max-bytes=10MB
      --sink-conf sink.buffer-flush.interval=60s
      --sink-conf sink.ignore.update-before=true

      • 这是启动恢复的命令:
        bin/flink run
        -Dexecution.checkpointing.interval=30min
        -s /data/erpdata/flink-data/sps/ywdb/20241023/savepoint-5c021b-a9284e87aa52
        -Dparallelism.default=1
        -c org.apache.doris.flink.tools.cdc.CdcTools
        lib/flink-doris-connector-1.17-1.6.1.jar
        mysql-sync-database
        --database ywdb
        --mysql-conf hostname==****
        --mysql-conf port==****
        --mysql-conf username==****
        --mysql-conf password=****
        --mysql-conf database-name==****
        --including-tables "t_.|ods_.|t_contract_matter_settlement_detailed"
        --sink-conf fenodes==****
        --sink-conf username==****
        --sink-conf password==****
        --sink-conf jdbc-url==****
        --sink-conf sink.label-prefix=label
        --sink-conf sink.enable.batch-mode=true
        --sink-conf sink.flush.queue-size=2
        --sink-conf sink.buffer-flush.max-rows=50000
        --sink-conf sink.buffer-flush.max-bytes=10MB
        --sink-conf sink.buffer-flush.interval=60s
        --sink-conf sink.ignore.update-before=true
1 Answers