使用flink-doris-connector连接器开启攒批模式写入写入效率很慢,共有上百万的每两分钟写一次几百条数据,设置了攒批的条数1w好像没触发,总是出发两分钟的时间间隔

Viewed 70

SET 'execution.checkpointing.interval' = '3min';
SET 'execution.checkpointing.timeout' = '300min';
SET 'cluster.evenly-spread-out-slots'='true';
SET 'table.exec.state.ttl' = '8640000';
SET 'table.local-time-zone' = 'Asia/Shanghai';
SET 'table.exec.sink.not-null-enforcer' = 'DROP';

CREATE TABLE IF NOT EXISTS DM_DELIVBILL_B_SOURCE
(
cdelivbill_bid STRING,
bapflag STRING,
barflag STRING,
casscustid STRING,
castunitid STRING,
cbiztypeid STRING,
ccosignid STRING,
cdefdoc STRING,
cdelivbill_hid STRING,
cdmsupplierid STRING,
cffileid STRING,
cfirstbid STRING,
cfirstid STRING,
cinventoryid STRING,
cinventoryvid STRING,
cprojectid STRING,
cpuorder_bb1id STRING,
cqualitylevel STRING,
creceiveaddrdocid STRING,
creceiveaddrid STRING,
creceiveareaid STRING,
creceivecustid STRING,
creceivestoreid STRING,
creceivestoreorgid STRING,
creceivestoreorgvid STRING,
crowno STRING,
csendaddrdocid STRING,
csendaddrid STRING,
csendareaid STRING,
csendstoreid STRING,
csendstoreorgid STRING,
csendstoreorgvid STRING,
csendvendorid STRING,
csettlefinorgid STRING,
csettlefinorgvid STRING,
csignerid STRING,
csrcbid STRING,
csrcid STRING,
ctakefeeid STRING,
cunitid STRING,
dbilldate STRING,
dr STRING,
drequiredate STRING,
drequiretime STRING,
dsenddate STRING,
dsendtime STRING,
dsigndate STRING,
nastnum STRING,
nmoney STRING,
nnum STRING,
nprice STRING,
nroutemile STRING,
nsignastnum STRING,
nsignnum STRING,
nsignvolume STRING,
nsignweight STRING,
nspecialmile1 STRING,
nspecialmile2 STRING,
nspecialmile3 STRING,
nspecialmile4 STRING,
nvolumn STRING,
nweight STRING,
pk_batchcode STRING,
pk_group STRING,
pk_org STRING,
ts STRING,
vbatchcode STRING,
vbdef1 STRING,
vbdef10 STRING,
vbdef11 STRING,
vbdef12 STRING,
vbdef13 STRING,
vbdef14 STRING,
vbdef15 STRING,
vbdef16 STRING,
vbdef17 STRING,
vbdef18 STRING,
vbdef19 STRING,
vbdef2 STRING,
vbdef20 STRING,
vbdef3 STRING,
vbdef4 STRING,
vbdef5 STRING,
vbdef6 STRING,
vbdef7 STRING,
vbdef8 STRING,
vbdef9 STRING,
vbnote STRING,
vbsenderphone STRING,
vchangerate STRING,
vfirstcode STRING,
vfirstrowno STRING,
vfirsttrantype STRING,
vfirsttype STRING,
vfree1 STRING,
vfree10 STRING,
vfree2 STRING,
vfree3 STRING,
vfree4 STRING,
vfree5 STRING,
vfree6 STRING,
vfree7 STRING,
vfree8 STRING,
vfree9 STRING,
vpuplancode STRING,
vreceivephone STRING,
vreceiver STRING,
vsender STRING,
vsrccode STRING,
vsrcrowno STRING,
vsrctrantype STRING,
vsrctype STRING,
PRIMARY KEY (cdelivbill_bid,dbilldate) NOT ENFORCED
)
WITH (
'connector' = 'paimon',
'path' = 'hdfs://mycluster/paimon/nc65zs.db/dm_delivbill_b',
'format' = 'debezium-json'
);

