flink(1.17) + docker部署的doris(1.2.4),填的8030端口却报8040 Connection refused

Viewed 26

source端 阿里云(hologres)
sink端 doris(1.2.4)单机版 FE/BE在1台机器上
-- Doris sink table for the Flink job (the post describes Doris as the sink side).
-- Only 'fenodes' is configured below; no 'benodes' option is set.
CREATE TEMPORARY TABLE dwd_cmxs_analyse_ri (
    id                         VARCHAR(200),
    date_time                  TIMESTAMP,
    app                        VARCHAR(100),
    cname                      VARCHAR(100),
    sname                      VARCHAR(100),
    user_name                  VARCHAR(100),
    error_code                 VARCHAR(100),
    cmxs_type                  VARCHAR(100),
    source_ts_analysis_enabled INT
) WITH (
    'connector'         = 'doris',
    -- FE address as entered by the author (port 8030).
    'fenodes'           = '127.0.0.1:8030',
    -- Target table in <database>.<table> form.
    'table.identifier'  = 'demo.dwd_cmxs_analyse_ri',
    'username'          = 'root',
    'password'          = '****',
    'sink.label-prefix' = 'doris_label'
);
异常:
Caused by: org.apache.doris.flink.exception.DorisRuntimeException: java.util.concurrent.ExecutionException: org.apache.http.conn.ConnectTimeoutException: Connect to 172.17.0.2:8040 [/172.17.0.2] failed: connect timed out
at org.apache.doris.flink.sink.writer.DorisStreamLoad.stopLoad(DorisStreamLoad.java:296)
at org.apache.doris.flink.sink.writer.DorisWriter.prepareCommit(DorisWriter.java:250)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:229)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:199)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:340)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1577)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1565)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1431)
... 14 more
Caused by: java.util.concurrent.ExecutionException: org.apache.http.conn.ConnectTimeoutException: Connect to 172.17.0.2:8040 [/172.17.0.2] failed: connect timed out
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.doris.flink.sink.writer.DorisStreamLoad.stopLoad(DorisStreamLoad.java:290)
... 23 more
Caused by: org.apache.http.conn.ConnectTimeoutException: Connect to 172.17.0.2:8040 [/172.17.0.2] failed: connect timed out
at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:151)
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376)
at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
at org.apache.doris.flink.sink.writer.DorisStreamLoad.lambda$startLoad$0(DorisStreamLoad.java:347)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.net.SocketTimeoutException: connect timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:662)
at org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:75)
at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
... 14 more

1 Answers

可能是因为你配置的 be 的 ip,外部的 Flink 集群无法访问。这主要是因为当连接 fe 时,会通过 fe 解析出 be 的地址。例如,当你添加的 be 地址为 127.0.0.1,那么 Flink 通过 fe 获取的 be 地址就为 127.0.0.1:webserver_port,此时 Flink 就会去访问这个地址。(本例报错中的 172.17.0.2:8040 很可能就是 be 在 Docker 容器内部的地址,Flink 所在的机器无法直接访问。)当出现这个问题时,可以在 with 属性中增加实际可访问的 be 外部 ip 地址,例如 'benodes' = 'be_ip:webserver_port,be_ip:webserver_port...';整库同步则可增加 --sink-conf benodes=be_ip:webserver_port,be_ip:webserver_port...