【最佳实践】部分列更新,适用范围:2.0.2 及之后的版本

Viewed 152

一、使用场景

部分列更新的使用场景主要包括以下几个方面:

  1. 实时动态更新:在需要频繁更新某些字段的场景中,例如用户标签表中的行为信息更新,以支持广告或推荐系统的实时分析和决策。
  2. 大宽表拼接:将多张源表的数据合并成一张大宽表,可以通过部分列更新来实现。
  3. 数据修正:在需要修正某些数据的场景中,部分列更新可以有效减少更新的开销。
  4. 高频并发写入:部分列更新支持高频的并发写入,适用于需要实时更新大量行但仅涉及少数列的场景。
  5. 性能优化:在更新少数列时,部分列更新可以显著提高性能,尤其是在涉及大量行的情况下。

这些场景中,部分列更新通过减少不必要的数据写入和锁定,提升了系统的整体性能和响应速度。

注意事项

  1. 2.0 版本仅在 Unique Key 的 Merge-on-Write 实现中支持了部分列更新能力
  2. 从 2.0.2 版本开始,支持使用 INSERT INTO 进行部分列更新
  3. 不支持在有同步物化视图的表上进行部分列更新

二、实践

  • doris建表
CREATE TABLE `test_partial_update` (
  `id` int(11) NULL,
  `value` varchar(20) NULL,
  `date_time` datetime(6) not NULL DEFAULT CURRENT_TIMESTAMP,
  `dt` datetime(6) default current_timestamp(6) on update current_timestamp(6)
) ENGINE=OLAP
unique KEY(`id`)
COMMENT '部分列更新'
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

on update current_timestamp

是否在该行有列更新时将该列的值更新为当前时间 (current_timestamp)。该特性只能在开启了 Merge-on-Write 的 Unique 表上使用,开启了这个特性的列必须声明默认值,且默认值必须为 current_timestamp。如果此处声明了时间戳的精度,则该列默认值中的时间戳精度必须与该处的时间戳精度相同。

  • 首先插入一条数据:
mysql> insert into test_partial_update(id, value) values(1, "doris");
Query OK, 1 row affected (0.04 sec)
{'label':'label_3ed18eb2142c42a6_b02373286719ab46', 'status':'VISIBLE', 'txnId':'60072'}

mysql> select * from test_partial_update;
+------+-------+----------------------------+----------------------------+
| id   | value | date_time                  | dt                         |
+------+-------+----------------------------+----------------------------+
|    1 | doris | 2024-10-13 10:45:22.000000 | 2024-10-13 10:45:22.257000 |
+------+-------+----------------------------+----------------------------+
1 row in set (0.01 sec)

2.1 insert

如果只想更改value值,并且保留数据的插入时间,可以开启insert的部分列更新功能并指定插入的列名:

set enable_unique_key_partial_update=true;
mysql> insert into test_partial_update(id, value) values(1, "SelectDB");
Query OK, 1 row affected (0.03 sec)
{'label':'label_6a1e2c7306e84c81_a3ba841f7b302bb3', 'status':'VISIBLE', 'txnId':'60074'}

mysql> select * from test_partial_update;
+------+----------+----------------------------+----------------------------+
| id   | value    | date_time                  | dt                         |
+------+----------+----------------------------+----------------------------+
|    1 | SelectDB | 2024-10-13 10:45:22.000000 | 2024-10-13 10:48:12.619000 |
+------+----------+----------------------------+----------------------------+
1 row in set (0.01 sec)

可以看出此时date_time字段记录了数据的插入时间,dt字段记录了数据的更新时间。

2.2 stream load

  • 新建一份数据

vim test_partial_update.csv

1, "SelectDB"
  • 在columns处指定key列(必须包含所有 key 列,不然无法更新)以及要更新的value列,设置header:"partial_columns:true":
curl --location-trusted -u root:123456 -T test_partial_update.csv -H "format:csv" -H "column_separator:," -H "partial_columns:true"  -H "columns:id, value" http://10.16.10.6:8030/api/demo/test_partial_update/_stream_load
  • 查询数据可知date_time字段记录了数据的插入时间,dt字段记录了数据的更新时间。