---flinksql-doris-DM_DELIVBILL_B---
CREATE TABLE IF NOT EXISTS DM_DELIVBILL_B_SINK
(
cdelivbill_bid STRING,
bapflag STRING,
barflag STRING,
casscustid STRING,
castunitid STRING,
cbiztypeid STRING,
ccosignid STRING,
cdefdoc STRING,
cdelivbill_hid STRING,
cdmsupplierid STRING,
cffileid STRING,
cfirstbid STRING,
cfirstid STRING,
cinventoryid STRING,
cinventoryvid STRING,
cprojectid STRING,
cpuorder_bb1id STRING,
cqualitylevel STRING,
creceiveaddrdocid STRING,
creceiveaddrid STRING,
creceiveareaid STRING,
creceivecustid STRING,
creceivestoreid STRING,
creceivestoreorgid STRING,
creceivestoreorgvid STRING,
crowno STRING,
csendaddrdocid STRING,
csendaddrid STRING,
csendareaid STRING,
csendstoreid STRING,
csendstoreorgid STRING,
csendstoreorgvid STRING,
csendvendorid STRING,
csettlefinorgid STRING,
csettlefinorgvid STRING,
csignerid STRING,
csrcbid STRING,
csrcid STRING,
ctakefeeid STRING,
cunitid STRING,
dbilldate STRING,
dr STRING,
drequiredate STRING,
drequiretime STRING,
dsenddate STRING,
dsendtime STRING,
dsigndate STRING,
nastnum STRING,
nmoney STRING,
nnum STRING,
nprice STRING,
nroutemile STRING,
nsignastnum STRING,
nsignnum STRING,
nsignvolume STRING,
nsignweight STRING,
nspecialmile1 STRING,
nspecialmile2 STRING,
nspecialmile3 STRING,
nspecialmile4 STRING,
nvolumn STRING,
nweight STRING,
pk_batchcode STRING,
pk_group STRING,
pk_org STRING,
ts STRING,
vbatchcode STRING,
vbdef1 STRING,
vbdef10 STRING,
vbdef11 STRING,
vbdef12 STRING,
vbdef13 STRING,
vbdef14 STRING,
vbdef15 STRING,
vbdef16 STRING,
vbdef17 STRING,
vbdef18 STRING,
vbdef19 STRING,
vbdef2 STRING,
vbdef20 STRING,
vbdef3 STRING,
vbdef4 STRING,
vbdef5 STRING,
vbdef6 STRING,
vbdef7 STRING,
vbdef8 STRING,
vbdef9 STRING,
vbnote STRING,
vbsenderphone STRING,
vchangerate STRING,
vfirstcode STRING,
vfirstrowno STRING,
vfirsttrantype STRING,
vfirsttype STRING,
vfree1 STRING,
vfree10 STRING,
vfree2 STRING,
vfree3 STRING,
vfree4 STRING,
vfree5 STRING,
vfree6 STRING,
vfree7 STRING,
vfree8 STRING,
vfree9 STRING,
vpuplancode STRING,
vreceivephone STRING,
vreceiver STRING,
vsender STRING,
vsrccode STRING,
vsrcrowno STRING,
vsrctrantype STRING,
vsrctype STRING,
PRIMARY KEY (cdelivbill_bid,dbilldate) NOT ENFORCED
) WITH (
${Doris_DB},
'table.identifier' = 'nc65zs.DM_DELIVBILL_B',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.enable-delete' = 'true',
'sink.properties.strict_mode'= 'true',
'sink.enable.batch-mode' = 'true',
'sink.buffer-flush.interval' = '120s',
'sink.buffer-flush.max-rows' = '10000',
'sink.label-prefix' = 'nc65zs_DM_DELIVBILL_B'
);

