Doris 版本: 1.2.8
Flink Doris Connector版本: 1.6.1
Flink Doris Connector 往 DORIS 插入数据,往表中新增了一个字段,默认值给的是0。
历史数据默认值是 0 没问题,但是 不改代码的情况下,新插入的数据默认值都是 NULL。
麻烦问下,这个问题怎么解决呢
Doris 版本: 1.2.8
Flink Doris Connector版本: 1.6.1
Flink Doris Connector 往 DORIS 插入数据,往表中新增了一个字段,默认值给的是0。
历史数据默认值是 0 没问题,但是 不改代码的情况下,新插入的数据默认值都是 NULL。
麻烦问下,这个问题怎么解决呢
Doris 表结构如下:
CREATE TABLE `gateway_number_report` (
`result_date` date NOT NULL COMMENT '日期',
`company_id` varchar(36) NOT NULL COMMENT '公司id',
`inbound_calls` int(11) SUM NULL DEFAULT "0" COMMENT '呼入电话数量',
`outbound_calls` int(11) SUM NULL DEFAULT "0" COMMENT '外呼数'
) ENGINE=OLAP
AGGREGATE KEY(`result_date`, `company_id`)
COMMENT 'OLAP'
PARTITION BY RANGE(`result_date`)()
DISTRIBUTED BY HASH(`company_id`) BUCKETS 5
PROPERTIES (
"light_schema_change" = "true",
"replication_num" = "3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "MONTH",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "5",
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.history_partition_num" = "3"
);
Flink 往 Doris 写数据代码如下:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val jsonStr =
"""
|{
| "result_date": "2024-06-21",
| "company_id": "3",
| "inbound_calls": 1,
| "outbound_calls": 1
|}
|""".stripMargin
val element = StringUtils.replace(jsonStr, StringUtils.SPACE, StringUtils.EMPTY)
.replace(System.lineSeparator(), StringUtils.EMPTY)
val dorisDataStream = env.fromElements[String](element)
val pro = new Properties()
pro.setProperty("format", "json")
pro.setProperty("read_json_by_line", "true")
val fenodes = "XXX"
val dorisUserName = "XXX"
val dorisPassword = "XXX"
val builder = DorisSink.builder[String]
val dorisBuilder = DorisOptions.builder
dorisBuilder.setFenodes(fenodes)
.setTableIdentifier("sobot_db.gateway_number_report")
.setUsername(dorisUserName)
.setPassword(dorisPassword)
val executionBuilder = DorisExecutionOptions.builder
executionBuilder.setLabelPrefix(s"label-gateway_number_rpt${System.currentTimeMillis}")
.setStreamLoadProp(pro)
.setDeletable(false)
builder.setDorisReadOptions(DorisReadOptions.builder.build)
.setDorisExecutionOptions(executionBuilder.build)
.setSerializer(new SimpleStringSerializer)
.setDorisOptions(dorisBuilder.build)
duplicateModelJSONStrDS.sinkTo(builder.build())
修改表结构的预计如下:
ALTER TABLE sobot_db.gateway_number_report
ADD COLUMN `inbound_answered_after_loss_calls` INT SUM NULL DEFAULT "0" COMMENT '系统应答后放弃呼损数';