一、使用场景
部分列更新的使用场景主要包括以下几个方面:
- 实时动态更新:在需要频繁更新某些字段的场景中,例如用户标签表中的行为信息更新,以支持广告或推荐系统的实时分析和决策。
- 大宽表拼接:将多张源表的数据合并成一张大宽表,可以通过部分列更新来实现。
- 数据修正:在需要修正某些数据的场景中,部分列更新可以有效减少更新的开销。
- 高频并发写入:部分列更新支持高频的并发写入,适用于需要实时更新大量行但仅涉及少数列的场景。
- 性能优化:在更新少数列时,部分列更新可以显著提高性能,尤其是在涉及大量行的情况下。
这些场景中,部分列更新通过减少不必要的数据写入和锁定,提升了系统的整体性能和响应速度。
注意事项
- 2.0 版本仅在 Unique Key 的 Merge-on-Write 实现中支持了部分列更新能力
- 从 2.0.2 版本开始,支持使用 INSERT INTO 进行部分列更新
- 不支持在有同步物化视图的表上进行部分列更新
二、实践
- doris建表
CREATE TABLE `test_partial_update` (
`id` int(11) NULL,
`value` varchar(20) NULL,
`date_time` datetime(6) not NULL DEFAULT CURRENT_TIMESTAMP,
`dt` datetime(6) default current_timestamp(6) on update current_timestamp(6)
) ENGINE=OLAP
unique KEY(`id`)
COMMENT '部分列更新'
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
on update current_timestamp
是否在该行有列更新时将该列的值更新为当前时间 (
current_timestamp
)。该特性只能在开启了 Merge-on-Write 的 Unique 表上使用,开启了这个特性的列必须声明默认值,且默认值必须为current_timestamp
。如果此处声明了时间戳的精度,则该列默认值中的时间戳精度必须与该处的时间戳精度相同。
- 首先插入一条数据:
mysql> insert into test_partial_update(id, value) values(1, "doris");
Query OK, 1 row affected (0.04 sec)
{'label':'label_3ed18eb2142c42a6_b02373286719ab46', 'status':'VISIBLE', 'txnId':'60072'}
mysql> select * from test_partial_update;
+------+-------+----------------------------+----------------------------+
| id | value | date_time | dt |
+------+-------+----------------------------+----------------------------+
| 1 | doris | 2024-10-13 10:45:22.000000 | 2024-10-13 10:45:22.257000 |
+------+-------+----------------------------+----------------------------+
1 row in set (0.01 sec)
2.1 insert
如果只想更改value值,并且保留数据的插入时间,可以开启insert的部分列更新功能并指定插入的列名:
set enable_unique_key_partial_update=true;
mysql> insert into test_partial_update(id, value) values(1, "SelectDB");
Query OK, 1 row affected (0.03 sec)
{'label':'label_6a1e2c7306e84c81_a3ba841f7b302bb3', 'status':'VISIBLE', 'txnId':'60074'}
mysql> select * from test_partial_update;
+------+----------+----------------------------+----------------------------+
| id | value | date_time | dt |
+------+----------+----------------------------+----------------------------+
| 1 | SelectDB | 2024-10-13 10:45:22.000000 | 2024-10-13 10:48:12.619000 |
+------+----------+----------------------------+----------------------------+
1 row in set (0.01 sec)
可以看出此时date_time字段记录了数据的插入时间,dt字段记录了数据的更新时间。
2.2 stream load
- 新建一份数据
vim test_partial_update.csv
1, "SelectDB"
- 在columns处指定key列(必须包含所有 key 列,不然无法更新)以及要更新的value列,设置header:"partial_columns:true":
curl --location-trusted -u root:123456 -T test_partial_update.csv -H "format:csv" -H "column_separator:," -H "partial_columns:true" -H "columns:id, value" http://10.16.10.6:8030/api/demo/test_partial_update/_stream_load
- 查询数据可知date_time字段记录了数据的插入时间,dt字段记录了数据的更新时间。
mysql> select * from test_partial_update;
+------+-------------+----------------------------+----------------------------+
| id | value | date_time | dt |
+------+-------------+----------------------------+----------------------------+
| 1 | "SelectDB" | 2024-10-13 10:45:22.000000 | 2024-10-13 11:11:51.795000 |
+------+-------------+----------------------------+----------------------------+
1 row in set (0.01 sec)
2.3 Flink Connector
- mysql建表,插入数据:
CREATE TABLE `test_partial_update` (
`id` INT(11) NULL,
`value` VARCHAR(20) NULL,
`date_time` DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
`dt` DATETIME(6) DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6)
);
mysql> insert into test_partial_update(id, value) values(1, "SelectDB");
Query OK, 1 row affected (0.00 sec)
mysql> select * from test_partial_update;
+------+----------+----------------------------+----------------------------+
| id | value | date_time | dt |
+------+----------+----------------------------+----------------------------+
| 1 | SelectDB | 2024-10-14 16:08:14.058728 | 2024-10-14 16:08:14.058728 |
+------+----------+----------------------------+----------------------------+
1 row in set (0.00 sec)
- 进入flink
./bin/sql-client.sh embedded
- 开启Checkpoint
Flink作业周期性执行checkpoint,记录Binlog位点,当作业发生Failover时,便会从之前记录的Binlog位点继续处理。
Flink SQL> SET execution.checkpointing.interval = 3s;
- 创建MySQL CDC表和Dois sink表,并设置'sink.properties.partial_columns' = 'true'
CREATE TABLE cdc_mysql_source (
id INT,
`value` STRING,
date_time TIMESTAMP,
dt TIMESTAMP
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '10.16.10.6',
'port' = '3326',
'username' = 'root',
'password' = '123456',
'database-name' = 'test',
'scan.incremental.snapshot.chunk.key-column'='id',
'table-name' = 'test_partial_update'
);
CREATE TABLE doris_sink (
id INT,
`value` STRING,
date_time TIMESTAMP,
dt TIMESTAMP
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'demo.test_partial_update',
'username' = 'root',
'password' = '123456',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.properties.columns' = 'id,name,bank,age',
'sink.properties.partial_columns' = 'true' -- 开启部分列更新
);
insert into doris_sink(id, `value`) select id,`value` from cdc_mysql_source;
- 去 doris 查询数据可知date_time字段记录了数据的插入时间,dt字段记录了数据的更新时间。
mysql> select * from test_partial_update;
+------+-------------+----------------------------+----------------------------+
| id | value | date_time | dt |
+------+-------------+----------------------------+----------------------------+
| 1 | "SelectDB" | 2024-10-13 10:45:22.000000 | 2024-10-14 16:11:23.546000 |
+------+-------------+----------------------------+----------------------------+
1 row in set (0.01 sec)
常见问题FAQ
Q1. 如果开启了部分列更新之后插入报错"errCode = 2, detailMessage = Insert has filtered data in strict mode"
- 原因:控制 insert 语句是否开启严格模式的会话变量enable_insert_strict的默认值为 true,即 insert 语句默认开启严格模式,而在严格模式下进行部分列更新不允许更新不存在的 key。所以,在使用 insert 语句进行部分列更新的时候如果希望能插入不存在的 key,需要在enable_unique_key_partial_update设置为 true 的基础上同时将enable_insert_strict设置为 false。
解决方法: set enable_insert_strict=false;
Q2. MySQL CDC表数据同步失败。
- 原因1:mysql未开启binlog
解决办法:
可以通过以下命令检查 binlog 是否已启用:
SHOW VARIABLES LIKE 'log_bin';
如果 log_bin
的值是 OFF
,需要在 MySQL 配置文件中启用 binlog:
[mysqld]
log_bin = mysql-bin
binlog_format = ROW
server_id = 1
重启 MySQL 服务后生效。
-
原因2:检查 MySQL 表
test_partial_update
的字段类型与 Flink 表 DDL 是否匹配。字段类型不匹配可能会导致数据读取问题。 -
原因3:flink 任务报错 MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.
解决方法:set time_zone='+8:00'; #或者 set persist time_zone='+8:00';