Flink Doris Connector writing to Doris: the default value of a newly added column does not take effect

Doris version: 1.2.8
Flink Doris Connector version: 1.6.1


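I use the Flink Doris Connector to insert data into Doris. I added a new column to the table and gave it a default value of 0.

Existing rows show the default value 0 as expected, but with the Flink code left unchanged, newly inserted rows get NULL in that column.

How can I fix this?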

1 Answer


The Doris table schema is as follows:

CREATE TABLE `gateway_number_report` (
  `result_date` date NOT NULL COMMENT 'date',
  `company_id` varchar(36) NOT NULL COMMENT 'company id',
  `inbound_calls` int(11) SUM NULL DEFAULT "0" COMMENT 'number of inbound calls',
  `outbound_calls` int(11) SUM NULL DEFAULT "0" COMMENT 'number of outbound calls'
) ENGINE=OLAP
AGGREGATE KEY(`result_date`, `company_id`)
COMMENT 'OLAP'
PARTITION BY RANGE(`result_date`)()
DISTRIBUTED BY HASH(`company_id`) BUCKETS 5
PROPERTIES (
"light_schema_change" = "true",
"replication_num" = "3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "MONTH",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "5",
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.history_partition_num" = "3"
);

The Flink code that writes to Doris is as follows:

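import java.util.Properties

import org.apache.commons.lang3.StringUtils
import org.apache.doris.flink.cfg.{DorisExecutionOptions, DorisOptions, DorisReadOptions}
import org.apache.doris.flink.sink.DorisSink
// The serializer package may differ between connector versions; adjust if needed.
import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Sample record. Note that it does not contain the newly added column.
val jsonStr =
  """
    |{
    |    "result_date": "2024-06-21",
    |    "company_id": "3",
    |    "inbound_calls": 1,
    |    "outbound_calls": 1
    |}
    |""".stripMargin

// Collapse the JSON onto one line, since read_json_by_line expects one object per line.
val element = StringUtils.replace(jsonStr, StringUtils.SPACE, StringUtils.EMPTY)
  .replace(System.lineSeparator(), StringUtils.EMPTY)

val dorisDataStream = env.fromElements[String](element)

// Stream Load properties: JSON input, one object per line.
val pro = new Properties()
pro.setProperty("format", "json")
pro.setProperty("read_json_by_line", "true")

val fenodes = "XXX"
val dorisUserName = "XXX"
val dorisPassword = "XXX"

val builder = DorisSink.builder[String]
val dorisBuilder = DorisOptions.builder
dorisBuilder.setFenodes(fenodes)
  .setTableIdentifier("sobot_db.gateway_number_report")
  .setUsername(dorisUserName)
  .setPassword(dorisPassword)

val executionBuilder = DorisExecutionOptions.builder
executionBuilder.setLabelPrefix(s"label-gateway_number_rpt${System.currentTimeMillis}")
  .setStreamLoadProp(pro)
  .setDeletable(false)

builder.setDorisReadOptions(DorisReadOptions.builder.build)
  .setDorisExecutionOptions(executionBuilder.build)
  .setSerializer(new SimpleStringSerializer)
  .setDorisOptions(dorisBuilder.build)

// Attach the sink to the stream defined above and run the job.
dorisDataStream.sinkTo(builder.build())
env.execute()
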

The statement used to alter the table schema is as follows:

ALTER TABLE sobot_db.gateway_number_report 
ADD COLUMN `inbound_answered_after_loss_calls` INT SUM NULL DEFAULT "0" COMMENT 'number of lost calls abandoned after the system answered';
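
One workaround that may be worth trying is to tell Stream Load explicitly which columns the JSON payload contains, by adding a `columns` entry to the Stream Load properties. The sketch below only builds that `Properties` object. It rests on an assumption that has not been verified against Doris 1.2.8: that a table column left out of `columns` is filled with its declared default rather than NULL. The object name `StreamLoadColumns` and the helper `streamLoadProps` are hypothetical names used for illustration.

```scala
import java.util.Properties

object StreamLoadColumns {
  // Columns actually present in the JSON payload. The newly added column
  // inbound_answered_after_loss_calls is left out on purpose.
  val payloadColumns: Seq[String] =
    Seq("result_date", "company_id", "inbound_calls", "outbound_calls")

  // Builds Stream Load properties with an explicit column list.
  def streamLoadProps(columns: Seq[String]): Properties = {
    val props = new Properties()
    props.setProperty("format", "json")
    props.setProperty("read_json_by_line", "true")
    // Assumption (unverified on Doris 1.2.8): columns not listed here
    // are filled with their DEFAULT value instead of NULL.
    props.setProperty("columns", columns.mkString(","))
    props
  }
}
```

The result of `StreamLoadColumns.streamLoadProps(StreamLoadColumns.payloadColumns)` would be passed to `setStreamLoadProp` in place of `pro`. If the default still does not apply, the remaining option is probably to write the new field explicitly in each JSON record.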