- 集群环境:SelectDB 2.1.7
- Flink实时消费Kafka数据,底层通过Stream Load同步到Doris表中,采用的是Group Commit的异步模式(async_mode)
- 当某些BE节点突然宕机时(偶尔由大查询导致),会丢失部分数据(集中在异常发生的那个时间点);Flink TaskManager日志中没有出现任何异常报错,Flink代码中虽然已经做了失败重试,但仍然无效。
- 每次丢失的数据一般在几十条到一百条左右。
- 核心代码如下:
/**
 * Sends one batch of JSON rows to Doris through Stream Load and reports whether Doris accepted it.
 *
 * <p>The load takes two HTTP round trips. First, the FE load URL must answer with a 307 redirect.
 * Then the batch is uploaded to the BE address taken from the redirect's {@code Location} header.
 * The column list and the {@code jsonpaths} expression are derived from the keys of the first row.
 *
 * <p>A batch only counts as loaded when the BE replies HTTP 200 <b>and</b> the Stream Load result
 * body reports {@code "Status"} as {@code "Success"} or {@code "Publish Timeout"}. Every other
 * outcome returns {@code false}, so the caller's retry logic resends the batch instead of silently
 * dropping it. Doris can report a failed load (e.g. {@code "Fail"}) inside an HTTP 200 response.
 *
 * <p>NOTE(review): every call generates a fresh label, so a retry after a lost-but-successful
 * response may load the batch twice; presumably acceptable for the target tables — confirm.
 * NOTE(review): with group commit in async_mode, "Success" presumably means the BE accepted the rows
 * for asynchronous commit rather than that they are already visible. Also verify how an explicit
 * label interacts with group commit — confirm both against the Doris Group Commit documentation.
 *
 * @param jsonArrayData  rows to load; {@code null} or empty is a no-op that returns {@code true}
 * @param dorisTableName target table in {@code STG_DB}
 * @return {@code true} if Doris accepted the batch, {@code false} if it should be retried
 */
public Boolean commitDataToDoris(JSONArray jsonArrayData, String dorisTableName) {
    if (jsonArrayData == null || jsonArrayData.isEmpty()) {
        return true;
    }
    HttpURLConnection feConn = null;
    HttpURLConnection beConn = null;
    String label;
    String location;
    String loadUrlStr = String.format(LOAD_URL_PATTERN, this.dorisFeHost, this.dorisFeHttpPort, STG_DB, dorisTableName);
    String columns = String.join(",", jsonArrayData.getJSONObject(0).keySet());
    // jsonpaths header value, e.g. "[\"$.name\",\"$.age\"]"
    String jsonFormat = "[\"$." + columns.replaceAll(",", "\",\"\\$.") + "\"]";
    // Step 1: ask the FE where to send the data; it must answer with a 307 redirect to a BE.
    try {
        Calendar calendar = Calendar.getInstance();
        // Globally unique label: flink_import_<dorisDb>_<dorisTable>_yyyyMMdd_HHmmss_<uuid>
        label = String.format("flink_import_%s_%s_%s%02d%02d_%02d%02d%02d_%s", STG_DB, dorisTableName, calendar.get(Calendar.YEAR),
                calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH), calendar.get(Calendar.HOUR_OF_DAY),
                calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                UUID.randomUUID().toString().replaceAll("-", ""));
        feConn = getHttpConnection(loadUrlStr, label, columns, jsonFormat);
        int status = feConn.getResponseCode();
        if (status != 307) {
            LOG.error("fe conn status is not redirect 307,status: {},need retry.", status);
            return false;
        }
        location = feConn.getHeaderField("Location");
        if (location == null) {
            LOG.error("fe redirect location is null");
            return false;
        }
    } catch (Exception ex) {
        // Pass the exception itself so the stack trace is logged, not only the message.
        LOG.error("fe server connect fail,error: {},need retry.", ex.getMessage(), ex);
        return false;
    } finally {
        if (feConn != null) {
            feConn.disconnect();
        }
    }
    // Step 2: upload the batch to the BE and verify the Stream Load result.
    try {
        beConn = getHttpConnection(location, label, columns, jsonFormat);
        // Encode explicitly as UTF-8; getBytes() without a charset uses the platform default.
        byte[] payload = jsonArrayData.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
        try (BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream())) {
            bos.write(payload);
        }
        int responseCode = beConn.getResponseCode();
        String responseBody = readStreamLoadResponse(beConn, responseCode);
        if (responseCode != 200) {
            // Must not report success: returning false lets the caller retry this batch.
            LOG.error("Doris Be ResponseCode not equal 200,ResponseCode:{},label:{},response:{}",
                    responseCode, label, responseBody);
            return false;
        }
        // HTTP 200 only means the request was handled; the load outcome is in the JSON "Status".
        String loadStatus = extractStreamLoadStatus(responseBody);
        if (!isStreamLoadAccepted(loadStatus)) {
            LOG.error("Doris stream load failed,label:{},status:{},response:{}", label, loadStatus, responseBody);
            return false;
        }
        return true;
    } catch (Exception e) {
        String err = "batch commit data to be fail,label: " + label + ",location: " + location + ",error: " + e.getMessage();
        LOG.error(err, e);
        return false;
    } finally {
        if (beConn != null) {
            beConn.disconnect();
        }
    }
}

/** Matches the top-level {@code "Status"} field of a Doris Stream Load JSON result. */
private static final java.util.regex.Pattern STREAM_LOAD_STATUS_PATTERN =
        java.util.regex.Pattern.compile("\"Status\"\\s*:\\s*\"([^\"]*)\"");

/**
 * Reads the whole HTTP response body as UTF-8 text.
 *
 * <p>Uses the error stream for 4xx/5xx codes, because {@code getInputStream()} throws for those.
 *
 * @return the body text, or an empty string when the connection provides no body
 */
private static String readStreamLoadResponse(HttpURLConnection conn, int responseCode) throws java.io.IOException {
    java.io.InputStream in = responseCode >= 400 ? conn.getErrorStream() : conn.getInputStream();
    if (in == null) {
        return "";
    }
    try (java.io.InputStream body = in) {
        java.io.ByteArrayOutputStream buffer = new java.io.ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        int read;
        while ((read = body.read(chunk)) != -1) {
            buffer.write(chunk, 0, read);
        }
        return new String(buffer.toByteArray(), java.nio.charset.StandardCharsets.UTF_8);
    }
}

/** Returns the value of the {@code "Status"} field in a Stream Load response, or {@code null} if absent. */
private static String extractStreamLoadStatus(String responseBody) {
    if (responseBody == null) {
        return null;
    }
    java.util.regex.Matcher matcher = STREAM_LOAD_STATUS_PATTERN.matcher(responseBody);
    return matcher.find() ? matcher.group(1) : null;
}

/**
 * Returns whether a Stream Load status means the data was accepted.
 *
 * <p>"Publish Timeout" is included because Doris documents it as loaded, with visibility delayed.
 */
private static boolean isStreamLoadAccepted(String status) {
    return "Success".equals(status) || "Publish Timeout".equals(status);
}