Flink cannot read data from Doris in an ARM environment

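We run the ARM build of Doris 2.1.3 in production with flink-connector 1.6.1, and reads fail with an error. After a flink-sql job is submitted, it runs for a while and then fails with the following error: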

2024-07-01 09:55:52,384 WARN  org.apache.doris.flink.backend.BackendClient                 [] - Get next from Doris BE{host='10.76.119.39', port=9060} failed.
org.apache.doris.shaded.org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
at org.apache.doris.shaded.org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:179) ~[flink-doris-connector-1.15-1.6.1.jar:1.6.1]
at org.apache.doris.shaded.org.apache.thrift.transport.TTransport.readAll(TTransport.java:109) ~[flink-doris-connector-1.15-1.6.1.jar:1.6.1]
at org.apache.doris.shaded.org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:464) ~[flink-doris-connector-1.15-1.6.1.jar:1.6.1]
at org.apache.doris.shaded.org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:362) ~[flink-doris-connector-1.15-1.6.1.jar:1.6.1]
at org.apache.doris.shaded.org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:245) ~[flink-doris-connector-1.15-1.6.1.jar:1.6.1]
at org.apache.doris.shaded.org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77) ~[flink-doris-connector-1.15-1.6.1.jar:1.6.1]
at org.apache.doris.sdk.thrift.TDorisExternalService$Client.recvGetNext(TDorisExternalService.java:92) ~[flink-doris-connector-1.15-1.6.1.jar:1.6.1]
at org.apache.doris.sdk.thrift.TDorisExternalService$Client.getNext(TDorisExternalService.java:79) ~[flink-doris-connector-1.15-1.6.1.jar:1.6.1]
at org.apache.doris.flink.backend.BackendClient.getNext(BackendClient.java:185) [flink-doris-connector-1.15-1.6.1.jar:1.6.1]
at org.apache.doris.flink.source.reader.DorisValueReader.hasNext(DorisValueReader.java:243) [flink-doris-connector-1.15-1.6.1.jar:1.6.1]
at org.apache.doris.flink.source.reader.DorisSourceSplitReader.fetch(DorisSourceSplitReader.java:57) [flink-doris-connector-1.15-1.6.1.jar:1.6.1]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) [flink-connector-files-1.15.2.jar:1.15.2]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) [flink-connector-files-1.15.2.jar:1.15.2]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) [flink-connector-files-1.15.2.jar:1.15.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_311]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_311]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_311]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_311]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_311]
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method) ~[?:1.8.0_311]
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[?:1.8.0_311]
at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[?:1.8.0_311]
at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[?:1.8.0_311]
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[?:1.8.0_311]
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) ~[?:1.8.0_311]
at java.io.BufferedInputStream.read(BufferedInputStream.java:345) ~[?:1.8.0_311]
at org.apache.doris.shaded.org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:177) ~[flink-doris-connector-1.15-1.6.1.jar:1.6.1]
... 18 more

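At first glance this looked like a network connectivity problem, but we checked the path from the Flink TaskManager to port 9060 on the BE and the network is fine. Looking more closely at the "Read timed out" error: Flink sent a read request to port 9060 on the BE, the BE never replied, and because the client has a timeout configured, it threw the exception once that timeout expired. We therefore suspect a problem on the BE side: either the BE is stuck somewhere in its logic and never returns, or it hit an exception that was not handled, so no response was sent. After the error occurred we dumped the BE thread stacks; the relevant part is as follows: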

Thread 2401 (LWP 35966):
#0  0x0000fffeebbdec48 in pthread_cond_wait () from /lib64/libpthread.so.0
#1  0x0000aaacf2d49880 in std::condition_variable::wait(std::unique_lock<std::mutex>&) ()
#2  0x0000aaace6ccc104 in doris::BlockingQueue<std::shared_ptr<arrow::RecordBatch> >::blocking_get(std::shared_ptr<arrow::RecordBatch>*) ()
#3  0x0000aaace6ccabd8 in doris::ResultQueueMgr::fetch_result(doris::TUniqueId const&, std::shared_ptr<arrow::RecordBatch>*, bool*) ()
#4  0x0000aaace6d6033c in doris::BackendService::get_next(doris::TScanBatchResult&, doris::TScanNextBatchParams const&) ()
#5  0x0000aaace6efafa4 in doris::BackendServiceProcessor::process_get_next(int, apache::thrift::protocol::TProtocol*, apache::thrift::protocol::TProtocol*, void*) ()
#6  0x0000aaace6ef30d0 in doris::BackendServiceProcessor::dispatchCall(apache::thrift::protocol::TProtocol*, apache::thrift::protocol::TProtocol*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, int, void*) ()
#7  0x0000aaace6f0b6e4 in apache::thrift::TDispatchProcessor::process(std::shared_ptr<apache::thrift::protocol::TProtocol>, std::shared_ptr<apache::thrift::protocol::TProtocol>, void*) ()
#8  0x0000aaacf0e38b70 in apache::thrift::server::TConnectedClient::run() ()
#9  0x0000aaacf0e39cb4 in apache::thrift::server::TThreadedServer::TConnectedClientRunner::run() ()
#10 0x0000aaacf0e3fab4 in apache::thrift::concurrency::Thread::threadMain(std::shared_ptr<apache::thrift::concurrency::Thread>) ()
#11 0x0000aaacf0e3f940 in void std::__invoke_impl<void, void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread> >(std::__invoke_other, void (*&&)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>&&) ()
#12 0x0000aaacf0e3f87c in std::__invoke_result<void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread> >::type std::__invoke<void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread> >(void (*&&)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread>&&) ()
#13 0x0000aaacf0e3f7e0 in void std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread> > >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) ()
#14 0x0000aaacf0e3f754 in std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread> > >::operator()() ()
#15 0x0000aaacf0e3f6f0 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<apache::thrift::concurrency::Thread>), std::shared_ptr<apache::thrift::concurrency::Thread> > > >::_M_run() ()
#16 0x0000aaacf2e05a5c in execute_native_thread_routine ()
#17 0x0000fffeebbd88cc in ?? () from /lib64/libpthread.so.0
#18 0x0000fffeebe2a1ec in ?? () from /lib64/libc.so.6

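Judging from the thread stack, Flink issued the thrift get_next call, whose main logic is a blocking read from a queue. The queue appears to be empty, so the thread is stuck there and never returns. We then looked at the read logic in flink-connector: it first issues an open_scanner call, which is presumably where data gets written into the queue, and then reads from the queue through get_next. Our guess is that something goes wrong during open_scanner. The open_scanner logic is fairly complex and we have no environment in which to debug and trace it, so could someone please help us resolve this?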
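
To make the suspected failure mode concrete, here is a minimal Java sketch of the same pattern. It is not Doris or connector code. A server thread blocks on an empty queue (standing in for blocking_get behind get_next), and a client with a socket read timeout gives up waiting. The class name, the queue, and the timeout value are all made up for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the BE/connector interaction, not real Doris code.
public class ReadTimeoutSketch {

    /** Returns true if the client read timed out because the server never answered. */
    static boolean clientTimesOut(int readTimeoutMs) throws Exception {
        // Stand-in for the BE result queue; nothing ever puts a batch into it,
        // which models a producer (open_scanner in our guess) that never delivers.
        BlockingQueue<byte[]> resultQueue = new LinkedBlockingQueue<>();

        try (ServerSocket server = new ServerSocket(0)) {
            Thread handler = new Thread(() -> {
                try (Socket conn = server.accept()) {
                    // Like blocking_get: waits until a batch arrives.
                    byte[] batch = resultQueue.take();
                    conn.getOutputStream().write(batch);
                } catch (IOException | InterruptedException e) {
                    // Reached when the demo shuts the handler down.
                }
            });
            handler.setDaemon(true);
            handler.start();

            try (Socket client = new Socket("127.0.0.1", server.getLocalPort())) {
                // Client-side read timeout, analogous to the connector's read timeout.
                client.setSoTimeout(readTimeoutMs);
                InputStream in = client.getInputStream();
                in.read(); // blocks until data arrives or the timeout fires
                return false;
            } catch (SocketTimeoutException e) {
                return true;
            } finally {
                handler.interrupt();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("timed out: " + clientTimesOut(300));
    }
}
```

In this sketch the network is healthy and the connection is established, yet the client still fails with a read timeout, which matches what we see: the handler thread is parked on the queue, as in frames #0 to #3 of the BE stack.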

2 Answers
  1. doris.request.read.timeout defaults to 30s; increase it and see whether the problem persists.
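
A rough sketch of how that setting might be passed in the Flink SQL table definition, written as a small Java helper that only builds the DDL string. The table name, columns, host, and credentials are placeholders. The exact option key is an assumption: the answer names doris.request.read.timeout, while as far as I know some connector versions document it as doris.request.read.timeout.ms with a value in milliseconds, so check the documentation for your connector version before using either.

```java
// Hypothetical helper: builds a Flink SQL DDL string for a Doris source table.
public class DorisSourceDdlSketch {

    // timeoutKey and timeoutValue are supplied by the caller because the exact
    // option name and unit depend on the connector version (assumption).
    static String sourceDdl(String timeoutKey, String timeoutValue) {
        return String.join("\n",
            "CREATE TABLE doris_source (",
            "  id INT,",                  // placeholder columns
            "  name STRING",
            ") WITH (",
            "  'connector' = 'doris',",
            "  'fenodes' = 'fe-host:8030',",                          // placeholder FE address
            "  'table.identifier' = 'example_db.example_table',",     // placeholder table
            "  'username' = 'example_user',",
            "  'password' = '',",
            "  '" + timeoutKey + "' = '" + timeoutValue + "'",
            ")");
    }

    public static void main(String[] args) {
        // Example: raise the read timeout from 30 s to 120 s (key name assumed).
        System.out.println(sourceDdl("doris.request.read.timeout.ms", "120000"));
    }
}
```

The resulting string would be submitted the same way as the existing flink-sql job definition. Note that a larger timeout only helps if the BE eventually produces a batch; if get_next really blocks forever, the error would just appear later.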