flink 多表 写入到 一张宽表,部分列更新

Viewed 62

1.部分列更新的情况下,各个维表的更新字段不同,写入宽表时,因为维表的字段是少于宽表字段,所以如果 在 DorisExecutionOptions.Builder 里指定写入宽表字段, 会导致少的列为空

请问有没有一些方法能够去解决这个问题,或者说 如何才能去动态的调整我所想要更新的字段

2 Answers

部分列更新我没试过,但我试过以delete方式写入,sink算子底层是stream load嘛,然后声明一个配置类,在配置里写相关的属性,就相当于给那条stream load拼接参数。你可以看下我写这个delete方式追加写入,然后换成你想要的逻辑。
props.setProperty 相当于直接给stream load拼接参数

public DorisSink<String> getDeleteDorisSink(String tableName) {
        Properties props = new Properties();
        props.setProperty("format", "json");
        props.setProperty("read_json_by_line", "true"); // 每行一条 json 数据
        props.setProperty("columns", "id, user_id, `key`, `updated_time`"); // 删除与哪些列相同的
        props.setProperty("merge_type", "DELETE"); // 以DELETE模式追加写
        props.setProperty("function_column.sequence_col", "updated_time");

        DorisSink<String> sink = DorisSink.<String>builder()
                .setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisOptions(DorisOptions.builder() // 设置 doris 的连接参数
                        .setFenodes(QAConstants.DORIS_FE_NODES)
                        .setTableIdentifier(QAConstants.DORIS_DATABASE + "." + tableName)
                        .setUsername(QAConstants.DORIS_CENTOS7_USER)
                        .setPassword(QAConstants.DORIS_CENTOS7_PASSWORD)
                        .build())

                .setDorisExecutionOptions(DorisExecutionOptions.builder() // 执行参数
                        .setLabelPrefix("doris-label-delete-test-slide")  // stream-load 导入的时候的 label 前缀
                        //.disable2PC() // 开启两阶段提交后,labelPrefix 需要全局唯一,为了测试方便禁用两阶段提交
                        .setDeletable(false)
                        .setBufferCount(3) // 批次条数: 默认 3
                        .setBufferSize(8 * 1024) // 批次大小: 默认 1M
                        .setCheckInterval(3000) // 批次输出间隔   三个对批次的限制是或的关系
                        .setMaxRetries(3)
                        .setStreamLoadProp(props) // 设置 stream load 的数据格式 默认是 csv,根据需要改成 json
                        .build())
                .setSerializer(new SimpleStringSerializer())

                .build();
        return sink;
    }

可以在streamload写入的时候,拼接本次要更新的column header。