ROUTINE LOAD 入库时间有误差

Viewed 84

问题:这里入库的时间是按着获取第一条kafka数据的时间
修改 max_batch_interval 参数为1 后,偶尔也会出现1.2条入库时间小于kafka数据创建时间的问题
image.png
1、创建kafka测试表A

CREATE TABLE `kafka_testA` (
  `id` INT NULL,
  `name` VARCHAR(50) NULL,
  `age` VARCHAR(50)NULL,
  `create_date` DATETIME NULL COMMENT "kafka 数据生成时间",
  `update_date` DATETIME NULL DEFAULT CURRENT_TIMESTAMP COMMENT "doris 入库时间"
) ENGINE=OLAP
DUPLICATE  KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);

2、Routine Load 导入作业

CREATE ROUTINE LOAD db_test.routine_load_csv ON kafka_testA
COLUMNS TERMINATED BY ",",
COLUMNS(id, name,age,create_date)
PROPERTIES(
	"desired_concurrent_number"="1",
	"max_batch_interval"="10",
    "format"="json",
    "jsonpaths"="[\"$.id\",\"$.name\",\"$.age\",\"$.create_date\"]"
)
FROM KAFKA(
    "kafka_broker_list" = "XXXXXX:9092",
    "kafka_topic" = "kafaka_test",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

3、模拟kafka数据数据

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 创建用于连接Kafka的Properties配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.31.129:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", "true");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class); // Example
        // JsonDeserializer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        //2. 创建一个生产者对象KafkaProducer

        for (int i = 0; i < 1000000; i++) {
            //这里睡眠1.5秒 才勉强保持和doris max_batch_interval=1 的 入库时间一致
            Thread.sleep(1500);
            SimpleDateFormat sdf = new SimpleDateFormat( " yyyy-MM-dd HH:mm:ss " );
            String format = sdf.format(new Date());
            UserPojo build =UserPojo.builder().id(i).name("N" + i).age(23).create_date((format)).build();
            JSONObject jsonObject = (JSONObject) JSONObject.toJSON(build);
            // 获取返回值Future,该对象封装了返回值
            Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("kafaka_test",String.valueOf(i),jsonObject.toJSONString()));
            // 调用一个Future.get()方法等待响应  同步阻塞
            RecordMetadata recordMetadata = future.get();
        }
        // 5. 关闭生产者
        producer.close();
    }

4、查看情况
image.png

2 Answers

这个现象是正常的,一个task的所有时间都会和第一条的时间一样,因为规划的时候就把这个时间定下来了。

image.png
无法复现你的情况。我这数据没有小于生成时间的,入库时间均大于生成时间,是否你的服务器时钟未同步或有偏差。
后来我把Thread.sleep(1500);注掉当成性能测试时,出现有入库小于生成的,但时间仅差1秒。在39万中有3000条左右这类的数据。