Flink 版本:1.18.1
Doris 版本:doris-2.0.7-bin-x64
Connector 版本:flink-doris-connector-1.18-1.5.2.jar
doris建表语句如下:
-- Doris-side target table for the sync job.
-- Unique-key model keyed on `id`, hash-distributed on `id` into a single bucket,
-- with merge-on-write enabled via the table properties below.
CREATE TABLE IF NOT EXISTS hpt_unique_doris2doris_test_sink
(
    `id`          BIGINT       COMMENT 'nullable id',
    `name`        VARCHAR(255),
    `description` VARCHAR(255)
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "enable_unique_key_merge_on_write" = "true"
);
同步语句如下:
-- Checkpoint configuration for the sync job: checkpoints are written to HDFS,
-- taken every 2 seconds, in exactly-once mode.
set state.checkpoints.dir='hdfs://flinkcluster/checkpoints';
set execution.checkpointing.mode='EXACTLY_ONCE';
-- The retention option must be spelled with its full `execution.checkpointing.`
-- prefix; the bare key `externalized-checkpoint-retention` is not listed in
-- Flink's configuration reference, so the setting would not be applied.
set execution.checkpointing.externalized-checkpoint-retention='DELETE_ON_CANCELLATION';
set execution.checkpointing.interval='2s';
-- Doris source: Flink table mapped onto
-- datawarehouse.hpt_unique_doris2doris_test_source through the Doris connector.
-- NOTE(review): the Doris DDL of the source table is not shown alongside this
-- statement; confirm that `id` is really INT there (the Doris sink table
-- declares `id` as BIGINT) and that the column list matches the Doris table.
CREATE TABLE flink_doris_source (
    id          INT,
    name        STRING,
    description STRING
)
WITH (
    'connector'        = 'doris',
    -- cluster endpoints (FE and BE nodes)
    'fenodes'          = '192.168.98.45:8030',
    'benodes'          = '192.168.98.45:8040,192.168.98.46:8040,192.168.98.47:8040',
    -- target table and credentials
    'table.identifier' = 'datawarehouse.hpt_unique_doris2doris_test_source',
    'username'         = 'flinkcdc',
    'password'         = 'password'
);
-- Doris sink: Flink table mapped onto
-- datawarehouse.hpt_unique_doris2doris_test_sink through the Doris connector.
DROP TABLE IF EXISTS hpt_doris_test;
CREATE TABLE hpt_doris_test (
    -- BIGINT to match the Doris table definition (`id` BIGINT); the previous INT
    -- declaration was narrower than the column it writes into.
    id          BIGINT,
    name        STRING,
    description STRING
)
WITH (
    'connector'         = 'doris',
    'fenodes'           = '192.168.98.45:8030',
    'table.identifier'  = 'datawarehouse.hpt_unique_doris2doris_test_sink',
    'username'          = 'flinkcdc',
    'password'          = 'password',
    'sink.label-prefix' = 'hpt_mysql2doris_test1',
    'benodes'           = '192.168.98.45:8040,192.168.98.46:8040,192.168.98.47:8040'
);
-- Sync job: copy all three columns from the Doris source table into the Doris sink table.
INSERT INTO hpt_doris_test
SELECT
    src.id,
    src.name,
    src.description
FROM flink_doris_source AS src;
flink报错日志如下:
2024-04-15 14:27:35
java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:263)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:185)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:147)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.doris.flink.exception.DorisRuntimeException: Load Doris data failed, schema size of fetch data is wrong.
at org.apache.doris.flink.serialization.RowBatch.readArrow(RowBatch.java:138)
at org.apache.doris.flink.source.reader.DorisValueReader.hasNext(DorisValueReader.java:218)
at org.apache.doris.flink.source.reader.DorisSourceSplitReader.fetch(DorisSourceSplitReader.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
... 6 more