Flink Doris Connector: partial column update

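The official documentation says that a partial column update with the Flink Connector requires the following configuration: 'sink.properties.partial_columns' = 'true'. I am not sure whether this also applies to the Flink Doris Connector. Could someone write a demo? So far I have only seen an example that uses Flink SQL with CDC ingestion to perform a partial column update.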

1 Answer

Yes, you can use the Flink Doris Connector. The code is as follows:

DorisSink.Builder<String> builder = DorisSink.builder();

// Connection settings for the target Doris table.
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes(dorisFeNodes)
        .setTableIdentifier(tableName)
        .setUsername(dorisUsername)
        .setPassword(dorisPassword);
logger.info("Doris FE nodes: {}", dorisFeNodes);

// Stream Load properties.
Properties pro = new Properties();
// JSON data format, one object per line.
pro.setProperty("format", "json");
pro.setProperty("read_json_by_line", "true");
pro.setProperty("max_filter_ratio", "0.999");
// pro.setProperty("sink.properties.partial_columns", "true");
// pro.setProperty("partial_columns", "true");

DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
        .setLabelPrefix(labelPrefix)
        .setStreamLoadProp(pro).build();

builder.setDorisReadOptions(DorisReadOptions.builder().build())
        .setDorisExecutionOptions(executionOptions)
        .setSerializer(new SimpleStringSerializer())
        .setDorisOptions(dorisBuilder.build());

Adding the following line to the pro properties enables the partial column update:

pro.setProperty("columns", "serialno,insert_time");
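
To make the property set easier to reuse, here is a minimal sketch that only builds the Stream Load properties with java.util.Properties. The class PartialUpdateProps and its build method are hypothetical helpers, not part of the connector. The result would be passed to setStreamLoadProp as in the answer above. The partial_columns flag is optional here: the answer reports that columns alone was enough, while the documentation quoted in the question sets the flag, so whether you need it likely depends on the table model. The sketch also assumes that the key is written without the sink.properties. prefix in the DataStream API.

```java
import java.util.Properties;

// Hypothetical helper (not part of the Doris connector): builds the
// Stream Load properties for a partial column update.
class PartialUpdateProps {

    static Properties build(boolean setPartialColumnsFlag, String... columns) {
        if (columns.length == 0) {
            throw new IllegalArgumentException("at least one column is required");
        }
        Properties props = new Properties();
        // JSON input, one object per line, as in the answer above.
        props.setProperty("format", "json");
        props.setProperty("read_json_by_line", "true");
        // Restrict the load to the listed columns.
        props.setProperty("columns", String.join(",", columns));
        if (setPartialColumnsFlag) {
            // Assumption: the key carries no "sink.properties." prefix here,
            // since that prefix belongs to the Flink SQL option name.
            props.setProperty("partial_columns", "true");
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(true, "serialno", "insert_time");
        System.out.println(props.getProperty("columns"));
        System.out.println(props.getProperty("partial_columns"));
    }
}
```

Usage would then look like DorisExecutionOptions.builder().setStreamLoadProp(PartialUpdateProps.build(true, "serialno", "insert_time")), with the rest of the sink built as in the answer.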