INSERT INTO DM_DELIVBILL_B_SINK(cdelivbill_bid, bapflag, barflag, casscustid, castunitid, cbiztypeid, ccosignid, cdefdoc, cdelivbill_hid, cdmsupplierid, cffileid, cfirstbid, cfirstid, cinventoryid, cinventoryvid, cprojectid, cpuorder_bb1id, cqualitylevel, creceiveaddrdocid, creceiveaddrid, creceiveareaid, creceivecustid, creceivestoreid, creceivestoreorgid, creceivestoreorgvid, crowno, csendaddrdocid, csendaddrid, csendareaid, csendstoreid, csendstoreorgid, csendstoreorgvid, csendvendorid, csettlefinorgid, csettlefinorgvid, csignerid, csrcbid, csrcid, ctakefeeid, cunitid, dbilldate, dr, drequiredate, drequiretime, dsenddate, dsendtime, dsigndate, nastnum, nmoney, nnum, nprice, nroutemile, nsignastnum, nsignnum, nsignvolume, nsignweight, nspecialmile1, nspecialmile2, nspecialmile3, nspecialmile4, nvolumn, nweight, pk_batchcode, pk_group, pk_org, ts, vbatchcode, vbdef1, vbdef10, vbdef11, vbdef12, vbdef13, vbdef14, vbdef15, vbdef16, vbdef17, vbdef18, vbdef19, vbdef2, vbdef20, vbdef3, vbdef4, vbdef5, vbdef6, vbdef7, vbdef8, vbdef9, vbnote, vbsenderphone, vchangerate, vfirstcode, vfirstrowno, vfirsttrantype, vfirsttype, vfree1, vfree10, vfree2, vfree3, vfree4, vfree5, vfree6, vfree7, vfree8, vfree9, vpuplancode, vreceivephone, vreceiver, vsender, vsrccode, vsrcrowno, vsrctrantype, vsrctype)
SELECT cdelivbill_bid, bapflag, barflag, casscustid, castunitid, cbiztypeid, ccosignid, cdefdoc, cdelivbill_hid, cdmsupplierid, cffileid, cfirstbid, cfirstid, cinventoryid, cinventoryvid, cprojectid, cpuorder_bb1id, cqualitylevel, creceiveaddrdocid, creceiveaddrid, creceiveareaid, creceivecustid, creceivestoreid, creceivestoreorgid, creceivestoreorgvid, crowno, csendaddrdocid, csendaddrid, csendareaid, csendstoreid, csendstoreorgid, csendstoreorgvid, csendvendorid, csettlefinorgid, csettlefinorgvid, csignerid, csrcbid, csrcid, ctakefeeid, cunitid, dbilldate, dr, drequiredate, drequiretime, dsenddate, dsendtime, dsigndate, nastnum, nmoney, nnum, nprice, nroutemile, nsignastnum, nsignnum, nsignvolume, nsignweight, nspecialmile1, nspecialmile2, nspecialmile3, nspecialmile4, nvolumn, nweight, pk_batchcode, pk_group, pk_org, ts, vbatchcode, vbdef1, vbdef10, vbdef11, vbdef12, vbdef13, vbdef14, vbdef15, vbdef16, vbdef17, vbdef18, vbdef19, vbdef2, vbdef20, vbdef3, vbdef4, vbdef5, vbdef6, vbdef7, vbdef8, vbdef9, vbnote, vbsenderphone, vchangerate, vfirstcode, vfirstrowno, vfirsttrantype, vfirsttype, vfree1, vfree10, vfree2, vfree3, vfree4, vfree5, vfree6, vfree7, vfree8, vfree9, vpuplancode, vreceivephone, vreceiver, vsender, vsrccode, vsrcrowno, vsrctrantype, vsrctype
FROM DM_DELIVBILL_B_SOURCE where cdelivbill_bid is not null;写入的攒批写入的stream load日志

2 Answers
  1. 提供一下doris版本,connector版本,多截取几次导入记录

看看taskmanager报什么错