问题:这里的入库时间(update_date)是按照获取到第一条 Kafka 数据的时间来填充的。
将 max_batch_interval 参数修改为 1 后,偶尔仍会出现 1~2 条数据的入库时间早于 Kafka 数据创建时间的问题。
1、创建kafka测试表A
-- Test table used to compare the timestamp carried in each Kafka message
-- (create_date) with the timestamp recorded on the Doris side (update_date).
-- update_date has a CURRENT_TIMESTAMP default, so it is populated by Doris
-- whenever a load does not supply a value for it.
CREATE TABLE `kafka_testA` (
    `id`          INT         NULL,
    `name`        VARCHAR(50) NULL,
    `age`         VARCHAR(50) NULL,
    `create_date` DATETIME    NULL COMMENT "kafka 数据生成时间",
    `update_date` DATETIME    NULL DEFAULT CURRENT_TIMESTAMP COMMENT "doris 入库时间"
) ENGINE=OLAP
DUPLICATE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3"
);
2、Routine Load 导入作业
-- Routine load job that reads JSON messages from the Kafka topic and writes
-- them into kafka_testA. Only id, name, age and create_date are mapped from
-- the message; update_date is not listed, so it is left to the table's column
-- default.
-- NOTE(review): max_batch_interval is presumably in seconds -- confirm against
-- the Doris Routine Load documentation for the deployed version.
CREATE ROUTINE LOAD db_test.routine_load_csv ON kafka_testA
COLUMNS TERMINATED BY ",",
COLUMNS(id, name, age, create_date)
PROPERTIES (
    "desired_concurrent_number" = "1",
    "max_batch_interval" = "10",
    "format" = "json",
    "jsonpaths" = "[\"$.id\",\"$.name\",\"$.age\",\"$.create_date\"]"
)
FROM KAFKA (
    "kafka_broker_list" = "XXXXXX:9092",
    "kafka_topic" = "kafaka_test",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
3、模拟 Kafka 数据
/**
 * Test-data generator: publishes one JSON user record to the "kafaka_test"
 * topic every 1.5 seconds, stamping each record with the producer-side
 * wall-clock time in {@code create_date}.
 *
 * <p>Each send is awaited synchronously, so records are published strictly in
 * order and a failed send aborts the run.
 *
 * @param args unused
 * @throws ExecutionException   if a send fails (surfaced by {@code Future.get()})
 * @throws InterruptedException if the thread is interrupted while sleeping or
 *                              while waiting for a send to complete
 */
public static void main(String[] args) throws ExecutionException, InterruptedException {
    // 1. Build the Properties used to connect to Kafka.
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.31.129:9092");
    props.put("acks", "all");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("enable.idempotence", "true");
    // The consumer-side deserializer settings (value.deserializer and its
    // ErrorHandlingDeserializer delegate) were dropped: they are consumer
    // configuration and do not belong on a producer.

    // Built once outside the loop. The pattern has no surrounding spaces, so
    // create_date is emitted as a plain "yyyy-MM-dd HH:mm:ss" value rather than
    // a space-padded string.
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    // 2. Create the KafkaProducer; try-with-resources closes it even when
    //    sleep/send/get throws.
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        for (int i = 0; i < 1000000; i++) {
            // Sleeping 1.5 s is what barely keeps the producer timestamps in step
            // with the Doris load time when max_batch_interval=1.
            Thread.sleep(1500);

            String createDate = sdf.format(new Date());
            UserPojo build = UserPojo.builder().id(i).name("N" + i).age(23).create_date(createDate).build();
            JSONObject jsonObject = (JSONObject) JSONObject.toJSON(build);

            // send() returns a Future wrapping the broker's response.
            Future<RecordMetadata> future = producer.send(
                    new ProducerRecord<String, String>("kafaka_test", String.valueOf(i), jsonObject.toJSONString()));
            // Block until the send is acknowledged (synchronous send).
            future.get();
        }
    }
}
4、查看情况