When importing MySQL data into Doris with SeaTunnel, do I need to manually create the Doris table before running the SeaTunnel script?

I wrote the script below. It fails when executed, and I can't find the cause in the documentation:

env{
parallelism = 4
job.mode = "BATCH"
}
source{
Jdbc{
url = "jdbc:mysql://129.0.0.226:3306/bank?serverTimezone=GMT%2b8&useSSL=false"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "root"
password = "123456"
query = "select * from exam_paper" # required
}
}

transform {
sql {
source_table_name = "exam_paper"
query = "select * from exam_paper" # required
result_table_name = "exam_paper_ods"
}
}

sink {
Doris{
fenodes = "129.0.0.109:8030"
username = root
password = "123456"
table.identifier = "ods.exam_paper"
sink.enable-2pc = "true"
sink.label-prefix = "it_"
doris.config = {
format = "json"
read_json_by_line = "true"
}
}
}

Error on execution:
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:202)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[API-09], ErrorDescription:[Handle save mode failed]
at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode(MultipleTableJobConfigParser.java:655)
at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.createSinkAction(MultipleTableJobConfigParser.java:641)
at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSink(MultipleTableJobConfigParser.java:553)
at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:200)
at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:88)
at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:156)
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:149)
... 2 more
Caused by: org.apache.seatunnel.api.table.catalog.exception.CatalogException: ErrorCode:[API-03], ErrorDescription:[Catalog initialize failed] - create table statement execute failed
at org.apache.seatunnel.connectors.doris.catalog.DorisCatalog.createTable(DorisCatalog.java:291)
at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createTable(DefaultSaveModeHandler.java:181)
at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createSchemaWhenNotExist(DefaultSaveModeHandler.java:108)
at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.handleSchemaSaveMode(DefaultSaveModeHandler.java:69)
at org.apache.seatunnel.api.sink.SaveModeHandler.handleSaveMode(SaveModeHandler.java:38)
at org.apache.seatunnel.api.sink.SaveModeExecuteWrapper.execute(SaveModeExecuteWrapper.java:36)
at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode(MultipleTableJobConfigParser.java:653)
... 8 more
Caused by: java.sql.SQLException: errCode = 2, detailMessage = Syntax error in line 46:
UNIQUE KEY ()
^
Encountered: )
Expected: IDENTIFIER

at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at com.mysql.cj.jdbc.StatementImpl.executeInternal(StatementImpl.java:768)
at com.mysql.cj.jdbc.StatementImpl.execute(StatementImpl.java:653)
at org.apache.seatunnel.connectors.doris.catalog.DorisCatalog.createTable(DorisCatalog.java:289)
... 14 more
3 Answers

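1. Does the import work if you create the table first?
2. Which SeaTunnel version are you using? Does the latest version work?
3. You could also consider flink-doris-connector, which includes an automatic table-creation module.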

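I haven't tried creating the table first.
The SeaTunnel version is 2.3.5.
Running the script below gives this error; it looks like no primary key is set.
However, the SeaTunnel Doris documentation doesn't seem to explain how to set a primary key.
After the script ran, the database was created, but the target table was not.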
mysql> show databases;
+--------------------------+
| Database |
+--------------------------+
| information_schema |
| itest_bank_ods |
| mysql |
+--------------------------+
6 rows in set (0.01 sec)

mysql> use itest_bank_ods
Database changed
mysql> show tables;
Empty set (0.00 sec)


env{
parallelism = 4
job.mode = "BATCH"
}
source{
Jdbc{
url = "jdbc:mysql://129.0.0.226:3306/itest_bank?serverTimezone=GMT%2b8&useSSL=false"
driver = "com.mysql.cj.jdbc.Driver"
connection_check_timeout_sec = 100
user = "root"
password = "123456"
query = "select * from itest_bank.it_exam_paper"
primary_keys = ["ppid"]
}
}

transform {
}

sink {
Doris {
fenodes = "129.0.0.109:9030"
username = root
password = "123456"
database = "itest_bank_ods"
#table = "question_itest_exam_paper_ods"
table.identifier = "itest_bank_ods.question_itest_exam_paper_ods"
sink.label-prefix = "it_"
sink.enable-2pc = "true"
doris.config {
format = "json"
read_json_by_line = "true"
}
}
}

Does the upstream MySQL table not have a primary key?
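
One possible reading of the `UNIQUE KEY ()` syntax error in the stack trace: if the CREATE TABLE statement is rendered from a template with a primary-key placeholder, an empty key list produces exactly that fragment. The Python sketch below only illustrates the idea. The template text, the `render_create_table` helper, and the `title` column are hypothetical and are not SeaTunnel's actual code; only `ppid` comes from the script above.

```python
# Hypothetical illustration only: this is NOT SeaTunnel's real template or API.
# It mimics a CREATE TABLE template whose key clause is filled from the
# primary-key columns reported by the source.

TEMPLATE = (
    "CREATE TABLE IF NOT EXISTS `{database}`.`{table}` (\n"
    "{columns}\n"
    ") ENGINE=OLAP\n"
    "UNIQUE KEY ({primary_key})\n"
    "DISTRIBUTED BY HASH ({primary_key})"
)


def render_create_table(database, table, columns, primary_keys):
    """Fill the template; primary_keys may be empty if the source reports none."""
    column_lines = ",\n".join(f"`{name}` {sql_type}" for name, sql_type in columns)
    key_list = ", ".join(f"`{name}`" for name in primary_keys)
    return TEMPLATE.format(
        database=database, table=table, columns=column_lines, primary_key=key_list
    )


# Assumed columns: `ppid` is taken from the script above, `title` is made up.
columns = [("ppid", "BIGINT"), ("title", "VARCHAR(255)")]

# Without primary-key metadata the key clause comes out empty.
ddl_without_key = render_create_table("ods", "exam_paper", columns, [])
# With a primary key the clause is well-formed.
ddl_with_key = render_create_table("ods", "exam_paper", columns, ["ppid"])

print(ddl_without_key)
print(ddl_with_key)
```

If this is what happens here, then either making sure the source reports a primary key or creating the Doris table by hand before the job runs should avoid the empty clause.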