I'm using Flink to read from MySQL in real time and write to Doris 2.0.4, and I want truly real-time writes: as soon as a row from MySQL reaches Flink, it should be written straight into Doris. In practice, though, I observe that data is committed to Doris in batches driven by Flink's checkpoint interval.
The Flink job graph shows two Doris sink operators: one that writes the stream data and one that performs the commit. I'm running Alibaba Cloud Flink, MySQL is on AWS, and the connector is the version recommended by the official docs for Doris 2.0, shown below.
Could anyone tell me if there's a way to have each MySQL row written to Doris as soon as it arrives, instead of waiting for the checkpoint to trigger a batch commit? I'd like a streaming mode that is genuinely real-time. Are there any methods or parameters for this?
<!-- Connector: the version officially recommended for Doris 2.0, flink-doris-connector -->
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.16</artifactId>
<version>1.5.2</version>
</dependency>
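The only workaround I've come up with so far is shortening the checkpoint interval so commits happen more often, roughly like the sketch below (assuming Alibaba Cloud Flink honors the standard execution.checkpointing.interval setting in a SQL job). But that is still checkpoint-driven micro-batching, not the per-record writes I'm after.

-- Sketch: commit more often by checkpointing more often.
-- Still micro-batching at checkpoint boundaries, not true per-record streaming.
SET 'execution.checkpointing.interval' = '1s';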
The Flink SQL is as follows:
CREATE TEMPORARY TABLE mysql_user_cdc_source (
id INT,
country_code string,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = '111.222.222.222',
'port' = '3306',
'username' = 'bytedance',
'password' = 'xxxx',
'database-name' = 'tiktok',
'table-name' = 'user',
'server-id'='1111-1120',
'server-time-zone'='UTC',
'scan.startup.mode' = 'latest-offset'
);
-- Sync insert/update/delete events
CREATE TEMPORARY TABLE doris_user_cdc_sink (
`id` bigint ,
`country_code` string
)
WITH (
'connector' = 'doris',
'fenodes' = '1111.111.111:8030',
'table.identifier' = 'bytedance', -- note: this option expects db.table format
'username' = 'xxx',
'password' = 'xxx',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.enable-delete' = 'true', -- sync delete events
'sink.label-prefix' = 'doris_label'
);
insert into doris_user_cdc_sink
select
`id` ,
`country_code`
from mysql_user_cdc_source;
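From the flink-doris-connector docs, my impression is that disabling two-phase commit and turning on batch-mode flushing with a small interval would decouple writes from checkpoints, but I'm not sure these options exist or behave this way in 1.5.2, or how much of the exactly-once guarantee they give up, so please correct me if this is wrong. A sketch of the sink DDL I have in mind (the sink.enable-2pc / sink.enable.batch-mode / sink.buffer-flush.interval options are my reading of the docs, not something I've verified):

-- Sketch only: same sink, but flushing independently of checkpoints.
CREATE TEMPORARY TABLE doris_user_cdc_sink_streaming (
`id` bigint,
`country_code` string
)
WITH (
'connector' = 'doris',
'fenodes' = '1111.111.111:8030',
'table.identifier' = 'bytedance', -- db.table format expected here
'username' = 'xxx',
'password' = 'xxx',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.enable-delete' = 'true',
'sink.label-prefix' = 'doris_label',
'sink.enable-2pc' = 'false', -- drop two-phase commit so writes are not tied to checkpoints
'sink.enable.batch-mode' = 'true', -- flush by buffer size/interval instead of at checkpoint
'sink.buffer-flush.interval' = '1s' -- flush roughly every second
);

Is something along these lines the right direction, or is there a better way to get true per-record writes into Doris?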