fe返回了queryID:SQL,接下来应该那ticket去be DoGet,但是在执行do_get的时候报错,
{ code: Internal, message: "h2 protocol error: http2 error: stream error received: unexpected internal error encountered", source: Some(tonic::transport::Error(Transport, hyper::Error(Http2, Error { kind: Reset(StreamId(1), INTERNAL_ERROR, Remote) }))) }
麻烦各位大佬看一下应该处理呢
doris版本 2.1.0 和 2.1.2
arrow-rs 版本 51.0
以下为rust代码
use std::time::Duration;
use arrow::error::ArrowError;
use arrow_cast::base64::BASE64_STANDARD;
use arrow_flight::{FlightClient, HandshakeRequest};
use arrow_flight::sql::client::FlightSqlServiceClient;
use base64::Engine;
use std::string::String;
use clap::Parser;
use futures::{stream, StreamExt};
use tonic::{Request};
use tonic::transport::{Endpoint};
#[derive(Debug, Parser, PartialEq)]
#[command(disable_help_flag = true)]
struct Args {
#[clap(
short = 'q',
long,
help = "Flight SQL"
)]
sql: String
}
#[tokio::main]
pub async fn main() -> Result<(), ArrowError> {
let args = Args::parse();
let protocol = "grpc+tcp";
let host = "172.16.90.1";
let port = "39070";
let user = "root";
let password = "";
// fe endpoint
let url = format!("{protocol}://{host}:{port}");
println!("{}", url.clone());
let endpoint_fe = endpoint(String::from(url)).unwrap();
let channel_fe = endpoint_fe.connect().await.unwrap();
let mut client_fe = FlightSqlServiceClient::new(channel_fe);
client_fe.set_header("enable_pipeline_x_engine", "true");
// Authenticate
let cmd = HandshakeRequest {
protocol_version: 0,
payload: Default::default(),
};
let mut req = Request::new(stream::iter(vec![cmd]));
let val = BASE64_STANDARD.encode(format!("{user}:{password}"));
let val = format!("Basic {val}")
.parse()
.map_err(|_| ArrowError::ParseError("Cannot parse header".to_string()))?;
req.metadata_mut().insert("authorization", val);
let resp = client_fe.inner_mut()
.handshake(req)
.await
.map_err(|e| ArrowError::IpcError(format!("Can't handshake {e}")))?;
// 定一个String auth_token, 用于存储auth token
let mut auth_token = "".to_string();
if let Some(auth) = resp.metadata().get("authorization") {
let auth = auth
.to_str()
.map_err(|_| ArrowError::ParseError("Can't read auth header".to_string()))?;
let bearer = "Bearer ";
if !auth.starts_with(bearer) {
Err(ArrowError::ParseError("Invalid auth header!".to_string()))?;
}
let auth = auth[bearer.len()..].to_string();
client_fe.set_token(auth.clone());
println!("auth success");
auth_token = auth;
}
println!("auth token: {}", auth_token);
let mut stmt = client_fe.prepare(args.sql.to_string(), None).await.unwrap();
let flight_info = stmt.execute().await.unwrap();
println!("all flight info {} done!\n", flight_info.clone());
// get be endpoint
let endpoints = flight_info.clone().endpoint;
for query_endpoint in endpoints {
println!("{:?} \n", query_endpoint.clone());
let location = query_endpoint.location;
if location.len() > 0 {
println!("get be endpoint success, size {}", location.len());
let location_uri = location.get(0).unwrap();
println!("get be endpoint: {:?}", location_uri.uri);
let ticket = query_endpoint
.ticket
.unwrap();
println!("get ticket success {:?} \n", ticket.clone());
let be_uri = location_uri.clone().uri;
println!("set be endpoint to channel: {:?} \n", be_uri.clone());
let endpoint_be = endpoint(be_uri).unwrap();
let channel_be = endpoint_be.connect().await.unwrap();
let mut client_be = FlightClient::new(channel_be);
client_be.add_header("authorization", auth_token.as_str()).expect("add header error");
println!("client be endpoint success");
let mut flight_data = client_be
.do_get(ticket)
.await
.expect("do get be data error");
println!("get be data success");
while let Some(Ok(stream_data)) = flight_data.next().await {
println!("{:?}", stream_data);
}
}
break;
}
Ok(())
}
fn endpoint(addr: String) -> Result<Endpoint, ArrowError> {
let endpoint = Endpoint::new(addr)
.map_err(|_| ArrowError::IpcError("Cannot create endpoint".to_string()))?
.connect_timeout(Duration::from_secs(30))
.timeout(Duration::from_secs(30))
.tcp_nodelay(false) // Disable Nagle's Algorithm since we don't want packets to wait
.tcp_keepalive(Option::Some(Duration::from_secs(3600)))
.http2_keep_alive_interval(Duration::from_secs(300))
.keep_alive_timeout(Duration::from_secs(30))
.keep_alive_while_idle(true);
Ok(endpoint)
}
以下为终端输出
grpc+tcp://172.16.90.1:39070
auth success
auth token: r4v5hv9t3el9esgd7s63gp3d6a
all flight info FlightInfo { schema: Field { name: "date", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, descriptor: FlightDescriptor { type: cmd, value: [10, 75, 116, 121, 112, 101, 46, 103] }, endpoint: [FlightEndpoint { ticket: Ticket { ticket: CkJ0eXBlLmdvb2dsZWFwaXMuY29tL2Fycm93LmZsaWdodC5wcm90b2NvbC5zcWwuVGlja2V0U3RhdGVtZW50UXVlcnkSPAo6ZjQ3OGVkMmY0YjQwNDdjMC04YjNkNjY2NzVhNTFhNjEzOnNlbGVjdCAqIGZyb20gdGVzdC50ZXN0MQ==, location: [Location { uri: grpc+tcp://172.16.90.1:38070], expiration_time: None, app_metadata: [] }], total_records: -1, total_bytes: -1, ordered: false, app_metadata: [] } done!
FlightEndpoint { ticket: Some(Ticket { ticket: b"\nBtype.googleapis.com/arrow.flight.protocol.sql.TicketStatementQuery\x12<\n:f478ed2f4b4047c0-8b3d66675a51a613:select * from test.test1" }), location: [Location { uri: "grpc+tcp://172.16.90.1:38070" }], expiration_time: None, app_metadata: b"" }
get be endpoint success, size 1
get be endpoint: "grpc+tcp://172.16.90.1:38070"
get ticket success Ticket { ticket: b"\nBtype.googleapis.com/arrow.flight.protocol.sql.TicketStatementQuery\x12<\n:f478ed2f4b4047c0-8b3d66675a51a613:select * from test.test1" }
set be endpoint to channel: "grpc+tcp://172.16.90.1:38070"
client be endpoint success
thread 'main' panicked at src/main.rs:109:18:
do get be data error: Tonic(Status { code: Internal, message: "h2 protocol error: http2 error: stream error received: unexpected internal error encountered", source: Some(tonic::transport::Error(Transport, hyper::Error(Http2, Error { kind: Reset(StreamId(1), INTERNAL_ERROR, Remote) }))) })
fe无报错
be无报错
be.INFO
I20240420 18:00:28.211560 121852 fragment_mgr.cpp:661] Register query/load memory tracker, query/load id: f478ed2f4b4047c0-8b3d66675a51a611 limit: 0
I20240420 18:00:28.211572 121852 plan_fragment_executor.cpp:121] PlanFragmentExecutor::prepare|query_id=f478ed2f4b4047c0-8b3d66675a51a611|instance_id=f478ed2f4b4047c0-8b3d66675a51a613|backend_num=0|pthread_id=139632716592704
I20240420 18:00:28.211680 121852 plan_fragment_executor.cpp:121] PlanFragmentExecutor::prepare|query_id=f478ed2f4b4047c0-8b3d66675a51a611|instance_id=f478ed2f4b4047c0-8b3d66675a51a612|backend_num=1|pthread_id=139632716592704
I20240420 18:00:28.213418 127383 plan_fragment_executor.cpp:256] PlanFragmentExecutor::open f478ed2f4b4047c0-8b3d66675a51a611|f478ed2f4b4047c0-8b3d66675a51a613, mem_limit 2.00 GB
I20240420 18:00:28.213424 138850 plan_fragment_executor.cpp:256] PlanFragmentExecutor::open f478ed2f4b4047c0-8b3d66675a51a611|f478ed2f4b4047c0-8b3d66675a51a612, mem_limit 2.00 GB
I20240420 18:00:28.213642 138850 exec_node.cpp:204] query= f478ed2f4b4047c0-8b3d66675a51a611, fragment_instance_id=f478ed2f4b4047c0-8b3d66675a51a612, id=0 type=OLAP_SCAN_NODE closed
I20240420 18:00:28.213655 127383 exec_node.cpp:204] query= f478ed2f4b4047c0-8b3d66675a51a611, fragment_instance_id=f478ed2f4b4047c0-8b3d66675a51a613, id=1 type=EXCHANGE_NODE closed
I20240420 18:00:28.213658 138850 fragment_mgr.cpp:458] Instance f478ed2f4b4047c0-8b3d66675a51a612 finished
I20240420 18:00:28.213680 127383 fragment_mgr.cpp:458] Instance f478ed2f4b4047c0-8b3d66675a51a613 finished
I20240420 18:00:28.213688 127383 fragment_mgr.cpp:462] Query f478ed2f4b4047c0-8b3d66675a51a611 finished
I20240420 18:00:28.213733 127383 query_context.cpp:111] Query f478ed2f4b4047c0-8b3d66675a51a611 deconstructed,
I20240420 18:00:28.249017 121854 fragment_mgr.cpp:623] query_id: ee38a307ed5d4911-a849f9078d936691 coord_addr TNetworkAddress(hostname=betalpha-sh.betalpha.com, port=39020) total fragment num on current host: 2 fe process uuid: 1713606422291
I20240420 18:00:28.249044 121854 fragment_mgr.cpp:648] Query/load id: ee38a307ed5d4911-a849f9078d936691, use task group: TG[id = 1, name = normal, cpu_share = 1024, memory_limit = 50.91 GB, enable_memory_overcommit = true, version = 0, cpu_hard_limit = -1, scan_thread_num = 128, max_remote_scan_thread_num = 128, min_remote_scan_thread_num = 128], is pipeline: 1, enable cgroup soft limit: 1
be.out有一条报错
E0420 18:00:28.290855787 122312 hpack_parser.cc:833] Error parsing ':scheme' metadata: error=invalid value key=:scheme