使用flink1.18同步doris-->doris的时候报错

Viewed 57

flink版本:1.18.1
doris版本:doris-2.0.7-bin-x64
flink-doris-connector-1.18-1.5.2.jar

doris建表语句如下:

CREATE TABLE IF NOT EXISTS hpt_unique_doris2doris_test_sink
(
    `id` BIGINT COMMENT 'nullable id',
    `name` VARCHAR(255),
    `description` VARCHAR(255)
)
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"enable_unique_key_merge_on_write" = "true"
);

同步语句如下:

set state.checkpoints.dir='hdfs://flinkcluster/checkpoints';
set execution.checkpointing.mode ='EXACTLY_ONCE' ;
set externalized-checkpoint-retention='DELETE_ON_CANCELLATION' ;
set execution.checkpointing.interval='2s';



-- Doris source
CREATE TABLE flink_doris_source (
    id INT,    
    name STRING,    
    description STRING
    ) 
    WITH (
      'connector' = 'doris',
      'fenodes' = '192.168.98.45:8030',
      'table.identifier' = 'datawarehouse.hpt_unique_doris2doris_test_source',
      'username' = 'flinkcdc',
      'password' = 'password',
      'benodes' = '192.168.98.45:8040,192.168.98.46:8040,192.168.98.47:8040'
);



-- Doris sink
DROP TABLE IF EXISTS hpt_doris_test;
CREATE TABLE hpt_doris_test (
  id INT,    
  name STRING,    
  description STRING
    ) 
    WITH (
      'connector' = 'doris',
      'fenodes' = '192.168.98.45:8030',
      'table.identifier' = 'datawarehouse.hpt_unique_doris2doris_test_sink',
      'username' = 'flinkcdc',
      'password' = 'password',
      'sink.label-prefix' = 'hpt_mysql2doris_test1',
      'benodes' = '192.168.98.45:8040,192.168.98.46:8040,192.168.98.47:8040'
);


INSERT INTO hpt_doris_test SELECT 
  id,    
  name,    
  description FROM  flink_doris_source ;

flink报错日志如下:

2024-04-15 14:27:35
java.lang.RuntimeException: One or more fetchers have encountered exception
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:263)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:185)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:147)
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.doris.flink.exception.DorisRuntimeException: Load Doris data failed, schema size of fetch data is wrong.
	at org.apache.doris.flink.serialization.RowBatch.readArrow(RowBatch.java:138)
	at org.apache.doris.flink.source.reader.DorisValueReader.hasNext(DorisValueReader.java:218)
	at org.apache.doris.flink.source.reader.DorisSourceSplitReader.fetch(DorisSourceSplitReader.java:58)
	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
	... 6 more

1 Answers

看起来是读这里有点问题

不过 Doris to Doris 建议使用 X2doris 的方式进行数据同步。