flink实时写入doris2.0.4数据延迟问题

Viewed 66

我把flink实时读取MySQL写入到Doris2.0.4时候,想做到实时写入,比如MySQL一条数据到flink直接写入到Doris,但是实际观察发现,是根据flink的checkpoint时间批量提交到doris里面。

flink链路图里面有两个flink的Doris-sink,一个读取flink数据,一个是commit,我用的是阿里云的flink,mysql是aws上面,连接器用的是下面说的2.0版本,官方文档推荐的。

麻烦问一下各位大佬有没有什么办法能够让MySQL来一条直接写入到doris,不需要等待checkpoint提交再进行批量提交,我想用streaming模式,真正实时的,请教一下各位大佬有什么方法或者参数吗?

<!-- 连接器用的2.0官方推荐版本,flink-doris-connector -->
<dependency>
  <groupId>org.apache.doris</groupId>
  <artifactId>flink-doris-connector-1.16</artifactId>
  <version>1.5.2</version>
</dependency>  

flink代码如下:
CREATE  TEMPORARY TABLE mysql_user_cdc_source (
    id INT,
    country_code string
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '111.222.222.222',
  'port' = '3306',
  'username' = 'bytedance',
  'password' = 'xxxx',
  'database-name' = 'tiktok',
  'table-name' = 'user',
  'server-id'='1111-1120',
  'server-time-zone'='UTC',
  'scan.startup.mode' = 'latest-offset'
);

-- 支持同步 insert/update/delete 事件
CREATE TEMPORARY TABLE doris_user_cdc_sink (
  `id` bigint ,
  `country_code` string 
) 
WITH (
  'connector' = 'doris',
  'fenodes' = '1111.111.111:8030',
  'table.identifier' = 'bytedance',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.properties.format' = 'json',
  'sink.properties.read_json_by_line' = 'true',
  'sink.enable-delete' = 'true',  -- 同步删除事件
  'sink.label-prefix' = 'doris_label'

);

insert into doris_user_cdc_sink 
select 
  `id` ,
  `country_code` 
 from mysql_user_cdc_source;

1 Answers

【问题状态】处理中
【问题处理】实时流处理目前不支持,现在只能攒批写入,可以把checkpoint时间设置的稍微短一点