我在用spark-doris-connector时,用sparkSql load数据到另外一张表,由于表的一个字段存的是json格式的数据,导致再sink时候,字段切分错误

Viewed 40

我在用spark-doris-connector-3.1_2.12,版本1.3.0(源码来自git,1.2.0读取时schema会多一个字段,故进行了升级),doris2.03版本时,用sparkSql load数据到另外一张表,由于表的一个字段存的是json格式的数据,导致再sink时候,字段切分错误具体案例如下:

val createDorisTable =
      """
        |CREATE
        |TEMPORARY VIEW ods_t_entry_waste2
        |USING doris
        |OPTIONS(
        |  "table.identifier"="ods_tolldata_prod.ods_t_entry_waste2",
        |  "fenodes"="",
        |  "user"="",
        |  "password"=""
        |);
        |
        |""".stripMargin
   session.sql(createDorisTable)
    val createDorisTable2 =
      """
        |CREATE
        |TEMPORARY VIEW ods_t_entry_waste3
        |USING doris
        |OPTIONS(
        |  "table.identifier"="ods_tolldata_prod.ods_t_entry_waste3",
        |  "fenodes"="",
        |  "user"="root",
        |  "password"=""
        |);
        |
        |""".stripMargin
    session.sql(createDorisTable2)
val insertSql =
      """
        |INSERT INTO ods_t_entry_waste3
        |SELECT *
        |FROM ods_t_entry_waste2
        |where OCCURTIME >= '2024-02-19 00:00:00'
        |and OCCURTIME <'2024-02-20 00:00:00'
        |""".stripMargin
    session.sql(insertSql)

两张表结构一样,其中一个字段的存储为json数据如下:
{
"SD_ID": "数xxxx据",
"ETC_DELAY": "xxxx",
"OU_SEAT": 0
}
在做数据的insert时出错,查看日志如下:
第一行数据:切分 Reason: actual column number in csv file is less than schema column number.actual number: 89, schema column number: 94; line delimiter: [前面为正常切分 {] ,此时切分的最后一个字段"{"为上述json字段的左括号;
第二行数据切分:Reason: actual column number in csv file is less than schema column number.actual number: 1, schema column number: 94; line delimiter: [], column separator: [ ], result values:[ "SD_ID": "",]
第三行切分:result values:[ "ETC_DELAY": "xxxx",, ]
第四行切分:result values:[ "OU_SEAT": 0, ]. src line [ "OU_SEAT": 0]

查看源码发现行分割符默认为"\n",字段分割符为"\t",故原本为一行的数据因为json字段中也存在"\n"被切分成四行数据,从而导致切分的列数与表schema不一致,导致sink数据失败;

1 Answers

'doris.sink.properties.format' = 'json'