Question about the sink.batch.size setting

Spark is writing to Doris with sink.batch.size set to 100,000, running with 12 threads. After the job has run for a while it fails with the exception below. Is this caused by the data volume being too large, or by the concurrency being too high? What configuration should be changed to resolve it?
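
For context, here is a minimal sketch in Scala of the kind of write job being described. The option names follow the Doris Spark Connector documentation; the database/table name is taken from the stream-load URL in the log below, while the FE address (port 8030), credentials, source path, and the reduced batch value are placeholder assumptions, not a confirmed fix:

```scala
import org.apache.spark.sql.SparkSession

// Sketch of the write path under discussion. The table identifier comes from
// the stream-load URL in the log; everything else is an assumed placeholder.
object DorisWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("doris-write-sketch").getOrCreate()
    val df = spark.read.parquet("/path/to/source") // placeholder source

    df.write
      .format("doris")
      .option("doris.fenodes", "192.168.1.221:8030") // FE HTTP endpoint (assumed port)
      .option("doris.table.identifier", "etl_dwd.dwd_pa_company_base_add_duplicate")
      .option("user", "root") // placeholder
      .option("password", "") // placeholder
      // The thread uses 100,000; a smaller batch keeps each stream-load
      // transaction short-lived and reduces per-batch memory pressure.
      .option("doris.sink.batch.size", "20000")
      // Retry a failed batch instead of failing the task outright.
      .option("doris.sink.max-retries", "3")
      .save()

    spark.stop()
  }
}
```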

```
3614028 [Executor task launch worker for task 9.0 in stage 0.0 (TID 3)] ERROR org.apache.doris.spark.backend.BackendClient  - Doris server 'Doris BE{host='192.168.1.221', port=9060}' internal failed, status is 'ABORTED', error message is '[(192.168.1.221)[ABORTED]]'
3614030 [Executor task launch worker for task 9.0 in stage 0.0 (TID 3)] INFO  org.apache.http.impl.execchain.RetryExec  - I/O exception (java.io.IOException) caught when processing request to {}->http://192.168.1.221:8040: org.apache.doris.spark.exception.DorisInternalException: Doris server Doris BE{host='192.168.1.221', port=9060} internal failed, status code [ABORTED] error message is [(192.168.1.221)[ABORTED]]
3614056 [Executor task launch worker for task 9.0 in stage 0.0 (TID 3)] INFO  org.apache.doris.spark.backend.BackendClient  - CloseScanner to Doris BE 'Doris BE{host='192.168.1.221', port=9060}' success.
3614056 [Executor task launch worker for task 9.0 in stage 0.0 (TID 3)] INFO  org.apache.doris.spark.backend.BackendClient  - Closed a connection to Doris BE{host='192.168.1.221', port=9060}.
3614068 [Executor task launch worker for task 9.0 in stage 0.0 (TID 3)] ERROR org.apache.spark.executor.Executor  - Exception in task 9.0 in stage 0.0 (TID 3)
java.io.IOException: Failed to load batch data on BE: http://192.168.1.221:8040/api/etl_dwd/dwd_pa_company_base_add_duplicate/_stream_load node.
	at org.apache.doris.spark.writer.DorisWriter.$anonfun$doWrite$1(DorisWriter.scala:108)
	at org.apache.doris.spark.writer.DorisWriter.$anonfun$doWrite$1$adapted(DorisWriter.scala:95)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1020)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1020)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.doris.spark.exception.StreamLoadException: load execute failed
	at org.apache.doris.spark.load.DorisStreamLoad.load(DorisStreamLoad.java:242)
	at org.apache.doris.spark.writer.DorisWriter.$anonfun$write$1(DorisWriter.scala:71)
	at org.apache.doris.spark.writer.DorisWriter.$anonfun$write$1$adapted(DorisWriter.scala:71)
	at org.apache.doris.spark.writer.DorisWriter.$anonfun$doWrite$4(DorisWriter.scala:100)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.doris.spark.sql.Utils$.retry(Utils.scala:182)
	at org.apache.doris.spark.writer.DorisWriter.$anonfun$doWrite$3(DorisWriter.scala:99)
	at org.apache.doris.spark.writer.DorisWriter.$anonfun$doWrite$1(DorisWriter.scala:100)
	... 12 more
Caused by: org.apache.http.client.ClientProtocolException
	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:186)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:107)
	at org.apache.doris.spark.load.DorisStreamLoad.load(DorisStreamLoad.java:227)
	... 20 more
Caused by: org.apache.http.client.NonRepeatableRequestException: Cannot retry request with a non-repeatable request entity
	at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:107)
	at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
	... 23 more
Caused by: java.io.IOException: org.apache.doris.spark.exception.DorisInternalException: Doris server Doris BE{host='192.168.1.221', port=9060} internal failed, status code [ABORTED] error message is [(192.168.1.221)[ABORTED]]
	at org.apache.doris.spark.load.RecordBatchInputStream.read(RecordBatchInputStream.java:103)
	at java.io.InputStream.read(InputStream.java:101)
	at org.apache.http.entity.InputStreamEntity.writeTo(InputStreamEntity.java:133)
	at org.apache.http.impl.execchain.RequestEntityProxy.writeTo(RequestEntityProxy.java:123)
	at org.apache.http.impl.DefaultBHttpClientConnection.sendRequestEntity(DefaultBHttpClientConnection.java:156)
	at org.apache.http.impl.conn.CPoolProxy.sendRequestEntity(CPoolProxy.java:162)
	at org.apache.http.protocol.HttpRequestExecutor.doSendRequest(HttpRequestExecutor.java:238)
	at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123)
	at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:271)
	at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
	at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88)
	... 25 more
Caused by: org.apache.doris.spark.exception.DorisInternalException: Doris server Doris BE{host='192.168.1.221', port=9060} internal failed, status code [ABORTED] error message is [(192.168.1.221)[ABORTED]]
	at org.apache.doris.spark.backend.BackendClient.getNext(BackendClient.java:192)
	at org.apache.doris.spark.rdd.ScalaValueReader.$anonfun$hasNext$2(ScalaValueReader.scala:207)
	at org.apache.doris.spark.rdd.ScalaValueReader.org$apache$doris$spark$rdd$ScalaValueReader$$lockClient(ScalaValueReader.scala:239)
	at org.apache.doris.spark.rdd.ScalaValueReader.hasNext(ScalaValueReader.scala:207)
	at org.apache.doris.spark.rdd.AbstractDorisRDDIterator.hasNext(AbstractDorisRDDIterator.scala:56)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.doris.spark.writer.DorisWriter$BatchIterator.hasNext(DorisWriter.scala:158)
	at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:32)
	at org.apache.doris.spark.load.RecordBatchInputStream.endOfBatch(RecordBatchInputStream.java:120)
	at org.apache.doris.spark.load.RecordBatchInputStream.read(RecordBatchInputStream.java:93)
	... 35 more
```

2 Answers

What are the other connector parameters of the Spark job? Could you post them?

If you are migrating data from Doris to Doris, you can use x2doris instead, which simplifies the process.
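
If the job has to stay in Spark, note that the deepest frames in the trace (ScalaValueReader / BackendClient.getNext) show the failing task was still scanning the source Doris table while streaming the same rows into the target, so the read scanner stays open for the entire load. Below is a sketch of one way to decouple the two phases by materializing the scan before the write begins; all hosts, credentials, and option values are placeholders, and this is only one possible mitigation, not a verified fix:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Sketch only: finish reading the source table completely before starting the
// write, so the Doris scan is not held open while stream loads run.
object DorisToDorisSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("doris-to-doris-sketch").getOrCreate()

    val source = spark.read
      .format("doris")
      .option("doris.fenodes", "source-fe:8030") // placeholder FE address
      .option("doris.table.identifier", "etl_dwd.dwd_pa_company_base_add_duplicate")
      .option("user", "root") // placeholder
      .option("password", "") // placeholder
      .load()

    // Force the scan to complete and spill to executor disk before writing.
    source.persist(StorageLevel.DISK_ONLY)
    source.count()

    source.write
      .format("doris")
      .option("doris.fenodes", "target-fe:8030") // placeholder FE address
      .option("doris.table.identifier", "etl_dwd.dwd_pa_company_base_add_duplicate")
      .option("user", "root")
      .option("password", "")
      .option("doris.sink.batch.size", "20000")
      .option("doris.sink.max-retries", "3")
      .save()

    source.unpersist()
    spark.stop()
  }
}
```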