起初一直以为是 map_agg 的问题，最后才发现是建表语句的问题：array_line_value 字段类型被设置成了 STRING。
-- First attempt, kept for reference: array_line_value is declared as STRING,
-- which turned out to be why inserting the MAP_AGG(...) result failed.
-- NOTE: the column name `arrary_count` is misspelled, but every later
-- statement uses that spelling, so it is intentionally left unchanged.
CREATE TABLE IF NOT EXISTS `data_center`.`dws_xf_deviceanalog_agg`
(
    deviceid INT COMMENT '设备ID',
    total_date VARCHAR(10) COMMENT '统计日期',
    json_last_data String COMMENT '最新数据JSON',
    arrary_count String COMMENT '设备数量',
    array_line_value String COMMENT '时间线数据JSON',
    last_update DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '最后更新时间'
) ENGINE=OLAP
UNIQUE KEY(deviceid, total_date)
COMMENT "设备统计汇总表"
DISTRIBUTED BY HASH(deviceid) BUCKETS AUTO
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "min_load_replica_num" = "1",
    "estimate_partition_size" = "10G",
    "storage_format" = "V2"
);
调整：将 array_line_value 字段类型改为 MAP<STRING, STRING>（同时 json_last_data 改为 VARIANT，arrary_count 改为 INT）：
-- Second version: array_line_value becomes MAP<STRING,STRING>,
-- json_last_data becomes VARIANT and arrary_count becomes INT.
-- The DROP is qualified with the same database as the CREATE. Unqualified,
-- it would target the session's current database; if that is not
-- data_center, the old table survives and CREATE TABLE IF NOT EXISTS
-- silently keeps the old schema.
DROP TABLE IF EXISTS `data_center`.`dws_xf_deviceanalog_agg`;
CREATE TABLE IF NOT EXISTS `data_center`.`dws_xf_deviceanalog_agg`
(
    deviceid INT COMMENT '设备ID',
    total_date VARCHAR(10) COMMENT '统计日期',
    json_last_data VARIANT COMMENT '最新数据JSON',
    arrary_count int COMMENT '设备数量',
    array_line_value MAP<STRING,STRING> COMMENT '时间线数据JSON',
    last_update DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '最后更新时间'
) ENGINE=OLAP
UNIQUE KEY(deviceid, total_date)
COMMENT "设备统计汇总表"
DISTRIBUTED BY HASH(deviceid) BUCKETS AUTO
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "min_load_replica_num" = "1",
    "estimate_partition_size" = "10G",
    "storage_format" = "V2"
);
修改后再次执行插入，成功。
********** 最终版本：array_line_value 改为嵌套的 MAP<STRING, MAP<STRING, STRING>> **********
-- Final version: array_line_value is a nested map
-- (analog type id -> (hour label -> value)), json_last_data is VARIANT.
-- The DROP is qualified with the same database as the CREATE. Unqualified,
-- it would target the session's current database; if that is not
-- data_center, the old table survives and CREATE TABLE IF NOT EXISTS
-- silently keeps the old schema.
DROP TABLE IF EXISTS `data_center`.`dws_xf_deviceanalog_agg`;
CREATE TABLE IF NOT EXISTS `data_center`.`dws_xf_deviceanalog_agg`
(
    deviceid INT COMMENT '设备ID',
    total_date VARCHAR(10) COMMENT '统计日期',
    json_last_data VARIANT COMMENT '最新数据JSON',
    arrary_count int COMMENT '设备数量',
    array_line_value MAP<STRING, MAP<String,String>> COMMENT '时间线数据JSON',
    last_update DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '最后更新时间'
) ENGINE=OLAP
UNIQUE KEY(deviceid, total_date)
COMMENT "设备统计汇总表"
DISTRIBUTED BY HASH(deviceid) BUCKETS AUTO
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "min_load_replica_num" = "1",
    "estimate_partition_size" = "10G",
    "storage_format" = "V2"
);
-- Rebuild today's per-device aggregate from dwd_xf_deviceanalog_dync:
--   json_last_data   : last reading of today for each named analog type
--   arrary_count     : number of analog-type rows aggregated for the device
--   array_line_value : analog_type_id -> (hour '00'..current hour -> last
--                      reading in that hour, NULL when the hour has none)
-- last_update is omitted from the column list so its DEFAULT applies.
INSERT OVERWRITE TABLE `data_center`.`dws_xf_deviceanalog_agg` PARTITION(*)
    (deviceid, total_date, json_last_data, arrary_count, array_line_value)
