V2.1.7: Stream Load real-time sync using Group Commit in asynchronous mode (async_mode) loses some data when a BE node goes down

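  1. Cluster environment: SelectDB 2.1.7
  2. Flink consumes Kafka data in real time and writes it to Doris tables through Stream Load, using Group Commit in asynchronous mode (async_mode).
  3. When some BE nodes suddenly go down (occasionally caused by large queries), part of the data is lost (the data from around the moment of the failure). The Flink TaskManager logs show no exceptions or errors, and the Flink code has its own error retry, but that does not help.
  4. The loss is usually a few dozen to around a hundred rows.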
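Point 3 says the Flink code retries on error. For reference, here is a minimal sketch of what such a retry wrapper might look like, assuming the commit method signals failure by returning false. The class name `RetryingCommit` and its parameters are hypothetical and not part of my actual job. A wrapper like this only helps if the commit method really returns false (or throws) whenever the load did not succeed.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical retry helper; names and parameters are illustrative only. */
final class RetryingCommit {
    private RetryingCommit() {}

    /**
     * Runs the attempt up to maxAttempts times with a linearly growing pause.
     * Throws if no attempt succeeds, so that a failed batch is never dropped silently.
     */
    static void commitWithRetry(BooleanSupplier attempt, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return; // the attempt reported success
            }
            if (i < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis * i);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and give up instead of swallowing it
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while retrying", e);
                }
            }
        }
        throw new IllegalStateException("batch not committed after " + maxAttempts + " attempts");
    }
}
```

It could be called as `RetryingCommit.commitWithRetry(() -> commitDataToDoris(batch, table), 3, 500L)`, where `batch` and `table` stand for whatever the sink currently passes. Throwing after the last attempt, rather than logging and moving on, is a deliberate choice in this sketch: the failure then surfaces in the Flink task instead of disappearing.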

-- Core code:

public Boolean commitDataToDoris(JSONArray jsonArrayData, String dorisTableName) {
        if (jsonArrayData == null || jsonArrayData.isEmpty()) {
            return true;
        }

        HttpURLConnection feConn = null;
        HttpURLConnection beConn = null;
        String label;
        String location;

        String loadUrlStr = String.format(LOAD_URL_PATTERN, this.dorisFeHost, this.dorisFeHttpPort, STG_DB, dorisTableName);
        String columns = String.join(",", jsonArrayData.getJSONObject(0).keySet());
        // jsonFormat looks like: "[\"$.name\",\"$.age\"]"
        String jsonFormat = "[\"$." + columns.replaceAll(",", "\",\"\\$.") + "\"]";

        // check fe status and get fe connection
        try {
            Calendar calendar = Calendar.getInstance();
            // Set the load label; globally unique: flink_import_<dorisDb>_<dorisTable>_yyyyMMdd_HHmmss_UUID
            label = String.format("flink_import_%s_%s_%s%02d%02d_%02d%02d%02d_%s", STG_DB, dorisTableName, calendar.get(Calendar.YEAR),
                    calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH), calendar.get(Calendar.HOUR_OF_DAY),
                    calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                    UUID.randomUUID().toString().replaceAll("-", ""));

            feConn = getHttpConnection(loadUrlStr, label, columns, jsonFormat);
            int status = feConn.getResponseCode();
            if (status != 307) {
                LOG.error("fe conn status is not redirect 307,status: {},need retry.", status);
                return false;
            }
            location = feConn.getHeaderField("Location");
            if (location == null) {
                LOG.error("fe redirect location is null");
                return false;
            }
        } catch (Exception ex) {
            LOG.error("fe server connect fail,error: {},need retry.", ex.getMessage());
            return false;
        } finally {
            if (feConn != null) {
                feConn.disconnect();
            }
        }

        // get be connection
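        try {
            beConn = getHttpConnection(location, label, columns, jsonFormat);
            // try-with-resources closes the stream even if write() throws
            try (BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream())) {
                bos.write(jsonArrayData.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }

            // Reading the response code waits for the BE's reply to the load request
            int responseCode = beConn.getResponseCode();
            if (responseCode != 200) {
                LOG.error("Doris BE response code is not 200, responseCode: {}", responseCode);
                // Report failure so that the caller's retry logic actually runs
                return false;
            }
            // NOTE: Stream Load can also report a failed load with HTTP 200, via the "Status"
            // field of the JSON response body. That body is not inspected here.
            return true;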
        } catch (Exception e) {
            String err = "batch commit data to be fail,label: " + label + ",location: " + location + ",error: " + e.getMessage();
            LOG.error(err);
            return false;
        } finally {
            if (beConn != null) {
                beConn.disconnect();
            }
        }
    }
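One thing the method above never does is read the Stream Load response body. As far as I understand the Doris Stream Load documentation, the BE answers with a JSON document whose `Status` field is `Success`, `Publish Timeout`, `Label Already Exists`, or `Fail`, and the HTTP code can be 200 even when the load failed. Below is a minimal sketch of such a check. The class `StreamLoadResponseCheck` is hypothetical, and the regex-based parsing is only an assumption made to keep the example free of a JSON library; a real implementation would more likely parse the body with the JSON library the job already uses.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: decides whether a Stream Load reply means the batch was accepted. */
final class StreamLoadResponseCheck {
    // Matches the "Status" field, e.g. "Status": "Success".
    // The leading quote keeps it from matching fields such as "ExistingJobStatus".
    private static final Pattern STATUS =
            Pattern.compile("\"Status\"\\s*:\\s*\"([^\"]*)\"");

    private StreamLoadResponseCheck() {}

    /** Returns the value of the Status field, or null if the body has none. */
    static String extractStatus(String body) {
        if (body == null) {
            return null;
        }
        Matcher m = STATUS.matcher(body);
        return m.find() ? m.group(1) : null;
    }

    /**
     * Treats the load as successful only for HTTP 200 plus a Status of
     * "Success" or "Publish Timeout" (assumed here to mean the data was
     * committed but may become visible late).
     */
    static boolean isSuccess(int httpCode, String body) {
        if (httpCode != 200) {
            return false;
        }
        String status = extractStatus(body);
        return "Success".equals(status) || "Publish Timeout".equals(status);
    }
}
```

In `commitDataToDoris`, the body would be read from `beConn.getInputStream()` after `getResponseCode()` and passed to `isSuccess`; anything other than a successful status would then return false so the retry can kick in.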