mysql> select * from test_partial_update;
+------+-------------+----------------------------+----------------------------+
| id   | value       | date_time                  | dt                         |
+------+-------------+----------------------------+----------------------------+
|    1 |  "SelectDB" | 2024-10-13 10:45:22.000000 | 2024-10-13 11:11:51.795000 |
+------+-------------+----------------------------+----------------------------+
1 row in set (0.01 sec)
  • mysql建表,插入数据:
CREATE TABLE `test_partial_update` (
  `id` INT(11) NULL,
  `value` VARCHAR(20) NULL,
  `date_time` DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
  `dt` DATETIME(6) DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6)
);

mysql> insert into test_partial_update(id, value) values(1, "SelectDB");
Query OK, 1 row affected (0.00 sec)

mysql> select * from test_partial_update;
+------+----------+----------------------------+----------------------------+
| id   | value    | date_time                  | dt                         |
+------+----------+----------------------------+----------------------------+
|    1 | SelectDB | 2024-10-14 16:08:14.058728 | 2024-10-14 16:08:14.058728 |
+------+----------+----------------------------+----------------------------+
1 row in set (0.00 sec)
  • 进入flink
./bin/sql-client.sh embedded
  • 开启Checkpoint

Flink作业周期性执行checkpoint,记录Binlog位点,当作业发生Failover时,便会从之前记录的Binlog位点继续处理。

Flink SQL> SET execution.checkpointing.interval = 3s;
  • 创建MySQL CDC表和Dois sink表,并设置'sink.properties.partial_columns' = 'true'
CREATE TABLE cdc_mysql_source (
    id INT,
    `value` STRING,
    date_time TIMESTAMP,
    dt TIMESTAMP
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '10.16.10.6',
    'port' = '3326', 
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'test',
    'scan.incremental.snapshot.chunk.key-column'='id',
    'table-name' = 'test_partial_update'
);

CREATE TABLE doris_sink (
    id INT,
    `value` STRING,
    date_time TIMESTAMP,
    dt TIMESTAMP
) 
WITH (
  'connector' = 'doris',
  'fenodes' = '127.0.0.1:8030',
  'table.identifier' = 'demo.test_partial_update',
  'username' = 'root',
  'password' = '123456',
  'sink.properties.format' = 'json',
  'sink.properties.read_json_by_line' = 'true',
  'sink.properties.columns' = 'id,name,bank,age',
  'sink.properties.partial_columns' = 'true' -- 开启部分列更新
);


insert into doris_sink(id, `value`) select id,`value` from cdc_mysql_source;
  • 去 doris 查询数据可知date_time字段记录了数据的插入时间,dt字段记录了数据的更新时间。
mysql> select * from test_partial_update;
+------+-------------+----------------------------+----------------------------+
| id   | value       | date_time                  | dt                         |
+------+-------------+----------------------------+----------------------------+
|    1 |  "SelectDB" | 2024-10-13 10:45:22.000000 | 2024-10-14 16:11:23.546000 |
+------+-------------+----------------------------+----------------------------+
1 row in set (0.01 sec)

常见问题FAQ

Q1. 如果开启了部分列更新之后插入报错"errCode = 2, detailMessage = Insert has filtered data in strict mode"

  • 原因:控制 insert 语句是否开启严格模式的会话变量enable_insert_strict的默认值为 true,即 insert 语句默认开启严格模式,而在严格模式下进行部分列更新不允许更新不存在的 key。所以,在使用 insert 语句进行部分列更新的时候如果希望能插入不存在的 key,需要在enable_unique_key_partial_update设置为 true 的基础上同时将enable_insert_strict设置为 false。

解决方法: set enable_insert_strict=false;

Q2. MySQL CDC表数据同步失败。

  • 原因1:mysql未开启binlog

解决办法:

可以通过以下命令检查 binlog 是否已启用:

SHOW VARIABLES LIKE 'log_bin';

如果 log_bin 的值是 OFF,需要在 MySQL 配置文件中启用 binlog:

[mysqld]
log_bin = mysql-bin
binlog_format = ROW
server_id = 1

重启 MySQL 服务后生效。

  • 原因2:检查 MySQL 表 test_partial_update 的字段类型与 Flink 表 DDL 是否匹配。字段类型不匹配可能会导致数据读取问题。

  • 原因3:flink 任务报错 MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.
    解决方法:

    set time_zone='+8:00';
    #或者
    set persist time_zone='+8:00';
    
1 Answers

部分列更新可以参考这个文档,有问题可以随时联系官方人员。