ROUTINE LOAD部分列更新覆盖其他列

Viewed 59

表结构:
CREATE TABLE biz_pub_data_current_instant (
DATA_ID VARCHAR(32) NOT NULL,
RECEIVE_TIME DATETIME NULL DEFAULT CURRENT_TIMESTAMP,
I_L1 DECIMAL(20, 5) NULL,
I_L2 DECIMAL(20, 5) NULL,
I_L3 DECIMAL(20, 5) NULL
) ENGINE=OLAP
UNIQUE KEY(DATA_ID)
DISTRIBUTED BY HASH(DATA_ID) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"min_load_replica_num" = "-1",
"is_being_synced" = "false",
"storage_medium" = "hdd",
"storage_format" = "V2",
"inverted_index_storage_format" = "V1",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false",
"group_commit_interval_ms" = "10000",
"group_commit_data_bytes" = "134217728",
"enable_mow_light_delete" = "false"
);
ROUTINE LOAD:
CREATE ROUTINE LOAD demo.CURRENT_INSTANT ON biz_pub_data_current_instant
COLUMNS TERMINATED BY ",",
COLUMNS(DATA_ID,TV,I_L1,I_L2,I_L3)
PROPERTIES
(
"desired_concurrent_number" = "10",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "["$.DATA_ID","$.TV","$.I_L1","$.I_L2","$.I_L3"]",
"strip_outer_array" = "false",
"partial_columns"="true"
)
FROM KAFKA
(
"kafka_broker_list" = "xxxx:9092",
"kafka_topic" = "BIZ_PUB_DATA_CURRENT_INSTANT",
"property.group_id" = "CURRENT_INSTANT",
"property.client_id" = "STREAM",
"property.kafka_default_offsets" = "OFFSET_END"
);

KAFKA中报文:
333300000018,2024-08-21 13:45:00,BIZ_PUB_DATA_CURRENT_INSTANT#{"DATA_ID":"333300000018","TV":"2024-08-21 13:45:00","I_L1":"25.28","I_L2":"81.94"}
333300000018,2024-08-21 13:45:00,BIZ_PUB_DATA_CURRENT_INSTANT#{"DATA_ID":"333300000018","TV":"2024-08-21 13:45:00","I_L3":"161.42"}

分别发送两条报文,最后第一条报文的数据被覆盖,只有第二条报文的数据,部分列更新没有生效。
版本:2.1.5

1 Answers

没明白您的描述,部分列更新就是根据KEY列对指定 value 列进行更新的,因为你两条数据中的KEY是一样的,你导入的字段指定了所有的value列,只剩最后一条是正常的。

部分列更新:
比如kafka发了两条数据:
order_id,order_amount,order_status
1,100,待付款
1,100,待发货

order_id是KEY,你要更新order_status 这个字段,只需要指定KEY列和更新列 -H "columns:order_id,order_status" 这两个字段。