详情见如下脚本:
-- Table asset_data: UNIQUE KEY model keyed on (dt, asset_code, data_time).
-- The message payload lives in the VARIANT column `content`.
-- Partitions are created automatically from date_trunc(dt, 'month'); rows are
-- hashed on asset_code into 8 buckets; replication allocation is 1 on the
-- default location tag.
CREATE TABLE asset_data (
    `dt`          DATE        NOT NULL,
    `asset_code`  VARCHAR(20),
    `data_time`   DATETIME    NOT NULL,
    `data_type`   TINYINT     NOT NULL,
    `device_code` VARCHAR(20) NOT NULL,
    `raw_time`    DATETIME    NOT NULL,
    -- Defaults to the current timestamp when the writer supplies no value.
    `system_time` DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP,
    `content`     VARIANT
)
UNIQUE KEY (`dt`, `asset_code`, `data_time`)
AUTO PARTITION BY RANGE date_trunc(`dt`, 'month') ()
DISTRIBUTED BY HASH (`asset_code`) BUCKETS 8
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
);
-- Routine load job rl_workdata: reads JSON messages from the Kafka topic
-- iot_workdata_doris and writes them into asset_data.
CREATE ROUTINE LOAD rl_workdata ON asset_data
PROPERTIES (
    "desired_concurrent_number" = "4",
    -- Very large error allowance, so bad rows practically never stop the job.
    "max_error_number"          = "99999999999999999",
    "max_batch_interval"        = "5",
    "max_batch_rows"            = "200000",
    "max_batch_size"            = "1073741824",
    "strict_mode"               = "true",
    "format"                    = "json"
)
FROM KAFKA (
    "kafka_broker_list"              = "bigdata15:6667,bigdata16:6667,bigdata18:6667",
    "kafka_topic"                    = "iot_workdata_doris",
    "property.kafka_default_offsets" = "offset_beginning",
    "property.group.id"              = "doris950",
    "property.client.id"             = "doris950"
);
-- Level-1 view.
-- Reads the data_type = 1 rows of demo.asset_data for dt = '2024-3-27',
-- extracts fields from the VARIANT column `content`, converts each reading
-- into a 0/1 flag, and keeps the row numbered 1 per (vehicleId, data_time).
-- NOTE(review): the ROW_NUMBER window has no ORDER BY, so which row of a
-- (vehicleId, data_time) group receives rownum = 1 is not determined by this
-- statement.
CREATE VIEW originalspectrum AS
SELECT *
FROM (
    SELECT
        -- Text before the first '.' of whichever spelling of the vehicle id key is present.
        split_part(coalesce(content['VehicleId'], content['vehicleId'], content['vehicleid']), '.', 1) AS vehicleId,
        data_time AS gpstime,
        cast(coalesce(content['AdminDivisionCode'], content['area'], content['Area'], content['admindivisioncode']) AS VARCHAR(6)) AS admindivisioncode,
        cast(content['TotalEngineHours'] AS DOUBLE) AS TotalEngineHours,
        cast(content['TotalEngineFuel'] AS DOUBLE) AS TotalFuelConsum,
        -- Boom up
        CASE WHEN cast(content['dongbeishengxiandaoyali'] AS DOUBLE) > 0.7 THEN 1 ELSE 0 END AS dongbiup,
        -- Boom down
        CASE WHEN cast(content['dongbeijiangxiandaoyali'] AS DOUBLE) > 0.7 THEN 1 ELSE 0 END AS dongbidown,
        -- Arm (stick) in
        CASE WHEN cast(content['douganneishouxiandaoyali'] AS DOUBLE) > 0.7 THEN 1 ELSE 0 END AS douganin,
        -- Arm (stick) out
        CASE WHEN cast(content['douganwaibaixiandaoyali'] AS DOUBLE) > 0.7 THEN 1 ELSE 0 END AS douganout,
        -- Bucket in
        CASE WHEN cast(content['chandouneishouxiandaoyali'] AS DOUBLE) > 0.7 THEN 1 ELSE 0 END AS chandouin,
        -- Bucket out
        CASE WHEN cast(content['chandouwaibaixiandaoyali'] AS DOUBLE) > 0.7 THEN 1 ELSE 0 END AS chandouout,
        -- Left swing proportional valve current
        CASE WHEN cast(content['zuohuizhuaibilifadianliu'] AS DOUBLE) > 0 THEN 1 ELSE 0 END AS leftrotate,
        -- Right swing proportional valve current
        CASE WHEN cast(content['youhuizhuaibilifadianliu'] AS DOUBLE) > 0 THEN 1 ELSE 0 END AS rightrotate,
        -- Left travel forward proportional valve current
        CASE WHEN cast(coalesce(content['zuohangzouqianjinbilifadianliu'], content['walkLeftForwardElec']) AS DOUBLE) > 0 THEN 1 ELSE 0 END AS leftfront,
        -- Left travel reverse proportional valve current
        CASE WHEN cast(coalesce(content['zuohangzouhoutuibilifadianliu'], content['walkLeftBackElec']) AS DOUBLE) > 0 THEN 1 ELSE 0 END AS leftbehind,
        -- Right travel forward proportional valve current
        CASE WHEN cast(coalesce(content['youhangzouqianjinbilifadianliu'], content['walkRightForwardElec']) AS DOUBLE) > 0 THEN 1 ELSE 0 END AS rightfront,
        -- Right travel reverse proportional valve current
        CASE WHEN cast(coalesce(content['youhangzouhoutuibilifadianliu'], content['walkRightBackElec']) AS DOUBLE) > 0 THEN 1 ELSE 0 END AS rightbehind,
        -- Breaker status
        CASE WHEN cast(content['posuizhuangtai'] AS DOUBLE) > 0 THEN 1 ELSE 0 END AS brokenflag,
        -- Attachment left rotation pilot pressure
        CASE WHEN cast(content['jijuzuohuizhuaixiandaoyali'] AS DOUBLE) > 0.7 THEN 1 ELSE 0 END AS jijuleft,
        -- Attachment right rotation pilot pressure
        CASE WHEN cast(content['jijuyouhuizhuaixiandaoyali'] AS DOUBLE) > 0.7 THEN 1 ELSE 0 END AS jijuright,
        -- Attachment clamp pilot pressure
        CASE WHEN cast(content['jijugajinxiandaoyali'] AS DOUBLE) > 0.7 THEN 1 ELSE 0 END AS jijutight,
        -- Attachment release pilot pressure
        CASE WHEN cast(content['jijusongkaixiandaoyali'] AS DOUBLE) > 0.7 THEN 1 ELSE 0 END AS jijuloose,
        -- Numbers the rows that share the same vehicle id and data_time.
        row_number() OVER (
            PARTITION BY
                split_part(coalesce(content['VehicleId'], content['vehicleId'], content['vehicleid']), '.', 1),
                data_time
        ) AS rownum
    FROM demo.asset_data a
    WHERE data_type = 1
      AND dt = '2024-3-27'
      AND asset_code IS NOT NULL
      AND asset_code <> ''
      AND coalesce(content['VehicleId'], content['vehicleId'], content['vehicleid']) IS NOT NULL
      AND content['TotalEngineHours'] IS NOT NULL
) numbered
WHERE rownum = 1;
-- Level-2 view.
-- Re-exposes the originalspectrum columns (flags cast to INT) and adds, per
-- vehicle ordered by gpstime, the previous/next fuel total, gpstime and
-- engine-hour total. The first/last row of a vehicle gets NULL for the
-- missing neighbour.
CREATE VIEW nextaspectrum AS
SELECT
    vehicleid,
    TotalEngineHours,
    admindivisioncode,
    totalfuelconsum,
    gpsTime,
    cast(dongbiup    AS INT) AS dongbiup,
    cast(dongbidown  AS INT) AS dongbidown,
    cast(douganin    AS INT) AS douganin,
    cast(douganout   AS INT) AS douganout,
    cast(chandouin   AS INT) AS chandouin,
    cast(chandouout  AS INT) AS chandouout,
    cast(leftrotate  AS INT) AS leftrotate,
    cast(rightrotate AS INT) AS rightrotate,
    cast(leftfront   AS INT) AS leftfront,
    cast(leftbehind  AS INT) AS leftbehind,
    cast(rightbehind AS INT) AS rightbehind,
    cast(rightfront  AS INT) AS rightfront,
    cast(brokenflag  AS INT) AS brokenflag,
    cast(jijuleft    AS INT) AS jijuleft,
    cast(jijuright   AS INT) AS jijuright,
    cast(jijutight   AS INT) AS jijutight,
    cast(jijuloose   AS INT) AS jijuloose,
    lag(TotalFuelConsum, 1, NULL)   OVER (PARTITION BY vehicleid ORDER BY gpstime) AS previousoil,
    lead(TotalFuelConsum, 1, NULL)  OVER (PARTITION BY vehicleid ORDER BY gpstime) AS nextoil,
    lag(gpstime, 1, NULL)           OVER (PARTITION BY vehicleid ORDER BY gpstime) AS previousgpstime,
    lead(gpstime, 1, NULL)          OVER (PARTITION BY vehicleid ORDER BY gpstime) AS nextgpstime,
    lag(totalenginehours, 1, NULL)  OVER (PARTITION BY vehicleid ORDER BY gpstime) AS previoushour,
    lead(totalenginehours, 1, NULL) OVER (PARTITION BY vehicleid ORDER BY gpstime) AS nexthour
