Doris upgraded to 2.1.8: connector error when using spark-doris-connector-spark-3.1 version 25.0.1


1. Scenario: using Spark to import Hive data into Doris. Doris version: 2.1.8, Spark version: 3.1.3.
2. pom configuration:

<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>spark-doris-connector-spark-3.1</artifactId>
    <version>25.0.1</version>
</dependency>
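(For reference only: if the job were built with sbt instead of Maven, the same connector coordinates would be declared as below. This is just a restatement of the dependency above, not part of the original setup.)

// sbt build definition, same coordinates as the Maven dependency
libraryDependencies += "org.apache.doris" % "spark-doris-connector-spark-3.1" % "25.0.1"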

3. Write job configuration:
source
.write
.format("doris")
.mode(SaveMode.Overwrite)
.option("doris.table.identifier", s"$targetDatabaseName.$targetTableName")
.option("doris.fenodes", s"$targetServerHost:${progressParamConfig.g_http_port}")
.option("doris.query.port", s"$targetServerPort")
.option("user", s"$targetUsername")
.option("password", s"$targetPassword")
.option("doris.sink.streaming.passthrough", "true")
.option("doris.sink.properties.format", "json")
// .option("doris.sink.properties.column_separator", "\x01")
// .option("doris.sink.properties.line_delimiter", "\u0002")
.save()
4. When performing the overwrite write, the job fails in the truncate phase with the following error:
25/03/21 17:01:23 ERROR RuleProcessor: Error processing table: dwm__xxxxx_ext_info__nd
org.apache.spark.sql.AnalysisException: Table edm.dwm__xxxxx_ext_info__nd does not support truncate in batch mode.;
OverwriteByExpression RelationV2[statistical_date#971, project_id#972, prj_no#973, project_name#974, project_type_id#975, project_type_name#976, project_parent_type_id#977, project_parent_type_name#978, state#979, project_scale#980, generate_date#981, begin_date#982, close_time#983, end_date#984, work_limited_time#985, service_team_id#986, service_team_name#987, project_manager_id#988, project_manager_name#989, current_health_check#990, current_grade#991, final_grade#992, risk_value#993, risk_level#994, ... 12 more fields] edm.dwm__xxxxx_ext_info__nd, true, Map(doris.sink.properties.format -> json, doris.sink.streaming.passthrough -> true, doris.fenodes -> 10.x.x.26:8030, doris.query.port -> 9030, doris.table.identifier -> edm.dwm__pm__xxxxt_info__nd, user -> root, password -> xxxxxxx), true
+- SubqueryAlias spark_catalog.edw.dwm__pm__sxxxxxxt_info__nd
+- HiveTableRelation [edw.dwm__pm__xxxxxxxx__nd, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [statistical_date#824, project_id#825, prj_no#826, project_name#827, project_type_id#828, project..., Partition Cols: []]

at org.apache.spark.sql.execution.datasources.v2.TableCapabilityCheck$.failAnalysis(TableCapabilityCheck.scala:35)
at org.apache.spark.sql.execution.datasources.v2.TableCapabilityCheck$.$anonfun$apply$1(TableCapabilityCheck.scala:65)
at org.apache.spark.sql.execution.datasources.v2.TableCapabilityCheck$.$anonfun$apply$1$adapted(TableCapabilityCheck.scala:42)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:173)
at org.apache.spark.sql.execution.datasources.v2.TableCapabilityCheck$.apply(TableCapabilityCheck.scala:42)
at org.apache.spark.sql.execution.datasources.v2.TableCapabilityCheck$.apply(TableCapabilityCheck.scala:32)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$46(CheckAnalysis.scala:699)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$46$adapted(CheckAnalysis.scala:699)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:699)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:90)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:155)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:176)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:228)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:173)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:73)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:71)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:63)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$withCachedData$1(QueryExecution.scala:77)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:76)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:76)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:87)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:95)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:113)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:101)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:377)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
at com.eccom.sys.util.etl.util.EtlUtil$.hiveToDorisHeterogeneous(EtlUtil.scala:475)
at com.eccom.sys.util.rule.RuleProcessor$.$anonfun$processWithPriorityAndRuleType$2(RuleProcessor.scala:413)
at com.eccom.sys.util.rule.RuleProcessor$.$anonfun$processWithPriorityAndRuleType$2$adapted(RuleProcessor.scala:384)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at com.eccom.sys.util.rule.RuleProcessor$.processWithPriorityAndRuleType(RuleProcessor.scala:384)
at com.eccom.sys.util.rule.RuleProcessor$.$anonfun$handleUserRules$2(RuleProcessor.scala:134)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:321)
at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:977)
at com.eccom.sys.util.rule.RuleProcessor$.$anonfun$handleUserRules$1(RuleProcessor.scala:125)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.immutable.List.foreach(List.scala:392)
at com.eccom.sys.util.rule.RuleProcessor$.processWithDefaults$1(RuleProcessor.scala:125)
at com.eccom.sys.util.rule.RuleProcessor$.handleUserRules(RuleProcessor.scala:157)
at com.eccom.sys.util.rule.RuleProcessor$.calculateRules(RuleProcessor.scala:91)
at com.eccom.sys.util.rule.RuleProcessor$.main(RuleProcessor.scala:39)
at com.eccom.sys.util.rule.RuleProcessor.main(RuleProcessor.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:732)
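The TableCapabilityCheck failure above indicates that, under Spark 3.1, SaveMode.Overwrite on a DataSource V2 table requires the table to advertise the TRUNCATE capability, which this connector build apparently does not. Below is a minimal workaround sketch (my own assumption, not a confirmed fix from this thread): truncate the Doris table over the MySQL protocol first, then write with SaveMode.Append. It assumes the MySQL JDBC driver is on the driver classpath and that $targetServerPort is the FE query port (9030), matching doris.query.port above.

import java.sql.DriverManager
import org.apache.spark.sql.SaveMode

// Truncate the target table via the Doris FE MySQL protocol (query port, assumed 9030).
// Requires the MySQL JDBC driver on the driver classpath.
val jdbcUrl = s"jdbc:mysql://$targetServerHost:$targetServerPort/$targetDatabaseName"
val conn = DriverManager.getConnection(jdbcUrl, targetUsername, targetPassword)
try {
  conn.createStatement().execute(s"TRUNCATE TABLE $targetDatabaseName.$targetTableName")
} finally {
  conn.close()
}

// Then append instead of overwrite, which skips the V2 TRUNCATE capability check.
source.write
  .format("doris")
  .mode(SaveMode.Append)
  .option("doris.table.identifier", s"$targetDatabaseName.$targetTableName")
  .option("doris.fenodes", s"$targetServerHost:${progressParamConfig.g_http_port}")
  .option("user", s"$targetUsername")
  .option("password", s"$targetPassword")
  .option("doris.sink.properties.format", "json")
  .save()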
1 Answer

To add: after rolling spark-doris-connector back to 1.3.2, the import does succeed, but we are now seeing data loss; almost every table ends up missing some rows. Could this be caused by a mismatch between the Doris version and the connector version?
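Regarding the data loss after rolling back to 1.3.2, a simple way to quantify it is to compare row counts between the Hive source and the Doris target right after each load. This is a diagnostic sketch under my own assumptions: sourceDatabaseName and sourceTableName are hypothetical placeholders for the Hive side, and the Doris table is read back through the same connector.

// Diagnostic sketch: count rows in the Hive source vs. the Doris target after the load.
// sourceDatabaseName / sourceTableName are hypothetical placeholders for the Hive side.
val hiveCount = spark.table(s"$sourceDatabaseName.$sourceTableName").count()

val dorisCount = spark.read
  .format("doris")
  .option("doris.table.identifier", s"$targetDatabaseName.$targetTableName")
  .option("doris.fenodes", s"$targetServerHost:${progressParamConfig.g_http_port}")
  .option("user", s"$targetUsername")
  .option("password", s"$targetPassword")
  .load()
  .count()

println(s"hive=$hiveCount, doris=$dorisCount, missing=${hiveCount - dorisCount}")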