测试表:
CREATE TABLE `t_4` (
`name` VARCHAR(255) NULL
) ENGINE=OLAP
UNIQUE KEY(`name`)
COMMENT 't_4'
DISTRIBUTED BY HASH(`name`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"min_load_replica_num" = "-1",
"is_being_synced" = "false",
"storage_medium" = "hdd",
"storage_format" = "V2",
"inverted_index_storage_format" = "V1",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false",
"group_commit_interval_ms" = "10000",
"group_commit_data_bytes" = "134217728"
);
数据:
insert into t_4 values ('001');
flink sql 读取 数据:
-- 创建paimon的catalog连接
CREATE CATALOG paimon_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs://nameservice2:8020/paimon/warehouse/'
);
create table paimon_print(
`name` string
)with(
'connector'='print'
);
insert into paimon_print select * from paimon_catalog.DDH_PROD.t_1;
CREATE TABLE `doris_t`
(
`name` string,
primary key (`name`) not ENFORCED
) with (
'connector' = 'doris',
'fenodes' = 'localhost:8031',
'table.identifier' = 'test.t_4',
'username' = 'root',
'password' = '123456'
);
create table doris_print
(
`name` string
) with (
'connector'='print'
);
insert into doris_print select * from doris_t
错误信息:
java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225) ~[flink-table_2.11-1.14.0.jar:1.14.0]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) ~[flink-table_2.11-1.14.0.jar:1.14.0]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130) ~[flink-table_2.11-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:342) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_144]
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) ~[flink-table_2.11-1.14.0.jar:1.14.0]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) ~[flink-table_2.11-1.14.0.jar:1.14.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_144]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_144]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_144]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_144]
... 1 more
Caused by: org.apache.doris.flink.exception.DorisException: Load Doris data failed, schema size of fetch data is wrong.
at org.apache.doris.flink.serialization.RowBatch.readArrow(RowBatch.java:127) ~[flink-doris-connector-1.14_2.11-1.1.1.jar:1.1.1]
at org.apache.doris.flink.datastream.ScalaValueReader.hasNext(ScalaValueReader.scala:196) ~[flink-doris-connector-1.14_2.11-1.1.1.jar:1.1.1]
at org.apache.doris.flink.source.reader.DorisSourceSplitReader.fetch(DorisSourceSplitReader.java:59) ~[flink-doris-connector-1.14_2.11-1.1.1.jar:1.1.1]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-table_2.11-1.14.0.jar:1.14.0]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ~[flink-table_2.11-1.14.0.jar:1.14.0]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) ~[flink-table_2.11-1.14.0.jar:1.14.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_144]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_144]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_144]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_144]
... 1 more