【缺少关键信息】上游是maxwell采集的日志格式 Doris sink无法实现删除功能

Viewed 43

上游日志格式是 maxwell 采集的 , 取出了json里的data json字符串,并根据maxwell的删除标志手动添加了 __DORIS_DELETE_SIGN__字段,删除时__DORIS_DELETE_SIGN__字段值为1,否则为0.
doris版本 1.2.2
doris sink代码如下

 //获取doris sink
    public static DorisSink<String> getDorisSink(String tableName) {
        DorisSink.Builder<String> builder = DorisSink.builder();
        final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();

        Properties pro = new Properties();
        pro.setProperty("format", "json");
        pro.setProperty("read_json_by_line", "true");
        pro.setProperty("hidden_columns",DORIS_DELETE_SIGN);
        
        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
        dorisBuilder.setFenodes("xxxxx:8030")
                .setTableIdentifier(tableName)
                .setUsername("root")
                .setPassword("123456");

        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
        executionBuilder.setStreamLoadProp(pro)
                .setLabelPrefix("label-" + UUID.randomUUID())
                .setDeletable(true)
        ;

        DorisSink<String> dorisSink = builder.setDorisReadOptions(readOptionBuilder.build())
                .setDorisExecutionOptions(executionBuilder.build())
                .setDorisOptions(dorisBuilder.build())
                .setSerializer(
                        JsonDebeziumSchemaSerializer.builder()
                                                    .setDorisOptions(dorisBuilder.build())
                                                    .build()
                )
                .build();
        return dorisSink;
    }
1 Answers

可以贴下上游的数据格式