【已记录】rust使用flightsql取数据,执行到be do_get的时候会报错stream error received: unexpected internal error encountered

Viewed 46

fe返回了queryID:SQL,接下来应该那ticket去be DoGet,但是在执行do_get的时候报错,
{ code: Internal, message: "h2 protocol error: http2 error: stream error received: unexpected internal error encountered", source: Some(tonic::transport::Error(Transport, hyper::Error(Http2, Error { kind: Reset(StreamId(1), INTERNAL_ERROR, Remote) }))) }
麻烦各位大佬看一下应该处理呢

doris版本 2.1.0 和 2.1.2
arrow-rs 版本 51.0

以下为rust代码

use std::time::Duration;
use arrow::error::ArrowError;
use arrow_cast::base64::BASE64_STANDARD;
use arrow_flight::{FlightClient, HandshakeRequest};
use arrow_flight::sql::client::FlightSqlServiceClient;
use base64::Engine;
use std::string::String;
use clap::Parser;
use futures::{stream, StreamExt};
use tonic::{Request};
use tonic::transport::{Endpoint};

#[derive(Debug, Parser, PartialEq)]
#[command(disable_help_flag = true)]
struct Args {
    #[clap(
        short = 'q',
        long,
        help = "Flight SQL"
    )]
    sql: String
}

#[tokio::main]
pub async fn main() -> Result<(), ArrowError> {
    let args = Args::parse();

    let protocol = "grpc+tcp";
    let host = "172.16.90.1";
    let port = "39070";
    let user = "root";
    let password = "";

    // fe endpoint
    let url = format!("{protocol}://{host}:{port}");
    println!("{}", url.clone());
    let endpoint_fe = endpoint(String::from(url)).unwrap();
    let channel_fe = endpoint_fe.connect().await.unwrap();

    let mut client_fe = FlightSqlServiceClient::new(channel_fe);
    client_fe.set_header("enable_pipeline_x_engine", "true");

    // Authenticate
    let cmd = HandshakeRequest {
        protocol_version: 0,
        payload: Default::default(),
    };
    let mut req = Request::new(stream::iter(vec![cmd]));
    let val = BASE64_STANDARD.encode(format!("{user}:{password}"));
    let val = format!("Basic {val}")
        .parse()
        .map_err(|_| ArrowError::ParseError("Cannot parse header".to_string()))?;
    req.metadata_mut().insert("authorization", val);
    let resp = client_fe.inner_mut()
        .handshake(req)
        .await
        .map_err(|e| ArrowError::IpcError(format!("Can't handshake {e}")))?;

    // 定一个String auth_token, 用于存储auth token
    let mut auth_token = "".to_string();
    if let Some(auth) = resp.metadata().get("authorization") {
        let auth = auth
            .to_str()
            .map_err(|_| ArrowError::ParseError("Can't read auth header".to_string()))?;
        let bearer = "Bearer ";
        if !auth.starts_with(bearer) {
            Err(ArrowError::ParseError("Invalid auth header!".to_string()))?;
        }
        let auth = auth[bearer.len()..].to_string();
        client_fe.set_token(auth.clone());
        println!("auth success");
        auth_token = auth;
    }
    println!("auth token: {}", auth_token);


    let mut stmt = client_fe.prepare(args.sql.to_string(), None).await.unwrap();
    let flight_info = stmt.execute().await.unwrap();
    println!("all flight info {} done!\n", flight_info.clone());

    // get be endpoint
    let endpoints = flight_info.clone().endpoint;
    for query_endpoint in endpoints {
        println!("{:?} \n", query_endpoint.clone());
        let location = query_endpoint.location;

        if location.len() > 0 {
            println!("get be endpoint success, size {}", location.len());
            let location_uri = location.get(0).unwrap();
            println!("get be endpoint: {:?}", location_uri.uri);

            let ticket = query_endpoint
                .ticket
                .unwrap();
            println!("get ticket success {:?} \n", ticket.clone());

            let be_uri = location_uri.clone().uri;
            println!("set be endpoint to channel: {:?} \n", be_uri.clone());
            let endpoint_be = endpoint(be_uri).unwrap();
            let channel_be = endpoint_be.connect().await.unwrap();

            let mut client_be = FlightClient::new(channel_be);
            client_be.add_header("authorization", auth_token.as_str()).expect("add header error");
            println!("client be endpoint success");

            let mut flight_data = client_be
                .do_get(ticket)
                .await
                .expect("do get be data error");
            println!("get be data success");

            while let Some(Ok(stream_data)) = flight_data.next().await {
                println!("{:?}", stream_data);
            }

        }
        break;
    }

    Ok(())
}


fn endpoint(addr: String) -> Result<Endpoint, ArrowError> {
    let endpoint = Endpoint::new(addr)
        .map_err(|_| ArrowError::IpcError("Cannot create endpoint".to_string()))?
        .connect_timeout(Duration::from_secs(30))
        .timeout(Duration::from_secs(30))
        .tcp_nodelay(false) // Disable Nagle's Algorithm since we don't want packets to wait
        .tcp_keepalive(Option::Some(Duration::from_secs(3600)))
        .http2_keep_alive_interval(Duration::from_secs(300))
        .keep_alive_timeout(Duration::from_secs(30))
        .keep_alive_while_idle(true);

    Ok(endpoint)
}

以下为终端输出

