doris版本:2.1.5
开发语言:golang
sql sdk: github.com/go-sql-driver/mysql,database/sql
描述：建立连接以后，首先执行 `db.Exec("SET group_commit = async_mode;")`，然后批量执行 INSERT，但速度很慢：即使是最快的方式，写入约 3 万行也需要 30 秒左右。因为文档没有给出 Golang 的示例，所以我尝试了三种方式：
1. 最快的是把多行拼成一条 `insert into table (fields) values (...),(...)` 语句 ==> GroupCommitV3
2. 类似文档示例，先 prepare 再逐条 insert，反而很慢，每条约 30ms ==> GroupCommitV1
3. 最慢的是先开启事务，再 prepare 并逐条 insert ==> GroupCommitV2
问题：
Golang 能不能使用 group commit？应该怎样使用 group commit 来实现向 Doris 的实时数据导入？
// open sql
//
// group_commit is a per-session variable, while database/sql keeps a pool of
// connections: a one-off db.Exec("SET group_commit = async_mode") configures
// only the pooled connection that happened to run it, so later inserts may run
// on connections without group commit. go-sql-driver/mysql sends every DSN
// parameter it does not recognize as "SET <name>=<value>" on each new
// connection, so putting group_commit in the DSN enables it pool-wide.
url := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?group_commit=async_mode",
	cfg.UserName, cfg.Password, cfg.Host, cfg.Port, cfg.DataBase)
db, err = sql.Open("mysql", url)
if err != nil {
	Error("mysql start ", err)
	os.Exit(-1)
}
// sql.Open only validates the DSN and never dials; Ping opens a real
// connection (running the SET above) so bad credentials or addresses fail at
// startup instead of on the first insert.
if err = db.Ping(); err != nil {
	Error("mysql ping ", err)
	os.Exit(-1)
}
三种方式的代码：
// GroupCommitV1 inserts data one row at a time through a single prepared
// statement built from sqlPrefix.
//
// Each stmt.Exec waits for the server's reply before the next row is sent, so
// total time grows linearly with len(data). Group commit batches writes on the
// Doris side, but it does not remove this per-row round trip. Throughput comes
// from multi-row VALUES (see GroupCommitV3) or from several concurrent writers.
//
// group_commit is a session variable and stmt may execute on any pooled
// connection. It therefore has to be enabled on every connection (for example
// through the DSN), not by a one-off db.Exec("SET ...").
//
// It returns the first prepare or insert error, annotated with the failing row
// index.
func GroupCommitV1(data []any) error {
	if len(data) == 0 {
		return nil
	}
	// Four placeholders for the four values passed to Exec below; the
	// placeholder count must match the argument count or every Exec fails.
	stmt, err := db.Prepare(sqlPrefix + "(?,?,?,?)")
	if err != nil {
		return fmt.Errorf("prepare insert: %w", err)
	}
	defer stmt.Close()

	for i, one := range data {
		if _, err := stmt.Exec(one.Date, one.ID, one.UserId, one.Amount); err != nil {
			return fmt.Errorf("insert row %d: %w", i, err)
		}
	}
	return nil
}
// GroupCommitV2 inserts data row by row through a prepared statement inside one
// explicit transaction, so all rows are committed atomically.
//
// NOTE: per the Doris group-commit documentation, explicit transactions
// (BEGIN; INSERT INTO ... VALUES ...; COMMIT) are not handled by group commit.
// This variant therefore cannot benefit from SET group_commit = async_mode.
// Prefer it only when atomicity across the batch is required.
//
// It returns the first begin, prepare, insert or commit error; on any error the
// transaction is rolled back.
func GroupCommitV2(data []any) error {
	if len(data) == 0 {
		return nil
	}
	tx, err := db.Begin()
	if err != nil {
		return fmt.Errorf("begin transaction: %w", err)
	}
	// Rollback after a successful Commit is a no-op (it returns sql.ErrTxDone),
	// so deferring it unconditionally safely covers every early-return path.
	defer tx.Rollback()

	// Four placeholders for the four values passed to Exec below.
	stmt, err := tx.Prepare(sqlPrefix + "(?,?,?,?)")
	if err != nil {
		return fmt.Errorf("prepare insert: %w", err)
	}
	defer stmt.Close()

	for i, one := range data {
		if _, err := stmt.Exec(one.Date, one.ID, one.UserId, one.Amount); err != nil {
			return fmt.Errorf("insert row %d: %w", i, err)
		}
	}
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("commit: %w", err)
	}
	return nil
}
var sqlPrefix = "insert into `table_name` (`date`,`id`,`user_id`,`amount`) values "
func GroupCommitV3(data []any) error {
builder := strings.Builder{}
builder.WriteString("SET group_commit = async_mode;")
builder.WriteString(sqlPrfix)
length := len(data)
for k, v := range data {
builder.WriteString(fmt.Sprintf("('%v',%d,%d,%d)", one.Date, one.ID, one.UserId, one.Amount))
if k == length-1 {
builder.WriteString(";")
} else {
builder.WriteString(",")
}
}
_, err := db.Exec(builder.String())
return err
}
-- Duplicate-key table partitioned weekly by `date` via dynamic partitioning.
-- NOTE(review): the INSERT statements used with this table list an `id`
-- column, which is not defined here, and never supply the NOT NULL column
-- `sub_op`. Confirm the real schema before tuning insert performance.
CREATE TABLE IF NOT EXISTS api.table_name
(
    `date` datetimev2 NOT NULL COMMENT '时间戳',
    `user_id` int NOT NULL COMMENT "用户id",
    `amount` tinyint NOT NULL COMMENT '数量',
    `sub_op` int NOT NULL COMMENT '操作码',
    -- No trailing comma after the last column: it is a syntax error.
    `remarks` varchar(256) default '' COMMENT '说明'
)
DUPLICATE KEY(`date`,`user_id`)
PARTITION BY RANGE(`date`)()
DISTRIBUTED BY HASH(`date`,`user_id`) BUCKETS AUTO
PROPERTIES (
"estimate_partition_size" = "5G",
"storage_policy" = "doris_policy",
"replication_num" = "1",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "WEEK",
"dynamic_partition.start" = "-1",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "32",
"dynamic_partition.time_zone"="Asia/Shanghai"
);