上游日志由 Maxwell 采集:先取出其 JSON 中的 data 字段(JSON 字符串),再根据 Maxwell 的删除标志手动添加 __DORIS_DELETE_SIGN__ 字段——删除时该字段值为 1,否则为 0。
Doris 版本:1.2.2
Doris sink 代码如下:
/**
 * Builds a Doris sink that stream-loads JSON rows into the given table.
 *
 * <p>Each incoming element is expected to already be one flat row serialized as a JSON
 * object that carries its own {@code __DORIS_DELETE_SIGN__} field (1 = delete, 0 = upsert).
 * The records are therefore written through unchanged with {@code SimpleStringSerializer}.
 * {@code JsonDebeziumSchemaSerializer} must not be used for this input: it expects
 * Debezium-format change events (with {@code op}/{@code before}/{@code after}), which these
 * flat rows are not, so it would not emit them as row data.
 *
 * @param tableName target table identifier passed to {@code setTableIdentifier}
 * @return a configured {@code DorisSink} for JSON string records with delete support enabled
 */
public static DorisSink<String> getDorisSink(String tableName) {
    // Stream Load properties: one JSON object per line, with the delete-sign hidden column exposed.
    Properties streamLoadProps = new Properties();
    streamLoadProps.setProperty("format", "json");
    streamLoadProps.setProperty("read_json_by_line", "true");
    streamLoadProps.setProperty("hidden_columns", DORIS_DELETE_SIGN);

    // NOTE(review): FE address and credentials are hard-coded; consider loading them from configuration.
    DorisOptions dorisOptions = DorisOptions.builder()
            .setFenodes("xxxxx:8030")
            .setTableIdentifier(tableName)
            .setUsername("root")
            .setPassword("123456")
            .build();

    DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
            .setStreamLoadProp(streamLoadProps)
            // A random suffix keeps label prefixes distinct between sink instances.
            .setLabelPrefix("label-" + UUID.randomUUID())
            .setDeletable(true)
            .build();

    DorisSink.Builder<String> sinkBuilder = DorisSink.builder();
    return sinkBuilder
            .setDorisReadOptions(DorisReadOptions.builder().build())
            .setDorisExecutionOptions(executionOptions)
            .setDorisOptions(dorisOptions)
            // Pass-through serializer: the record is already the row JSON Doris should load.
            // NOTE(review): fully-qualified because the file's imports are not visible here; in newer
            // connector releases this class appears to live in org.apache.doris.flink.sink.writer.serializer
            // - confirm against the connector version in use.
            .setSerializer(new org.apache.doris.flink.sink.writer.SimpleStringSerializer())
            .build();
}