表结构:
CREATE TABLE biz_pub_data_current_instant
(
DATA_ID
VARCHAR(32) NOT NULL,
RECEIVE_TIME
DATETIME NULL DEFAULT CURRENT_TIMESTAMP,
I_L1
DECIMAL(20, 5) NULL,
I_L2
DECIMAL(20, 5) NULL,
I_L3
DECIMAL(20, 5) NULL
) ENGINE=OLAP
UNIQUE KEY(DATA_ID
)
DISTRIBUTED BY HASH(DATA_ID
) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"min_load_replica_num" = "-1",
"is_being_synced" = "false",
"storage_medium" = "hdd",
"storage_format" = "V2",
"inverted_index_storage_format" = "V1",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false",
"group_commit_interval_ms" = "10000",
"group_commit_data_bytes" = "134217728",
"enable_mow_light_delete" = "false"
);
ROUTINE LOAD:
CREATE ROUTINE LOAD demo.CURRENT_INSTANT ON biz_pub_data_current_instant
COLUMNS TERMINATED BY ",",
COLUMNS(DATA_ID,TV,I_L1,I_L2,I_L3)
PROPERTIES
(
"desired_concurrent_number" = "10",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "["$.DATA_ID","$.TV","$.I_L1","$.I_L2","$.I_L3"]",
"strip_outer_array" = "false",
"partial_columns"="true"
)
FROM KAFKA
(
"kafka_broker_list" = "xxxx:9092",
"kafka_topic" = "BIZ_PUB_DATA_CURRENT_INSTANT",
"property.group_id" = "CURRENT_INSTANT",
"property.client_id" = "STREAM",
"property.kafka_default_offsets" = "OFFSET_END"
);
KAFKA中报文:
333300000018,2024-08-21 13:45:00,BIZ_PUB_DATA_CURRENT_INSTANT#{"DATA_ID":"333300000018","TV":"2024-08-21 13:45:00","I_L1":"25.28","I_L2":"81.94"}
333300000018,2024-08-21 13:45:00,BIZ_PUB_DATA_CURRENT_INSTANT#{"DATA_ID":"333300000018","TV":"2024-08-21 13:45:00","I_L3":"161.42"}
分别发送两条报文,最后第一条报文的数据被覆盖,只有第二条报文的数据,部分列更新没有生效。
版本:2.1.5