The Doris 2.1.0 Flink connector keeps failing to write data

Using the connector org.apache.doris:flink-doris-connector-1.18:1.6.0, a Flink job reads data from Kafka and writes it into Doris. The sink is configured as follows:
// Doris connection settings: FE nodes, target table, credentials
DorisOptions dorisOptions = DorisOptions.builder()
        .setFenodes(param.getDorisParam().getFenodes())
        .setTableIdentifier(table)
        .setUsername(param.getDorisParam().getUsername())
        .setPassword(param.getDorisParam().getPassword())
        .build();

// Stream-load execution settings: label prefix, extra stream-load properties
// (props is built elsewhere and not shown in the post), and delete support
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix(generateLabel())
        .setStreamLoadProp(props)
        .setDeletable(true);

// Assemble the sink with a Debezium-JSON serializer that also applies schema changes
DorisSink.Builder<String> builder = DorisSink.builder();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
        .setDorisExecutionOptions(executionBuilder.build())
        .setDorisOptions(dorisOptions)
        .setSerializer(JsonDebeziumSchemaSerializer.builder()
                .setDorisOptions(dorisOptions)
                .setNewSchemaChange(true)
                .build());
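
The snippet above stops at the sink builder; for context, below is a minimal sketch of how such a sink is typically attached to the stream. The names env and kafkaSource and the 10 s checkpoint interval are assumptions for illustration, not taken from the actual job.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// When two-phase commit is enabled (the connector default), the Doris sink commits
// stream loads on checkpoint completion, so checkpointing must be enabled for the
// written data to become visible.
env.enableCheckpointing(10_000L);                       // assumed 10 s interval

DataStream<String> stream = env.fromSource(
        kafkaSource,                                    // assumed: a KafkaSource<String> built elsewhere
        WatermarkStrategy.noWatermarks(),
        "kafka-source");

stream.sinkTo(builder.build());                         // attach the DorisSink<String> built above
env.execute("kafka-to-doris");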

However, after running for a while the job gets stuck and stops writing into Doris.
Flink checkpoints never complete and fail once they hit the timeout.
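
For reference, the timeout that marks these checkpoints as failed is a job-side setting; a minimal sketch of where it is configured, with illustrative values rather than this job's real configuration:

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000L);               // illustrative: checkpoint every 10 s
CheckpointConfig conf = env.getCheckpointConfig();
conf.setCheckpointTimeout(600_000L);            // illustrative: declare a checkpoint failed after 10 min
conf.setTolerableCheckpointFailureNumber(3);    // illustrative: failed checkpoints tolerated before the job restarts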

Partial BE log:

I20241212 11:15:42.239010 202263 topic_subscriber.cpp:48] handle topic WORKLOAD_SCHED_POLICY successfully
I20241212 11:15:43.030752  3066 wal_manager.cpp:473] Scheduled(every 10s) WAL info: [/opt/apache-doris-2.1.0-bin-x64/be/storage/wal: limit 47742504960 Bytes, used 0 Bytes, estimated wal bytes 0 Bytes, available 47742504960 Bytes.];
I20241212 11:15:50.194788  3170 storage_engine.cpp:1077] start to delete unused rowset, size: 513
I20241212 11:15:50.195013  3170 storage_engine.cpp:1110] collected 0 unused rowsets to remove, skipped 513 rowsets due to use count > 1, skipped 0 rowsets due to don't need to delete file, skipped 0 rowsets due to delayed expired timestamp.
I20241212 11:15:50.195019  3170 storage_engine.cpp:1126] removed all collected unused rowsets
I20241212 11:15:53.041891  3066 wal_manager.cpp:473] Scheduled(every 10s) WAL info: [/opt/apache-doris-2.1.0-bin-x64/be/storage/wal: limit 47742504960 Bytes, used 0 Bytes, estimated wal bytes 0 Bytes, available 47742504960 Bytes.];
I20241212 11:15:53.780968  3208 olap_server.cpp:1104] cooldown producer get tablet num: 0
I20241212 11:16:03.053056  3066 wal_manager.cpp:473] Scheduled(every 10s) WAL info: [/opt/apache-doris-2.1.0-bin-x64/be/storage/wal: limit 47742504960 Bytes, used 0 Bytes, estimated wal bytes 0 Bytes, available 47742504960 Bytes.];
I20241212 11:16:04.259593  3299 data_dir.cpp:880] path: /opt/apache-doris-2.1.0-bin-x64/be/storage total capacity: 528046399488, available capacity: 477425049600, usage: 0.0958653, in_use: 1
I20241212 11:16:04.259778  3299 storage_engine.cpp:366] get root path info cost: 0 ms. tablet counter: 240
I20241212 11:16:11.431665  3300 tablet_manager.cpp:1004] find expired transactions for 0 tablets
I20241212 11:16:11.432061  3300 tablet_manager.cpp:1035] success to build all report tablets info. tablet_count=240
I20241212 11:16:12.239491 202263 topic_subscriber.cpp:43] begin handle topic info

Partial FE log:

2024-12-12 11:15:57,662 INFO (thrift-server-pool-32|347) [ReportHandler.handleReport():199] receive report from be 10525. type: TASK, current queue size: 1
2024-12-12 11:15:57,662 INFO (Thread-69|144) [ReportHandler.taskReport():560] finished to handle task report from backend 10525, diff task num: 0. cost: 0 ms
2024-12-12 11:15:59,295 INFO (thrift-server-pool-194|1072) [ReportHandler.handleReport():199] receive report from be 10526. type: TASK, current queue size: 1
2024-12-12 11:15:59,295 INFO (Thread-69|144) [ReportHandler.taskReport():560] finished to handle task report from backend 10526, diff task num: 0. cost: 0 ms
2024-12-12 11:16:02,540 INFO (tablet checker|44) [TabletChecker.checkTablets():333] finished to check tablets. unhealth/total/added/in_sched/not_ready: 0/532/0/0/0, cost: 1 ms
2024-12-12 11:16:04,260 INFO (Thread-69|144) [ReportHandler.diskReport():565] begin to handle disk report from backend 10179
2024-12-12 11:16:04,260 INFO (thrift-server-pool-18|333) [ReportHandler.handleReport():199] receive report from be 10179. type: DISK, current queue size: 1
2024-12-12 11:16:04,260 INFO (Thread-69|144) [ReportHandler.diskReport():576] finished to handle disk report from backend: 10179, disk size: 1, bad disk: [], cost: 0 ms
2024-12-12 11:16:04,260 INFO (Thread-69|144) [ReportHandler.cpuReport():581] begin to handle cpu report from backend 10179
2024-12-12 11:16:04,260 INFO (Thread-69|144) [ReportHandler.cpuReport():595] finished to handle cpu report from backend 10179, cost: 0 ms
2024-12-12 11:16:05,044 INFO (binlog-gcer|59) [BinlogManager.gc():358] begin gc binlog
2024-12-12 11:16:05,044 INFO (binlog-gcer|59) [BinlogManager.gc():369] gc binlog, dbBinlogMap is null
2024-12-12 11:16:05,044 INFO (binlog-gcer|59) [BinlogGcer.runAfterCatalogReady():63] no gc binlog
2024-12-12 11:16:05,636 INFO (InsertOverwriteDropDirtyPartitions|71) [InsertOverwriteManager.runAfterCatalogReady():171] start clean insert overwrite temp partitions

Partial Flink log:

2024-12-12 11:31:48,926 INFO  org.apache.flink.connector.kafka.sink.KafkaWriter            [] - Created new transactional producer youjie-business-postgre-kafka-0-119664
2024-12-12 11:31:58,773 INFO  com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader [] - Binlog offset on checkpoint 1265231: {transaction_id=null, ts_sec=1733395363, file=mysql-bin.001387, pos=7465370, kind=SPECIFIC, gtids=470e0d58-e817-11ee-bfb3-00163e254e58:1-41331093,93b4f0e9-aacf-11ee-9d6b-00163e25b464:1-6258716, row=0, event=0, server_id=29300911}
2024-12-12 11:32:03,922 INFO  org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer [] - Flushing new partitions
2024-12-12 11:32:03,923 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-youjie-business-postgre-kafka-0-28233, transactionalId=youjie-business-postgre-kafka-0-28233] Invoking InitProducerId for the first time in order to acquire a producer ID
2024-12-12 11:32:03,924 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=producer-youjie-business-postgre-kafka-0-28233, transactionalId=youjie-business-postgre-kafka-0-28233] ProducerId set to 1104314 with epoch 0
2024-12-12 11:32:03,924 INFO  org.apache.flink.connector.kafka.sink.KafkaWriter            [] - Created new transactional producer youjie-business-postgre-kafka-0-119665

1 Answer

These logs are incomplete. Please add me on WeChat (Faith_xzc) and send the full logs; I will post any further progress back to the forum.