【已解决】routine-kafka 数据写入数据丢失

Viewed 97

2.0.13版本(1.2.7版本不存在此问题)
1.routine消费多partition ,会出现部分offset 数据不消费
2.使用数组方式写入,有异常数据比如null,整个数组内容都会写入失败

4 Answers

【问题处理】已经解决
【问题状态】json解析格式有些行为别更,修改be参数,设置enable_simdjson_reader=false,现在恢复正常了

  1. 如何判断会出现部分offset 数据不消费
  2. 1.2.7到2.0.13json解析从rapidjson改为simdjson,这里可能是导致不一致的原因,能否提供数据,我们本地验证一下是否是这个问题导致的。

kafka 版本2.8.1
topic 3分区 3副本

1.创建任务语句
CREATE ROUTINE LOAD tmp.test_json_load_job4 ON test_json_load
COLUMNS(
		id,
		name,
		region,
        etl_time=now()
        )
PROPERTIES
(
    "max_batch_interval" = "10",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "strip_outer_array" = "true",
    "format" ="json",
    "jsonpaths" = "[
                    \"$.id\",
                    \"$.city.name\",
                    \"$.city.region\"
                    ]"
)
FROM KAFKA
(
    "kafka_broker_list" = "172.31.3.73:9092,172.31.3.74:9092,172.31.3.75:9092",
    "kafka_topic" = "test_topic",
    "property.group.id" = "test_topic2024080201",
    "property.client.id" = "test_topic2024080201",
    "property.kafka_default_offsets" = "OFFSET_END"
);
2.建表语句
CREATE TABLE tmp.`test_json_load` (
  `id` INT NOT NULL,
  `name` VARCHAR(24)  NULL,
  `region` VARCHAR(30) NULL,
  `etl_time` datetime NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
COMMENT 'agv状态信息'
PARTITION BY RANGE(`etl_time`)
(
PARTITION p20240803 VALUES [('2024-08-03 00:00:00'), ('2024-08-04 00:00:00')),
PARTITION p20240804 VALUES [('2024-08-04 00:00:00'), ('2024-08-05 00:00:00')),
PARTITION p20240805 VALUES [('2024-08-05 00:00:00'), ('2024-08-06 00:00:00')))
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "day",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-730",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.replication_allocation" = "tag.location.default: 1",
"dynamic_partition.buckets" = "32",
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.history_partition_num" = "20",
"dynamic_partition.hot_partition_num" = "0",
"dynamic_partition.reserved_history_periods" = "NULL",
"dynamic_partition.storage_policy" = "",
"storage_medium" = "hdd",
"storage_format" = "V2",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false"
);
3.测试用例
正常数据
[
    {
        "id": 123,
        "city": {
            "name": "beijing",
            "region": "haidian"
        }
    },
        {
        "id": 456,
        "city": {
            "name": "beijing",
            "region": "haidian"
        }
    }
    
]

异常数据
[
    {
        "id": 789,
        "city": {
            "name": "beijing",
            "region": "haidian"
        }
    },
    {
        "id": 1111,
        "city": null
    }
]