mysql-flink-doris real-time sync: Doris does not commit in real time, commits are delayed by roughly 1-2 minutes

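Could someone help me with this? I am using Flink to sync MySQL data to Doris in real time. When a row arrives, it flows into Flink and then on to Doris, but from what I can see the data gets stuck at the Doris committer. Is there a parameter that makes the commit happen faster, so that writes are effectively real time, or at least flushed in batches every one or two seconds?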

The code is as follows:

CREATE  TEMPORARY TABLE mysqlcdc_source (
    id INT,
    name VARCHAR,
    age int,
    create_time TIMESTAMP,
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = 'xxxxxx',
  'port' = '3306',
  'username' = 'xxxx',
  'password' = 'xxxxx',
  'database-name' = 'db1',
  'table-name' = 'test_flink_2',
  'server-id' = '12345', -- server ID used by this MySQL CDC client; must be unique to avoid conflicts
  'server-time-zone'='UTC',
  'scan.startup.mode' = 'latest-offset'

);


-- Supports syncing insert/update/delete events
CREATE TEMPORARY TABLE doris_sink (
id INT,
name STRING,
age int,
create_time TIMESTAMP
) 
WITH (
  'connector' = 'doris',
  'fenodes' = 'xxxx:8030',
  'table.identifier' = 'testdb.test_flink2',
  'username' = 'roxxxot',
  'password' = 'xxxx',
  'sink.properties.format' = 'json',
  'sink.properties.read_json_by_line' = 'true',
  'sink.enable-delete' = 'true',  -- sync delete events
  'sink.label-prefix' = 'doris_label'
);
insert into doris_sink select id,name,age,create_time from mysqlcdc_source;

3 Answers

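Please provide the following information:

  1. The flink-connector-doris version and the Doris version.
  2. Whether the sink runs in batch mode or streaming mode. If streaming, what is the checkpoint interval? If batch, what are sink.buffer-flush.max-rows, sink.buffer-flush.max-bytes, and sink.buffer-flush.interval set to?
  3. Check the Flink TaskManager logs: after you change data on the MySQL side, at what time does the TaskManager log show the Stream Load being executed?
  4. If the write itself is slow, check the load on the cluster and enable the load profile to investigate.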
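
To make point 2 concrete: in streaming mode the Doris sink commits when a Flink checkpoint completes, so a delay of 1-2 minutes often just reflects the checkpoint interval. The sketch below shows the two knobs that usually matter. The values are illustrative assumptions rather than tuned recommendations, the table name doris_sink_batch is hypothetical, and on a managed platform the checkpoint interval may have to be set in the job configuration instead of through a SET statement. Check the option names against the documentation for your connector version.

```sql
-- Option A (streaming mode): data becomes visible in Doris when a
-- checkpoint completes, so shortening the interval shortens the delay.
-- '2s' is an assumed example value; very short intervals add overhead.
SET 'execution.checkpointing.interval' = '2s';

-- Option B (batch mode): the sink flushes on its own buffer thresholds
-- instead of waiting for a checkpoint.
-- doris_sink_batch is a hypothetical name used only for this sketch.
CREATE TEMPORARY TABLE doris_sink_batch (
  id INT,
  name STRING,
  age INT,
  create_time TIMESTAMP
) WITH (
  'connector' = 'doris',
  'fenodes' = 'xxxx:8030',
  'table.identifier' = 'testdb.test_flink2',
  'username' = 'xxxx',
  'password' = 'xxxx',
  'sink.properties.format' = 'json',
  'sink.properties.read_json_by_line' = 'true',
  'sink.enable-delete' = 'true',
  'sink.enable.batch-mode' = 'true',      -- switch the sink to batch mode
  'sink.buffer-flush.interval' = '1s'     -- assumed value: flush about once per second
);
```

Batch mode trades the exactly-once guarantee of checkpoint-driven commits for at-least-once delivery, so it is best paired with a Doris Unique Key table, where replayed rows overwrite themselves.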
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.16</artifactId>
    <version>1.5.2</version>
</dependency>