[Solved] flink-doris-connector 1.5.2 with Flink 1.8: reading from Kafka and writing to Doris fails with [INTERNAL_ERROR]too many filtered rows


Properties pro = new Properties();
pro.setProperty("format", "json");
pro.setProperty("read_json_by_line", "true");
pro.setProperty("strip_outer_array", "true");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder
        .setLabelPrefix("label-doris")
        .setStreamLoadProp(pro);

// builder (presumably a DorisSink.Builder<String>) and dorisOptions are created earlier (not shown)
builder.setDorisReadOptions(DorisReadOptions.builder().build())
        .setDorisExecutionOptions(executionBuilder.build())
        .setSerializer(new SimpleStringSerializer()) // serialize each record as a plain string
        .setDorisOptions(dorisOptions);

flatMapDS.sinkTo(builder.build()).name("doris_ticket_ticket");

Error reported in the BE log:
W0301 14:04:39.982507 88105 status.h:393] meet error status: [INTERNAL_ERROR]cancelled: [INTERNAL_ERROR]too many filtered rows

    0#  std::_Function_handler<void (doris::RuntimeState*, doris::Status*), doris::StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<doris::StreamLoadContext>)::$_0>::_M_invoke(std::_Any_data const&, doris::RuntimeState*&&, doris::Status*&&) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:354
    1#  doris::FragmentMgr::_exec_actual(std::shared_ptr, std::function<void (doris::RuntimeState*, doris::Status*)> const&) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:360
    2#  std::_Function_handler<void (), doris::FragmentMgr::exec_plan_fragment(doris::TExecPlanFragmentParams const&, std::function<void (doris::RuntimeState*, doris::Status*)> const&)::$_0>::_M_invoke(std::_Any_data const&) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:701
    3#  doris::ThreadPool::dispatch_thread() at /home/zcp/repo_center/doris_release/doris/be/src/util/threadpool.cpp:0
    4#  doris::Thread::supervise_thread(void*) at /var/local/ldb_toolchain/bin/../usr/include/pthread.h:562
    5#  start_thread
    6#  __clone

    0#  doris::io::StreamLoadPipe::_append(std::shared_ptr<doris::ByteBuffer> const&, unsigned long) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:0
    1#  doris::io::StreamLoadPipe::append(std::shared_ptr<doris::ByteBuffer> const&) at /home/zcp/repo_center/doris_release/doris/be/src/io/fs/stream_load_pipe.cpp:162
    2#  doris::StreamLoadAction::on_chunk_data(doris::HttpRequest*) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:442
    3#  ?
    4#  bufferevent_run_readcb_
    5#  ?
    6#  ?
    7#  ?
    8#  ?
    9#  std::_Function_handler<void (), doris::EvHttpServer::start()::$_0>::_M_invoke(std::_Any_data const&) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/ext/atomicity.h:98
    10# doris::ThreadPool::dispatch_thread() at /home/zcp/repo_center/doris_release/doris/be/src/util/threadpool.cpp:0
    11# doris::Thread::supervise_thread(void*) at /var/local/ldb_toolchain/bin/../usr/include/pthread.h:562
    12# start_thread
    13# __clone
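
For context on the message itself: as far as I understand it, Doris cancels a Stream Load with "too many filtered rows" when the share of rows rejected during parsing or type conversion exceeds max_filter_ratio, which defaults to 0, so a single bad row is enough to fail the whole batch. The sketch below is a simplified model of that comparison, not the BE code; the class and method names are hypothetical.

```java
// Simplified model of the Stream Load filter check; all names here are hypothetical.
final class FilterRatioCheck {
    private FilterRatioCheck() {}

    /**
     * Returns true when the share of filtered rows among the selected rows
     * is strictly greater than maxFilterRatio.
     */
    static boolean tooManyFilteredRows(long filteredRows, long selectedRows, double maxFilterRatio) {
        if (selectedRows <= 0) {
            return false; // nothing was selected, so there is no ratio to compare
        }
        return (double) filteredRows / selectedRows > maxFilterRatio;
    }

    public static void main(String[] args) {
        // With a ratio of 0, one filtered row out of 1000 already fails the load.
        System.out.println(tooManyFilteredRows(1, 1000, 0.0));  // true
        // With a ratio of 0.01, the same batch would be accepted.
        System.out.println(tooManyFilteredRows(1, 1000, 0.01)); // false
    }
}
```

Raising max_filter_ratio through the stream load properties would only hide the rejected rows, so it is probably better to find out first why they are being filtered.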
2 Answers

No. The data is well-formed; it was written to Kafka by Flink CDC. The old 1.0.3 connector works with it.

Is there an ErrorURL in the returned response? It looks like dirty data is causing this.
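
To follow up on the ErrorURL suggestion: the Stream Load response is a JSON object, and when rows are filtered it normally carries an ErrorURL field pointing at the rejected rows and the reason for each. The connector usually writes that response to the TaskManager log. Below is a minimal sketch for pulling the field out of a response body using only the JDK. The class name, the regex approach, and the sample host and file name in the test are my own assumptions; a real JSON parser would be more robust.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts ErrorURL from a Stream Load response body.
final class StreamLoadResponse {
    // Matches "ErrorURL": "<value>"; assumes the value contains no escaped quotes.
    private static final Pattern ERROR_URL =
            Pattern.compile("\"ErrorURL\"\\s*:\\s*\"([^\"]*)\"");

    private StreamLoadResponse() {}

    /** Returns the ErrorURL value, or empty when the field is missing or blank. */
    static Optional<String> errorUrl(String responseJson) {
        Matcher m = ERROR_URL.matcher(responseJson);
        if (m.find() && !m.group(1).isEmpty()) {
            return Optional.of(m.group(1));
        }
        return Optional.empty();
    }
}
```

Opening the extracted URL (for example with curl) should list each filtered row together with the reason it was rejected, which would confirm or rule out dirty data.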