How to use group commit from golang


Doris version: 2.1.5
Language: golang
SQL SDK: github.com/go-sql-driver/mysql, database/sql
Description: After establishing the connection, I first run db.Exec("SET group_commit = async_mode;") and then execute inserts in batches. This is very slow: at best it takes about 30s to insert roughly 30,000 rows. Since the documentation gives no golang example, I tried three approaches:
1. The fastest is concatenating a single multi-row statement, insert into table(field) values (?),(?) ==> GroupCommitV3
2. A prepared statement, like the documentation example, is actually slow on insert: about 30ms per row. ==> GroupCommitV1
3. The slowest is opening a transaction first and then preparing. ==> GroupCommitV2
Questions:
Can group commit be used from golang, and how should group commit be used to import data into Doris in real time?

// open sql
url := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", cfg.UserName, cfg.Password, cfg.Host, cfg.Port, cfg.DataBase)

db, err = sql.Open("mysql", url)
if err != nil {
	Error("mysql start ", err)
	os.Exit(-1)
}
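
One thing worth checking here: database/sql keeps a pool of connections, so a one-off db.Exec("SET group_commit = async_mode;") only changes the session of whichever pooled connection it happens to run on; later inserts may go out on connections where the variable was never set. A minimal sketch (assuming go-sql-driver's documented behavior of forwarding unrecognized DSN parameters as system variables) that enables it for every new connection:

// Sketch: enable group commit on every pooled connection by putting the
// session variable into the DSN. go-sql-driver sends unrecognized parameters
// as a SET statement when a connection is opened; string values must be
// quoted with URL-encoded quotes (%27).
url := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?group_commit=%%27async_mode%%27",
	cfg.UserName, cfg.Password, cfg.Host, cfg.Port, cfg.DataBase)

db, err = sql.Open("mysql", url)
if err != nil {
	Error("mysql start ", err)
	os.Exit(-1)
}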

The three implementations:
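
The functions read fields such as Date, ID, UserId, Amount, and Remark from each element, so they assume a row struct roughly like the one below (the name and field types are hypothetical; the original []any parameter would not compile with direct field access):

// Hypothetical row type assumed by GroupCommitV1/V2/V3.
type Record struct {
	Date   time.Time
	ID     int64
	UserId int64
	Amount int64
	Remark string
}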

// GroupCommitV1: prepared statement, one Exec per row (about 30ms per row).
func GroupCommitV1(data []Record) error {
	stmt, err := db.Prepare("insert into table_name (`date`,`id`,`user_id`,`amount`,`remark`) values (?,?,?,?,?)")
	if err != nil {
		return err
	}
	defer stmt.Close()

	for _, one := range data {
		_, err = stmt.Exec(one.Date, one.ID, one.UserId, one.Amount, one.Remark)
		if err != nil {
			return err
		}
	}

	return nil
}
// GroupCommitV2: explicit transaction plus prepared statement (slowest).
func GroupCommitV2(data []Record) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback()

	stmt, err := tx.Prepare("insert into table_name (`date`,`id`,`user_id`,`amount`,`remark`) values (?,?,?,?,?)")
	if err != nil {
		return err
	}
	defer stmt.Close()

	for _, one := range data {
		_, err = stmt.Exec(one.Date, one.ID, one.UserId, one.Amount, one.Remark)
		if err != nil {
			return err
		}
	}

	return tx.Commit()
}
var sqlPrefix = "insert into `table_name` (`date`,`id`,`user_id`,`amount`) values "

// GroupCommitV3: build one multi-row INSERT by string concatenation (fastest).
// Note: sending the SET and the INSERT in a single Exec requires
// multiStatements=true in the go-sql-driver DSN.
func GroupCommitV3(data []Record) error {
	builder := strings.Builder{}
	builder.WriteString("SET group_commit = async_mode;")
	builder.WriteString(sqlPrefix)
	length := len(data)
	for k, one := range data {
		builder.WriteString(fmt.Sprintf("('%v',%d,%d,%d)", one.Date, one.ID, one.UserId, one.Amount))
		if k == length-1 {
			builder.WriteString(";")
		} else {
			builder.WriteString(",")
		}
	}

	_, err := db.Exec(builder.String())
	return err
}
CREATE TABLE IF NOT EXISTS api.table_name
(
	`date` datetimev2 NOT NULL COMMENT 'timestamp',
	`user_id` int NOT NULL COMMENT 'user id',
	`amount` tinyint NOT NULL COMMENT 'quantity',
	`sub_op` int NOT NULL COMMENT 'operation code',
	`remarks` varchar(256) DEFAULT '' COMMENT 'remarks'
)
DUPLICATE KEY(`date`,`user_id`)
PARTITION BY RANGE(`date`)()
DISTRIBUTED BY HASH(`date`,`user_id`) BUCKETS AUTO
PROPERTIES (
	"estimate_partition_size" = "5G",
	"storage_policy" = "doris_policy",
	"replication_num" = "1",
	"dynamic_partition.enable" = "true",
	"dynamic_partition.time_unit" = "WEEK",
	"dynamic_partition.start" = "-1",
	"dynamic_partition.end" = "1",
	"dynamic_partition.prefix" = "p",
	"dynamic_partition.buckets" = "32",
	"dynamic_partition.time_zone" = "Asia/Shanghai"
);

1 Answer

Reference code:

package main

import (
	"database/sql"
	"fmt"
	"math/rand"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	_ "github.com/go-sql-driver/mysql"
)

const (
	host     = "127.0.0.1"
	port     = 9038
	db       = "test"
	user     = "root"
	password = ""
	table    = "async_lineitem"
)

var (
	threadCount = 20
	batchSize   = 100
)

var totalInsertedRows int64
var rowsInsertedLastSecond int64
func main() {
	dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true", user, password, host, port, db)
	db, err := sql.Open("mysql", dbDSN)
	if err != nil {
		fmt.Printf("Error opening database: %s\n", err)
		return
	}
	defer db.Close()

	var wg sync.WaitGroup
	for i := 0; i < threadCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			groupCommitInsertBatch(db)
		}()
	}
	// Start a goroutine to print the total and per-second row count
	go logInsertStatistics()
	wg.Wait()
}
func groupCommitInsertBatch(db *sql.DB) {
	for {
		valueStrings := make([]string, 0, batchSize)
		valueArgs := make([]interface{}, 0, batchSize*16)
		for i := 0; i < batchSize; i++ {
			valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
			valueArgs = append(valueArgs, rand.Intn(1000))
			valueArgs = append(valueArgs, rand.Intn(1000))
			valueArgs = append(valueArgs, rand.Intn(1000))
			valueArgs = append(valueArgs, rand.Intn(1000))
			valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true})
			valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true})
			valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true})
			valueArgs = append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true})
			valueArgs = append(valueArgs, "N")
			valueArgs = append(valueArgs, "O")
			valueArgs = append(valueArgs, time.Now())
			valueArgs = append(valueArgs, time.Now())
			valueArgs = append(valueArgs, time.Now())
			valueArgs = append(valueArgs, "DELIVER IN PERSON")
			valueArgs = append(valueArgs, "SHIP")
			valueArgs = append(valueArgs, "N/A")
		}
		// One multi-row INSERT per batch, with values bound as placeholders.
		stmt := fmt.Sprintf("INSERT INTO %s VALUES %s",
			table, strings.Join(valueStrings, ","))
		_, err := db.Exec(stmt, valueArgs...)
		if err != nil {
			fmt.Printf("Error executing batch: %s\n", err)
			return
		}
		atomic.AddInt64(&rowsInsertedLastSecond, int64(batchSize))
		atomic.AddInt64(&totalInsertedRows, int64(batchSize))
	}
}
func logInsertStatistics() {
	for {
		time.Sleep(1 * time.Second)
		// Read the counters atomically; the per-second counter is reset on read.
		fmt.Printf("Total inserted rows: %d\n", atomic.LoadInt64(&totalInsertedRows))
		fmt.Printf("Rows inserted in the last second: %d\n", atomic.SwapInt64(&rowsInsertedLastSecond, 0))
	}
}
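
Note that this reference code still relies on the group_commit session variable being enabled on the connections that run the inserts (via SET or a DSN parameter). To confirm it actually took effect, the variable can be read back over the MySQL protocol; a small sketch, assuming the same *sql.DB handle and that Doris exposes session variables through the usual @@ syntax:

// Sketch: verify the group_commit session variable on the current connection.
var mode string
if err := db.QueryRow("SELECT @@group_commit").Scan(&mode); err != nil {
	fmt.Printf("Error reading group_commit: %s\n", err)
} else {
	fmt.Printf("group_commit = %s\n", mode)
}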