官方文档中提到使用 Flink Connector进行部分列更新, 需要添加如下配置:'sink.properties.partial_columns' = 'true',不知道是否适用于flink doris connector,能不能写一个demo。目前只看到使用 FlinkSQL 通过 CDC 接入并实现部分列更新示例。
官方文档中提到使用 Flink Connector进行部分列更新, 需要添加如下配置:'sink.properties.partial_columns' = 'true',不知道是否适用于flink doris connector,能不能写一个demo。目前只看到使用 FlinkSQL 通过 CDC 接入并实现部分列更新示例。
可以使用flink doris connector,代码如下:
DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes(dorisFeNodes)
.setTableIdentifier(tableName)
.setUsername(dorisUsername)
.setPassword(dorisPassword);
logger.info("Doris节点信息为:{}",dorisFeNodes);
Properties pro = new Properties();
//json data format
pro.setProperty("format", "json");
pro.setProperty("read_json_by_line", "true");
pro.setProperty("max_filter_ratio", "0.999");
// pro.setProperty("sink.properties.partial_columns", "true");
// pro.setProperty("partial_columns", "true");
DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
.setLabelPrefix(labelPrefix)
.setStreamLoadProp(pro).build();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionOptions)
.setSerializer(new SimpleStringSerializer())
.setDorisOptions(dorisBuilder.build());
pro.setProperty("columns", "serialno,insert_time"); 加上这个可以实现部分列更新