FROM originalspectrum;
-- Running a query on top of the level-2 view makes the routine load that writes into asset_data fail with:
-- ErrorReason{code=errCode = 2, msg='failed to create task: errCode = 2, detailMessage = Unknown column 'dt' in 'a''}
--
-- parthour: half of the engine-hour change since the previous sample plus half
--           of the change up to the next sample. Kept only when it is >= 0 and
--           not larger than the matching time gaps (unix_timestamp difference
--           / 7200) plus 0.1; otherwise 0.
-- partoil:  fuel change since the previous sample. Kept only when the change
--           divided by (unix_timestamp difference / 60) is > 0 and <= 15;
--           otherwise 0.
SELECT
    vehicleid,
    CASE
        WHEN coalesce((totalenginehours - previoushour) / 2, 0)
             + coalesce((nexthour - totalenginehours) / 2, 0) >= 0
         AND coalesce((totalenginehours - previoushour) / 2, 0)
             + coalesce((nexthour - totalenginehours) / 2, 0)
             <= coalesce((unix_timestamp(gpstime) - unix_timestamp(previousgpstime)) / 7200, 0)
                + coalesce((unix_timestamp(nextgpstime) - unix_timestamp(gpstime)) / 7200, 0)
                + 0.1
        THEN coalesce((totalenginehours - previoushour) / 2, 0)
             + coalesce((nexthour - totalenginehours) / 2, 0)
        ELSE 0
    END AS parthour,
    CASE
        WHEN (TotalFuelConsum - previousoil)
             / ((unix_timestamp(gpstime) - unix_timestamp(previousgpstime)) / 60) > 0
         AND (TotalFuelConsum - previousoil)
             / ((unix_timestamp(gpstime) - unix_timestamp(previousgpstime)) / 60) <= 15
        THEN TotalFuelConsum - previousoil
        ELSE 0
    END AS partoil,
    admindivisioncode,
    gpstime,
    dongbiup,
    dongbidown,
    douganin,
    douganout,
    chandouin,
    chandouout,
    leftrotate,
    rightrotate,
    leftfront,
    leftbehind,
    rightbehind,
    rightfront,
    brokenflag,
    jijuleft,
    jijuright,
    jijutight,
    jijuloose
