问题出现在 2.0.13 版本(1.2.7 版本不存在此问题):
1. Routine Load 消费多个 partition 时,会出现部分 offset 的数据未被消费。
2. 使用 JSON 数组方式写入时,如果数组中存在异常数据(例如某个字段为 null),整个数组的内容都会写入失败。
问题出现在 2.0.13 版本(1.2.7 版本不存在此问题):
1. Routine Load 消费多个 partition 时,会出现部分 offset 的数据未被消费。
2. 使用 JSON 数组方式写入时,如果数组中存在异常数据(例如某个字段为 null),整个数组的内容都会写入失败。
Kafka 版本:2.8.1
topic:3 个分区,3 个副本
1. 创建 Routine Load 任务的语句
-- Routine Load job `test_json_load_job4` (database `tmp`): reads JSON messages
-- from Kafka topic `test_topic` and loads them into table `test_json_load`.
CREATE ROUTINE LOAD tmp.test_json_load_job4 ON test_json_load
-- Target columns, listed in the same order as the jsonpaths entries below.
-- etl_time is not taken from the message; it is assigned now().
COLUMNS (
    id,
    name,
    region,
    etl_time = now()
)
PROPERTIES (
    -- Batch limits for the job (209715200 = 200 * 1024 * 1024).
    "max_batch_interval" = "10",
    "max_batch_rows"     = "300000",
    "max_batch_size"     = "209715200",
    "strict_mode"        = "false",
    -- Messages are JSON arrays of objects (see the test cases in this report).
    "strip_outer_array"  = "true",
    "format"             = "json",
    -- The jsonpaths value is a multi-line string literal: its continuation
    -- lines are deliberately left unindented so the literal text is unchanged.
    "jsonpaths" = "[
\"$.id\",
\"$.city.name\",
\"$.city.region\"
]"
)
FROM KAFKA (
    "kafka_broker_list" = "172.31.3.73:9092,172.31.3.74:9092,172.31.3.75:9092",
    "kafka_topic"       = "test_topic",
    -- Consumer group id and client id intentionally share the same value.
    "property.group.id"  = "test_topic2024080201",
    "property.client.id" = "test_topic2024080201",
    -- Default start position is OFFSET_END.
    "property.kafka_default_offsets" = "OFFSET_END"
);
2. 建表语句(目标表需要先于上面的 Routine Load 任务创建)
-- Target table of the Routine Load job: a duplicate-key OLAP table keyed on
-- `id`, range-partitioned on `etl_time` with one partition per day.
CREATE TABLE tmp.`test_json_load` (
    `id`       INT         NOT NULL,
    `name`     VARCHAR(24) NULL,
    `region`   VARCHAR(30) NULL,
    `etl_time` DATETIME    NULL
)
ENGINE = OLAP
DUPLICATE KEY (`id`)
COMMENT 'agv状态信息'
-- Explicit daily partitions; each range is written as [lower, upper).
PARTITION BY RANGE (`etl_time`) (
    PARTITION p20240803 VALUES [('2024-08-03 00:00:00'), ('2024-08-04 00:00:00')),
    PARTITION p20240804 VALUES [('2024-08-04 00:00:00'), ('2024-08-05 00:00:00')),
    PARTITION p20240805 VALUES [('2024-08-05 00:00:00'), ('2024-08-06 00:00:00'))
)
DISTRIBUTED BY HASH (`id`) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",

    -- Dynamic partitioning: daily partitions with prefix "p".
    -- NOTE(review): dynamic_partition.buckets is 32 while the table-level
    -- clause above says BUCKETS 1 -- confirm this difference is intended.
    "dynamic_partition.enable"                   = "true",
    "dynamic_partition.time_unit"                = "day",
    "dynamic_partition.time_zone"                = "Asia/Shanghai",
    "dynamic_partition.start"                    = "-730",
    "dynamic_partition.end"                      = "3",
    "dynamic_partition.prefix"                   = "p",
    "dynamic_partition.replication_allocation"   = "tag.location.default: 1",
    "dynamic_partition.buckets"                  = "32",
    "dynamic_partition.create_history_partition" = "true",
    "dynamic_partition.history_partition_num"    = "20",
    "dynamic_partition.hot_partition_num"        = "0",
    "dynamic_partition.reserved_history_periods" = "NULL",
    "dynamic_partition.storage_policy"           = "",

    -- Storage and compaction settings.
    "storage_medium"                   = "hdd",
    "storage_format"                   = "V2",
    "light_schema_change"              = "true",
    "disable_auto_compaction"          = "false",
    "enable_single_replica_compaction" = "false"
);
3. 测试用例
正常数据(每条消息是一个 JSON 数组,数组中每个元素的 city 均为对象):
[
{
"id": 123,
"city": {
"name": "beijing",
"region": "haidian"
}
},
{
"id": 456,
"city": {
"name": "beijing",
"region": "haidian"
}
}
]
异常数据(第二个元素的 city 为 null,导致整个数组写入失败,包括 id 为 789 的正常元素):
[
{
"id": 789,
"city": {
"name": "beijing",
"region": "haidian"
}
},
{
"id": 1111,
"city": null
}
]