WITH hours AS (
    -- Zero-padded hour labels '00' .. current hour; matches DATE_FORMAT(..., '%H').
    SELECT
        LPAD(hh, 2, '0') AS hour_of_day
    FROM (
        SELECT 0 AS hh UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4
        UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8
        UNION ALL SELECT 9 UNION ALL SELECT 10 UNION ALL SELECT 11
        UNION ALL SELECT 12 UNION ALL SELECT 13 UNION ALL SELECT 14 UNION ALL SELECT 15
        UNION ALL SELECT 16 UNION ALL SELECT 17 UNION ALL SELECT 18 UNION ALL SELECT 19
        UNION ALL SELECT 20 UNION ALL SELECT 21 UNION ALL SELECT 22 UNION ALL SELECT 23
    ) AS hour_table
    WHERE hh <= HOUR(NOW())
),
device_hours AS (
    -- One row per (device, analog type ever seen, hour of today), so hours
    -- without readings still show up in the timeline with a NULL value.
    SELECT
        devices.deviceid,
        DATE(NOW()) AS totaldate,
        devices.analog_value_type_id AS analog_type_id,
        hours.hour_of_day
    FROM (
        SELECT DISTINCT deviceid, analog_value_type_id
        FROM dwd_xf_deviceanalog_dync
    ) AS devices
    CROSS JOIN hours
),
ranked AS (
    -- Today's readings only (half-open range keeps the predicate SARGable).
    -- Each row carries two values:
    --   new_value  : last value of the whole day (feeds json_last_data)
    --   hour_value : last value within the row's own hour (feeds the
    --                timeline); partitioning by day alone would stamp the
    --                day's final value onto every hour that had data.
    SELECT
        deviceid,
        DATE(createtime) AS iotdate,
        DATE_FORMAT(createtime, '%H') AS iottime,
        analog_value_type_id,
        LAST_VALUE(value, 1) OVER (
            PARTITION BY deviceid, analog_value_type_id, DATE(createtime)
            ORDER BY createtime
            ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
        ) AS new_value,
        LAST_VALUE(value, 1) OVER (
            PARTITION BY deviceid, analog_value_type_id, DATE(createtime), DATE_FORMAT(createtime, '%H')
            ORDER BY createtime
            ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
        ) AS hour_value
    FROM dwd_xf_deviceanalog_dync
    WHERE createtime >= DATE(NOW())
      AND createtime < DATE_ADD(DATE(NOW()), INTERVAL 1 DAY)
),
analogagg AS (
    -- Per (device, date, analog type): the day's last value plus an
    -- hour -> value map. The join also matches on date, so readings from
    -- other days can never land in today's hour slots.
    SELECT
        dh.deviceid,
        dh.totaldate,
        dh.analog_type_id,
        MAX(r.new_value) AS new_value,
        MAP_AGG(dh.hour_of_day, r.hour_value) AS line_value
    FROM device_hours AS dh
    LEFT JOIN ranked AS r
        ON r.deviceid = dh.deviceid
        AND r.analog_value_type_id = dh.analog_type_id
        AND r.iotdate = dh.totaldate
        AND r.iottime = dh.hour_of_day
    GROUP BY
        dh.deviceid,
        dh.totaldate,
        dh.analog_type_id
)
SELECT
    deviceid,
    totaldate,
    -- MAX over a CASE already yields NULL when no row matches, so no extra
    -- "IS NOT NULL" wrapper is needed.
    JSON_OBJECT(
        '温度', MAX(CASE WHEN analog_type_id = 3 THEN new_value END),
        '电流', MAX(CASE WHEN analog_type_id = 9 THEN new_value END),
        '剩余电流', MAX(CASE WHEN analog_type_id = 128 THEN new_value END),
        '压力', MAX(CASE WHEN analog_type_id IN (4, 5, 1011) THEN new_value END),
        '高精度电压', MAX(CASE WHEN analog_type_id = 256 THEN new_value END),
        '高度', MAX(CASE WHEN analog_type_id = 2 THEN new_value END),
        '气体浓度', MAX(CASE WHEN analog_type_id = 6 THEN new_value END),
        '红光传感器采样值', MAX(CASE WHEN analog_type_id = 264 THEN new_value END),
        '蓝光传感器采样值', MAX(CASE WHEN analog_type_id = 268 THEN new_value END)
    ) AS json_last_data,
    -- Counts analogagg rows for the device, i.e. its distinct analog types.
    COUNT(deviceid) AS arrary_count,
    MAP_AGG(CAST(analog_type_id AS VARCHAR), line_value) AS array_line_value
FROM analogagg
GROUP BY
    deviceid,
    totaldate;
最终数据结果（array_line_value 字段示例：外层 key 为工况类型 ID，内层 key 为小时）：

{
"258": {
"09": null,"10": null,"08": null,"05": null,"02": null,"04": null,"07": null,"11": null,"00": null,"01": null,"03": null,"06": "19.0000"
},
"6": {
"09": null,"10": null,"08": null,"05": null,"02": null,"04": null,"07": null,"11": null,"00": null,"01": null,"03": null,"06": "0.0000"
},
"130": {
"09": null,"10": null,"08": null,"05": null,"02": null,"04": null,"07": null,"11": null,"00": null,"01": null,"03": null,"06": "25.0000"
},
"260": {
"09": null,"10": null,"08": null,"05": null,"02": null,"04": null,"07": null,"11": null,"00": null,"01": null,"03": null,"06": "0.0000"
},
"259": {
"09": null,"10": null,"08": null,"05": null,"02": null,"04": null,"07": null,"11": null,"00": null,"01": null,"03": null,"06": "-67.0000"
},
"3": {
"09": null,"10": null,"08": null,"05": null,"02": null,"04": null,"07": null,"11": null,"00": null,"01": null,"03": null,"06": "19.0000"
}
}
主要利用了可变数据类型（VARIANT）和 MAP 类型。由于我对多个工况做了二次聚合，MAP 类型需要完整写出嵌套的类型结构；如果能支持不固定（更宽松）的值类型就更好了。
例如能否支持 MAP<STRING, OBJECT> 这类写法，希望官方人员解答。目前已经支持 MAP<STRING, MAP<STRING, STRING>> 这样的嵌套，已经很强大了。
唯一美中不足的是 MAP 不保证 key 的顺序，提供给接口或前端时，需要再按时间（即 key 值）进行一次排序。