spark-doris-connector reads Doris 2.1.7 datetimeV2 data incorrectly


Environment

Apache Doris 2.1.7
Server: CentOS 7.9
Server time zone:
[root@demo ~]# timedatectl
Local time: Thu 2025-01-09 06:36:23 +06
Universal time: Thu 2025-01-09 00:36:23 UTC
RTC time: Thu 2025-01-09 00:36:23
Time zone: Asia/Dhaka (+06, +0600)
NTP enabled: n/a
NTP synchronized: yes
RTC in local TZ: no
DST active: n/a

JDK: 1.8_201

Doris time zone:
SELECT @@time_zone
@@time_zone|
-----------+
Asia/Dhaka |

spark-doris-connector: 1.3.2
Spark version: 3.2.3

Table DDL

CREATE TABLE demo.dwd_d_daily_data
(
    meter_no      varchar(64) NOT NULL COMMENT 'meter number',
    frz_dtime     DATETIMEV2 NOT NULL COMMENT 'date',
    regid         varchar(30) NOT NULL COMMENT 'data identifier',
    dataval       decimal(20, 4) COMMENT 'register reading',
    data_flag     smallint DEFAULT "0" COMMENT 'data type (0: normal, 1: VEE-corrected, 2: manually edited)',
    unit          varchar(15) COMMENT 'unit',
    version       smallint DEFAULT "0" COMMENT 'version number (marks data changes)',
    crt_dt        DATETIMEV2 COMMENT 'created at',
    crt_by        varchar(100) COMMENT 'created by',
    last_edit_dt  DATETIMEV2 COMMENT 'last modified at',
    last_edit_by  varchar(100) COMMENT 'last modified by'
) ENGINE=OLAP
UNIQUE KEY(`meter_no`, `frz_dtime`, `regid`)
COMMENT 'daily freeze data'
PARTITION BY RANGE(`frz_dtime`)()
DISTRIBUTED BY HASH(`meter_no`) BUCKETS AUTO
PROPERTIES (
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.prefix" = "pd",
    "dynamic_partition.buckets" = "16",
    "dynamic_partition.end" = "30",
    "storage_format" = "V2",
    "storage_type" = "COLUMN"
);

Insert data

INSERT INTO demo.dwd_d_daily_data (meter_no,frz_dtime,regid,dataval,data_flag,unit,version,crt_dt,crt_by,last_edit_dt,last_edit_by) VALUES
         ('10000000037587','2024-04-13 00:00:00','11010000',10,0,NULL,2,'2024-04-13 01:00:24','auto','2024-04-13 11:00:19','auto'),
         ('10000000037588','2024-04-13 00:00:00','11010000',20,0,NULL,2,'2024-04-13 01:00:24','auto','2024-04-13 11:00:19','auto'),
         ('10000000037589','2024-04-13 00:00:00','11010000',30,0,NULL,2,'2024-04-13 01:00:24','auto','2024-04-13 11:00:19','auto'),
         ('10000000037590','2024-04-13 00:00:00','11010000',40,0,NULL,2,'2024-04-13 01:00:24','auto','2024-04-13 11:00:19','auto'),
         ('10000000037591','2024-04-13 00:00:00','11010000',50,0,NULL,2,'2024-04-13 01:00:24','auto','2024-04-13 11:00:19','auto');

Helper function

import org.apache.doris.spark.cfg.ConfigurationOptions
import org.apache.spark.sql.{DataFrame, SparkSession}

// Reads a Doris table into a DataFrame via the spark-doris-connector.
def readByConnector(tableName: String, spark: SparkSession): DataFrame = {
  spark.read
    .format("doris")
    .option(ConfigurationOptions.DORIS_TABLE_IDENTIFIER, tableName)
    .option(ConfigurationOptions.DORIS_BATCH_SIZE, "8192")
    .option(ConfigurationOptions.DORIS_FENODES, "192.168.70.130:18030")
    .option("user", "root")
    .option("password", "star2019")
    .option("doris.filter.query.in.max.count", "3000")
    .load()
}

Commands run in spark-shell

Standalone spark-shell:
val tableName = "demo.dwd_d_daily_data"
val df2 = readByConnector(tableName, spark)
df2.show()

25/01/09 06:46:40 WARN TransportClientFactory: DNS resolution succeed for demo/192.168.70.130:44582 took 15472 ms
+----------+-------------------+--------+---------+---------+----+-------+-------------------+------+-------------------+------------+
|  meter_no|          frz_dtime|   regid|  dataval|data_flag|unit|version|             crt_dt|crt_by|       last_edit_dt|last_edit_by|
+----------+-------------------+--------+---------+---------+----+-------+-------------------+------+-------------------+------------+
|2204703024|2024-12-28 22:00:00|11010000|9054.2060|        0| kWh|      0|2025-01-02 10:39:09|  auto|2025-01-02 10:39:40|        auto|
|2204703024|2024-12-28 22:00:00|11010010|3226.9760|        0| kWh|      0|2025-01-02 10:39:09|  auto|2025-01-02 10:39:40|        auto|
|2204703024|2024-12-28 22:00:00|11010020|5827.2290|        0| kWh|      0|2025-01-02 10:39:09|  auto|2025-01-02 10:39:40|        auto|
|2204703024|2024-12-28 22:00:00|11090000|9054.2060|        0| kWh|      0|2025-01-02 10:39:09|  auto|2025-01-02 10:39:40|        auto|
+----------+-------------------+--------+---------+---------+----+-------+-------------------+------+-------------------+------------+

Problem

In the df2.show() output, frz_dtime is 2 hours earlier than the value that was written. Why?
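
A useful first check is to compare the time zones involved. A minimal diagnostic sketch, run in the same spark-shell session (nothing here is specific to the connector):

// Compare the time zones that can influence datetime conversion.
// The JVM default time zone is what value conversion in the connector most
// likely falls back to; spark.sql.session.timeZone only controls how Spark SQL
// renders and parses timestamps.
println(java.util.TimeZone.getDefault.getID)          // driver JVM default time zone
println(spark.conf.get("spark.sql.session.timeZone")) // Spark SQL session time zone
// Doris session time zone: SELECT @@time_zone (Asia/Dhaka, as shown above)

If the JVM default differs from Asia/Dhaka, that mismatch is a plausible source of the 2-hour shift.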

What I have already tried

1. Setting the time zone manually

spark.conf.set("spark.sql.session.timeZone", "Asia/Dhaka")

def readByConnector(tableName: String, spark: SparkSession): DataFrame = {
  // Make sure the Spark session uses the correct time zone
  spark.conf.set("spark.sql.session.timeZone", "Asia/Dhaka")

  spark.read
    .format("doris")
    .option(ConfigurationOptions.DORIS_TABLE_IDENTIFIER, tableName)
    .option(ConfigurationOptions.DORIS_BATCH_SIZE, "8192")
    .option(ConfigurationOptions.DORIS_FENODES, "192.168.70.130:18030")
    .option("user", "root")
    .option("password", "star2019")
    .option("doris.filter.query.in.max.count", "3000")
    .option("doris.timezone", "Asia/Dhaka")
    // more time-zone-related options
    .option("spark.sql.session.timeZone", "Asia/Dhaka")
    .option("jdbc.timezone", "Asia/Dhaka")
    .load()
}
Upgrading the connector to 24.0.0 did not solve the problem either.

2. Reading via JDBC solves the problem, but it would require changing a lot of code. Is there a more convenient approach? (A sketch of the JDBC read is below.)
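
For reference, a minimal sketch of the JDBC-based read that worked. The FE MySQL-protocol port (9030), the database path in the URL, the serverTimezone parameter, and the driver class are assumptions, not taken from the original setup:

import org.apache.spark.sql.{DataFrame, SparkSession}

// JDBC read sketch (assumed port and URL parameters; adjust to your cluster).
def readByJdbc(tableName: String, spark: SparkSession): DataFrame = {
  spark.read
    .format("jdbc")
    // 9030 is the default Doris FE MySQL-protocol port (assumed here);
    // serverTimezone keeps the MySQL driver aligned with the server's zone.
    .option("url", "jdbc:mysql://192.168.70.130:9030/demo?serverTimezone=Asia/Dhaka")
    .option("dbtable", tableName)
    .option("user", "root")
    .option("password", "star2019")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .load()
}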

1 Answer

[screenshot 1]
[screenshot 2]
The two results you posted don't look like they are from the same day?
If the time really is off, you can set these in your Spark conf:
spark.driver.extraJavaOptions=-Duser.timezone=Asia/Dhaka
spark.executor.extraJavaOptions=-Duser.timezone=Asia/Dhaka
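
For example (the launch command in the comments below is one way to apply these settings; the expected output assumes the flags took effect):

// Launch spark-shell with the JVM time zone set on driver and executors:
//   spark-shell \
//     --conf "spark.driver.extraJavaOptions=-Duser.timezone=Asia/Dhaka" \
//     --conf "spark.executor.extraJavaOptions=-Duser.timezone=Asia/Dhaka"
// Then verify inside the shell that the driver JVM picked it up:
println(java.util.TimeZone.getDefault.getID) // should print Asia/Dhaka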