grpc+tcp://172.16.90.1:39070
auth success
auth token: r4v5hv9t3el9esgd7s63gp3d6a
all flight info FlightInfo { schema: Field { name: "date", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, descriptor: FlightDescriptor { type: cmd, value: [10, 75, 116, 121, 112, 101, 46, 103] }, endpoint: [FlightEndpoint { ticket: Ticket { ticket: CkJ0eXBlLmdvb2dsZWFwaXMuY29tL2Fycm93LmZsaWdodC5wcm90b2NvbC5zcWwuVGlja2V0U3RhdGVtZW50UXVlcnkSPAo6ZjQ3OGVkMmY0YjQwNDdjMC04YjNkNjY2NzVhNTFhNjEzOnNlbGVjdCAqIGZyb20gdGVzdC50ZXN0MQ==, location: [Location { uri: grpc+tcp://172.16.90.1:38070], expiration_time: None, app_metadata: [] }], total_records: -1, total_bytes: -1, ordered: false, app_metadata: [] } done!

FlightEndpoint { ticket: Some(Ticket { ticket: b"\nBtype.googleapis.com/arrow.flight.protocol.sql.TicketStatementQuery\x12<\n:f478ed2f4b4047c0-8b3d66675a51a613:select * from test.test1" }), location: [Location { uri: "grpc+tcp://172.16.90.1:38070" }], expiration_time: None, app_metadata: b"" } 

get be endpoint success, size 1
get be endpoint: "grpc+tcp://172.16.90.1:38070"
get ticket success Ticket { ticket: b"\nBtype.googleapis.com/arrow.flight.protocol.sql.TicketStatementQuery\x12<\n:f478ed2f4b4047c0-8b3d66675a51a613:select * from test.test1" } 

set be endpoint to channel: "grpc+tcp://172.16.90.1:38070" 

client be endpoint success
thread 'main' panicked at src/main.rs:109:18:
do get be data error: Tonic(Status { code: Internal, message: "h2 protocol error: http2 error: stream error received: unexpected internal error encountered", source: Some(tonic::transport::Error(Transport, hyper::Error(Http2, Error { kind: Reset(StreamId(1), INTERNAL_ERROR, Remote) }))) })

fe无报错
be无报错

be.INFO


I20240420 18:00:28.211560 121852 fragment_mgr.cpp:661] Register query/load memory tracker, query/load id: f478ed2f4b4047c0-8b3d66675a51a611 limit: 0
I20240420 18:00:28.211572 121852 plan_fragment_executor.cpp:121] PlanFragmentExecutor::prepare|query_id=f478ed2f4b4047c0-8b3d66675a51a611|instance_id=f478ed2f4b4047c0-8b3d66675a51a613|backend_num=0|pthread_id=139632716592704
I20240420 18:00:28.211680 121852 plan_fragment_executor.cpp:121] PlanFragmentExecutor::prepare|query_id=f478ed2f4b4047c0-8b3d66675a51a611|instance_id=f478ed2f4b4047c0-8b3d66675a51a612|backend_num=1|pthread_id=139632716592704
I20240420 18:00:28.213418 127383 plan_fragment_executor.cpp:256] PlanFragmentExecutor::open f478ed2f4b4047c0-8b3d66675a51a611|f478ed2f4b4047c0-8b3d66675a51a613, mem_limit 2.00 GB
I20240420 18:00:28.213424 138850 plan_fragment_executor.cpp:256] PlanFragmentExecutor::open f478ed2f4b4047c0-8b3d66675a51a611|f478ed2f4b4047c0-8b3d66675a51a612, mem_limit 2.00 GB
I20240420 18:00:28.213642 138850 exec_node.cpp:204] query= f478ed2f4b4047c0-8b3d66675a51a611, fragment_instance_id=f478ed2f4b4047c0-8b3d66675a51a612, id=0 type=OLAP_SCAN_NODE closed
I20240420 18:00:28.213655 127383 exec_node.cpp:204] query= f478ed2f4b4047c0-8b3d66675a51a611, fragment_instance_id=f478ed2f4b4047c0-8b3d66675a51a613, id=1 type=EXCHANGE_NODE closed
I20240420 18:00:28.213658 138850 fragment_mgr.cpp:458] Instance f478ed2f4b4047c0-8b3d66675a51a612 finished
I20240420 18:00:28.213680 127383 fragment_mgr.cpp:458] Instance f478ed2f4b4047c0-8b3d66675a51a613 finished
I20240420 18:00:28.213688 127383 fragment_mgr.cpp:462] Query f478ed2f4b4047c0-8b3d66675a51a611 finished
I20240420 18:00:28.213733 127383 query_context.cpp:111] Query f478ed2f4b4047c0-8b3d66675a51a611 deconstructed, 
I20240420 18:00:28.249017 121854 fragment_mgr.cpp:623] query_id: ee38a307ed5d4911-a849f9078d936691 coord_addr TNetworkAddress(hostname=betalpha-sh.betalpha.com, port=39020) total fragment num on current host: 2 fe process uuid: 1713606422291
I20240420 18:00:28.249044 121854 fragment_mgr.cpp:648] Query/load id: ee38a307ed5d4911-a849f9078d936691, use task group: TG[id = 1, name = normal, cpu_share = 1024, memory_limit = 50.91 GB, enable_memory_overcommit = true, version = 0, cpu_hard_limit = -1, scan_thread_num = 128, max_remote_scan_thread_num = 128, min_remote_scan_thread_num = 128], is pipeline: 1, enable cgroup soft limit: 1

be.out有一条报错

E0420 18:00:28.290855787  122312 hpack_parser.cc:833]                  Error parsing ':scheme' metadata: error=invalid value key=:scheme
1 Answers

【问题状态】处理中
【问题处理】内部跟进中,有进展会更新回帖