Doris, deployed on k8s with 3 FE and 3 BE: doris-2.1.5-rc02-d5a02e095d
MinIO, deployed on k8s with 4 nodes, 4 drives: MinIO VERSION 2023-03-22T06:36:24Z
Flink, standalone session: flink 1.19.1, 1 master, 3 workers
1. Connecting to Doris from DBeaver through the MySQL driver, creating the Paimon catalog and switching catalogs both report success:
switch internal;
drop CATALOG ctl_paimon_s3;
CREATE CATALOG IF NOT EXISTS ctl_paimon_s3
PROPERTIES (
"type" = "paimon",
"warehouse" = "s3://flink-tst/paimon/",
"s3.endpoint"="https://ossapi.xxx.com",
"s3.access_key"="flink-tst",
"s3.secret_key"="flink-tst",
"s3.path.style.access" = "true",
"s3.region" = "auto"
);
show create catalog ctl_paimon_s3;
switch ctl_paimon_s3;
show catalogs;
2. Any statement that touches databases or tables then hangs, e.g. show databases;
The server-side log shows:
Caused by: java.net.UnknownHostException: flink-tst.ossapi.xxx.com: Name or service not known
Caused by: org.apache.hadoop.fs.s3a.AWSClientIOException: getFileStatus on s3://flink-tst/paimon: com.amazonaws.SdkClientException: Unable to execute HTTP request: flink-tst.ossapi.xxx.com: Name or service not known: Unable to execute HTTP request: flink-tst.ossapi.xxx.com: Name or service not known
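The host in the log, flink-tst.ossapi.xxx.com, is the bucket name prepended to the endpoint. That suggests the request was sent in virtual-hosted style, i.e. "s3.path.style.access" may not have taken effect on the Doris side. A variant that may be worth trying is sketched below. It assumes that Doris's "use_path_style" property (documented for its S3 access) also applies to Paimon catalogs in this version, which has not been verified here; all other values are copied from step 1.

```sql
-- Sketch only: same catalog as in step 1, with the path-style switch
-- expressed through Doris's "use_path_style" property.
-- Assumption: this property is honored by the Paimon catalog in 2.1.5.
SWITCH internal;
DROP CATALOG IF EXISTS ctl_paimon_s3;
CREATE CATALOG IF NOT EXISTS ctl_paimon_s3
PROPERTIES (
    "type" = "paimon",
    "warehouse" = "s3://flink-tst/paimon/",
    "s3.endpoint" = "https://ossapi.xxx.com",
    "s3.access_key" = "flink-tst",
    "s3.secret_key" = "flink-tst",
    "s3.region" = "auto",
    -- replaces "s3.path.style.access" = "true" from the original statement
    "use_path_style" = "true"
);
SWITCH ctl_paimon_s3;
SHOW DATABASES;
```

If the property takes effect, requests should go to ossapi.xxx.com with the bucket in the path, and the UnknownHostException for flink-tst.ossapi.xxx.com should no longer appear.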
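3. Paimon 0.8 is already integrated into Flink and works in the Flink SQL client. With the same parameters as above, the Paimon catalog and tables can be created normally, although creating a database does not work (already reported to the Paimon project), as shown below: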
Flink SQL> use catalog default_catalog;
[INFO] Execute statement succeed.
Flink SQL> drop catalog ctl_paimon_s3;
[INFO] Execute statement succeed.
Flink SQL> CREATE CATALOG ctl_paimon_s3 WITH (
> 'type'='paimon',
> 'warehouse' = 's3p://flink-tst/paimon/',
> 's3.endpoint' = 'https://ossapi.xxx.com',
> 's3.access-key' = 'flink-tst',
> 's3.secret-key' = 'flink-tst',
> 's3.path.style.access' = 'true'
> );
[INFO] Execute statement succeed.
Flink SQL> use catalog ctl_paimon_s3;
[INFO] Execute statement succeed.
Flink SQL> create database db_paimon;
[INFO] Execute statement succeed.
Flink SQL> show databases;
+---------------+
| database name |
+---------------+
| default |
+---------------+
1 row in set
Flink SQL> show tables;
+------------+
| table name |
+------------+
| customer |
| word_count |
+------------+
2 rows in set
Flink SQL> DROP TABLE word_count;
[INFO] Execute statement succeed.
Flink SQL>
> CREATE TABLE word_count (
> word STRING PRIMARY KEY NOT ENFORCED,
> cnt BIGINT
> );
[INFO] Execute statement succeed.
Flink SQL>
> -- create a word data generator table
> CREATE TEMPORARY TABLE word_table (
> word STRING
> ) WITH (
> 'connector' = 'datagen',
> 'fields.word.length' = '1',
> 'rows-per-second' = '10'
> );
[INFO] Execute statement succeed.
Flink SQL> INSERT INTO word_count
> SELECT word, COUNT(*) FROM word_table GROUP BY word;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b204cc71d343e3cd3ecb1116614f60bb
Flink SQL> SELECT * FROM word_count;
[INFO] Result retrieval cancelled.
Flink SQL>