FROM nextaspectrum;
-- Inserting data directly into asset_data also fails with a similar error:
-- ERROR 1054 (42S22): errCode = 2, detailMessage = Unknown column 'dt' in 'a'
-- If every view is replaced by a WITH ... AS subquery, running the query does not trigger the error.
--
-- The statement below is the same pipeline as the two views plus the final
-- query, written with CTEs instead of views.
-- NOTE(review): the ROW_NUMBER window has no ORDER BY, so which row of a
-- (vehicleId, data_time) group receives rownum = 1 is not determined by this
-- statement.
WITH originalspectrum AS (
    SELECT *
    FROM (
        SELECT
            -- Text before the first '.' of whichever spelling of the vehicle id key is present.
            split_part(coalesce(content['VehicleId'], content['vehicleId'], content['vehicleid']), '.', 1) AS vehicleId,
            data_time AS gpstime,
            cast(coalesce(content['AdminDivisionCode'], content['area'], content['Area'], content['admindivisioncode']) AS VARCHAR(6)) AS admindivisioncode,
            cast(content['TotalEngineHours'] AS DOUBLE) AS TotalEngineHours,
            cast(content['TotalEngineFuel'] AS DOUBLE) AS TotalFuelConsum,
            -- Boom up
            CASE WHEN cast(content['dongbeishengxiandaoyali'] AS DOUBLE) > 0.7 THEN 1 ELSE 0 END AS dongbiup,
            -- Boom down
            CASE WHEN cast(content['dongbeijiangxiandaoyali'] AS DOUBLE) > 0.7 THEN 1 ELSE 0 END AS dongbidown,
            -- Arm (stick) in
            CASE WHEN cast(content['douganneishouxiandaoyali'] AS DOUBLE) > 0.7 THEN 1 ELSE 0 END AS douganin,
            -- Arm (stick) out
            CASE WHEN cast(content['douganwaibaixiandaoyali'] AS DOUBLE) > 0.7 THEN 1 ELSE 0 END AS douganout,
            -- Bucket in
            CASE WHEN cast(content['chandouneishouxiandaoyali'] AS DOUBLE) > 0.7 THEN 1 ELSE 0 END AS chandouin,
            -- Bucket out
            CASE WHEN cast(content['chandouwaibaixiandaoyali'] AS DOUBLE) > 0.7 THEN 1 ELSE 0 END AS chandouout,
            -- Left swing proportional valve current
            CASE WHEN cast(content['zuohuizhuaibilifadianliu'] AS DOUBLE) > 0 THEN 1 ELSE 0 END AS leftrotate,
            -- Right swing proportional valve current
            CASE WHEN cast(content['youhuizhuaibilifadianliu'] AS DOUBLE) > 0 THEN 1 ELSE 0 END AS rightrotate,
            -- Left travel forward proportional valve current
            CASE WHEN cast(coalesce(content['zuohangzouqianjinbilifadianliu'], content['walkLeftForwardElec']) AS DOUBLE) > 0 THEN 1 ELSE 0 END AS leftfront,
            -- Left travel reverse proportional valve current
            CASE WHEN cast(coalesce(content['zuohangzouhoutuibilifadianliu'], content['walkLeftBackElec']) AS DOUBLE) > 0 THEN 1 ELSE 0 END AS leftbehind,
            -- Right travel forward proportional valve current
            CASE WHEN cast(coalesce(content['youhangzouqianjinbilifadianliu'], content['walkRightForwardElec']) AS DOUBLE) > 0 THEN 1 ELSE 0 END AS rightfront,
            -- Right travel reverse proportional valve current
            CASE WHEN cast(coalesce(content['youhangzouhoutuibilifadianliu'], content['walkRightBackElec']) AS DOUBLE) > 0 THEN 1 ELSE 0 END AS rightbehind,
            -- Breaker status
            CASE WHEN cast(content['posuizhuangtai'] AS DOUBLE) > 0 THEN 1 ELSE 0 END AS brokenflag,
            -- Attachment left rotation pilot pressure
            CASE WHEN cast(content['jijuzuohuizhuaixiandaoyali'] AS DOUBLE) > 0.7 THEN 1 ELSE 0 END AS jijuleft,
            -- Attachment right rotation pilot pressure
            CASE WHEN cast(content['jijuyouhuizhuaixiandaoyali'] AS DOUBLE) > 0.7 THEN 1 ELSE 0 END AS jijuright,
            -- Attachment clamp pilot pressure
            CASE WHEN cast(content['jijugajinxiandaoyali'] AS DOUBLE) > 0.7 THEN 1 ELSE 0 END AS jijutight,
            -- Attachment release pilot pressure
            CASE WHEN cast(content['jijusongkaixiandaoyali'] AS DOUBLE) > 0.7 THEN 1 ELSE 0 END AS jijuloose,
            -- Numbers the rows that share the same vehicle id and data_time.
            row_number() OVER (
                PARTITION BY
                    split_part(coalesce(content['VehicleId'], content['vehicleId'], content['vehicleid']), '.', 1),
                    data_time
            ) AS rownum
        FROM demo.asset_data a
        WHERE data_type = 1
          AND dt = '2024-3-27'
          AND asset_code IS NOT NULL
          AND asset_code <> ''
          AND coalesce(content['VehicleId'], content['vehicleId'], content['vehicleid']) IS NOT NULL
          AND content['TotalEngineHours'] IS NOT NULL
    ) numbered
    WHERE rownum = 1
),
-- Adds the previous/next fuel total, gpstime and engine-hour total per vehicle.
nextaspectrum AS (
    SELECT
        vehicleid,
        TotalEngineHours,
        admindivisioncode,
        totalfuelconsum,
        gpsTime,
        cast(dongbiup    AS INT) AS dongbiup,
        cast(dongbidown  AS INT) AS dongbidown,
        cast(douganin    AS INT) AS douganin,
        cast(douganout   AS INT) AS douganout,
        cast(chandouin   AS INT) AS chandouin,
        cast(chandouout  AS INT) AS chandouout,
        cast(leftrotate  AS INT) AS leftrotate,
        cast(rightrotate AS INT) AS rightrotate,
        cast(leftfront   AS INT) AS leftfront,
        cast(leftbehind  AS INT) AS leftbehind,
        cast(rightbehind AS INT) AS rightbehind,
        cast(rightfront  AS INT) AS rightfront,
        cast(brokenflag  AS INT) AS brokenflag,
        cast(jijuleft    AS INT) AS jijuleft,
        cast(jijuright   AS INT) AS jijuright,
        cast(jijutight   AS INT) AS jijutight,
        cast(jijuloose   AS INT) AS jijuloose,
        lag(TotalFuelConsum, 1, NULL)   OVER (PARTITION BY vehicleid ORDER BY gpstime) AS previousoil,
        lead(TotalFuelConsum, 1, NULL)  OVER (PARTITION BY vehicleid ORDER BY gpstime) AS nextoil,
        lag(gpstime, 1, NULL)           OVER (PARTITION BY vehicleid ORDER BY gpstime) AS previousgpstime,
        lead(gpstime, 1, NULL)          OVER (PARTITION BY vehicleid ORDER BY gpstime) AS nextgpstime,
        lag(totalenginehours, 1, NULL)  OVER (PARTITION BY vehicleid ORDER BY gpstime) AS previoushour,
        lead(totalenginehours, 1, NULL) OVER (PARTITION BY vehicleid ORDER BY gpstime) AS nexthour
    FROM originalspectrum
)
SELECT
    vehicleid,
    -- Half of the engine-hour change on each side of the sample; kept only when
    -- it is >= 0 and within the matching time gaps (/ 7200) plus 0.1, else 0.
    CASE
        WHEN coalesce((totalenginehours - previoushour) / 2, 0)
             + coalesce((nexthour - totalenginehours) / 2, 0) >= 0
         AND coalesce((totalenginehours - previoushour) / 2, 0)
             + coalesce((nexthour - totalenginehours) / 2, 0)
             <= coalesce((unix_timestamp(gpstime) - unix_timestamp(previousgpstime)) / 7200, 0)
                + coalesce((unix_timestamp(nextgpstime) - unix_timestamp(gpstime)) / 7200, 0)
                + 0.1
        THEN coalesce((totalenginehours - previoushour) / 2, 0)
             + coalesce((nexthour - totalenginehours) / 2, 0)
        ELSE 0
    END AS parthour,
    -- Fuel change since the previous sample; kept only when the change divided
    -- by (unix_timestamp difference / 60) is > 0 and <= 15, else 0.
    CASE
        WHEN (TotalFuelConsum - previousoil)
             / ((unix_timestamp(gpstime) - unix_timestamp(previousgpstime)) / 60) > 0
         AND (TotalFuelConsum - previousoil)
             / ((unix_timestamp(gpstime) - unix_timestamp(previousgpstime)) / 60) <= 15
        THEN TotalFuelConsum - previousoil
        ELSE 0
    END AS partoil,
    admindivisioncode,
    gpstime,
    dongbiup,
    dongbidown,
    douganin,
    douganout,
    chandouin,
    chandouout,
    leftrotate,
    rightrotate,
    leftfront,
    leftbehind,
    rightbehind,
    rightfront,
    brokenflag,
    jijuleft,
    jijuright,
    jijutight,
    jijuloose
FROM nextaspectrum;