My requirement: source rows that share the same debtor_id, channel_distribution, and order_no should all be assigned the same sequence number, order_id. The SQL is as follows:
CREATE TABLE test.sales_order_item
(
id BIGINT (32) COMMENT 'line item ID',
order_id BIGINT (32) COMMENT 'order ID',
debtor_id BIGINT (32) COMMENT 'enterprise ID',
order_no VARCHAR(256) COMMENT 'order number',
channel_distribution VARCHAR(256) COMMENT 'distribution channel',
goods_no VARCHAR(256) COMMENT 'goods code',
qty INT ( 11 ) COMMENT 'goods quantity'
) UNIQUE KEY(id)
COMMENT 'sales order line items'
DISTRIBUTED BY HASH(id) BUCKETS 32
PROPERTIES ('replication_num' = '1');
insert overwrite table test.sales_order_item(id, order_id, debtor_id, order_no, channel_distribution, goods_no, qty)
select
row_number() over (order by debtor_id, channel_distribution, order_no, goods_no) as id,
dense_rank() over (order by debtor_id, channel_distribution, order_no) as order_id,
debtor_id, order_no, channel_distribution, goods_no, qty
from (
select
a.ent_id as debtor_id
,a.ent_order_no as order_no
,'TO C' as channel_distribution
,b.bar_code as goods_no
,sum(b.qty) as qty
from ods_gel_pro.doc_sale_order_to_c a
inner join ods_gel_pro.doc_sale_order_to_c_goods b on a.id = b.sale_order_to_c_id
group by 1, 2, 3, 4
) t;
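To make the expected result concrete, here is a minimal self-contained sketch. The four rows are made up for illustration and are not taken from the real tables. By the definition of dense_rank, rows that tie on every ORDER BY column are peers and must receive the same rank, with no gaps between consecutive ranks.

```sql
-- Hypothetical sample rows, used only to illustrate the expected numbering.
select
    debtor_id,
    channel_distribution,
    order_no,
    goods_no,
    dense_rank() over (order by debtor_id, channel_distribution, order_no) as order_id
from (
    select 1 as debtor_id, 'TO C' as channel_distribution, 'A001' as order_no, 'G1' as goods_no
    union all select 1, 'TO C', 'A001', 'G2'
    union all select 1, 'TO C', 'A002', 'G1'
    union all select 2, 'TO C', 'A001', 'G3'
) t
order by debtor_id, channel_distribution, order_no, goods_no;
-- Expected order_id values, in output order: 1, 1, 2, 3
-- (the two goods lines of order A001 for debtor 1 share order_id 1).
```

This is the behavior I expect from the insert statement above: one order_id per distinct (debtor_id, channel_distribution, order_no).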
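The source tables hold more than 7 million rows. After running the SQL to load the target table, I validated the data and found that the dense_rank numbering is wrong: rows with the same debtor_id, channel_distribution, and order_no end up with several different order_id values. When the source tables held only a few tens of thousands of rows I saw no problem; the issue appears only once the data volume gets large. The following query lists the affected keys: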
select debtor_id, order_no, channel_distribution, group_concat(distinct cast(order_id as string)) as order_ids from test.sales_order_item
group by debtor_id, order_no, channel_distribution
having count(distinct order_id) > 1
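A complementary sanity check, sketched here as a suggestion rather than something I have run: if dense_rank behaved correctly, the number of distinct keys, the number of distinct order_id values, and the largest order_id would all be equal, because dense_rank assigns the values 1..N without gaps to N distinct keys. The column aliases are my own naming.

```sql
-- Number of distinct (debtor_id, order_no, channel_distribution) keys in the target table.
select count(*) as distinct_keys
from (
    select debtor_id, order_no, channel_distribution
    from test.sales_order_item
    group by debtor_id, order_no, channel_distribution
) k;

-- Number of distinct order_id values and the largest one.
-- With a correct dense_rank, both should equal distinct_keys from the query above.
select
    count(distinct order_id) as distinct_order_ids,
    max(order_id) as max_order_id
from test.sales_order_item;
```

Comparing the three numbers shows how far the numbering has drifted, in addition to which keys are affected.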
The EXPLAIN output for the insert statement is below:
PLAN FRAGMENT 0
OUTPUT EXPRS:
id[#89]
order_id[#90]
debtor_id[#82]
order_no[#83]
channel_distribution[#84]
goods_no[#85]
qty[#86]
__DORIS_DELETE_SIGN__[#87]
__DORIS_VERSION_COL__[#88]
PARTITION: UNPARTITIONED
HAS_COLO_PLAN_NODE: false
OLAP TABLE SINK
TUPLE ID: 12
RANDOM
IS_PARTIAL_UPDATE: false
10:VANALYTIC(649)
| functions: [dense_rank()]
| order by: debtor_id[#82] ASC NULLS FIRST, order_no[#83] ASC NULLS FIRST
| window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
| distribute expr lists:
|
9:VANALYTIC(646)
| functions: [row_number()]
| order by: debtor_id[#82] ASC NULLS FIRST, order_no[#83] ASC NULLS FIRST, goods_no[#85] ASC NULLS FIRST
| window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
| distribute expr lists:
|
8:VMERGING-EXCHANGE
offset: 0
distribute expr lists: debtor_id[#82], order_no[#83], goods_no[#85]
PLAN FRAGMENT 1
PARTITION: HASH_PARTITIONED: ent_id[#67], ent_order_no[#68], bar_code[#69]
HAS_COLO_PLAN_NODE: true
STREAM DATA SINK
EXCHANGE ID: 08
UNPARTITIONED
7:VSORT(637)
| order by: debtor_id[#82] ASC, order_no[#83] ASC, goods_no[#85] ASC
| algorithm: full sort
| offset: 0
| distribute expr lists:
|
6:VAGGREGATE (merge finalize)(631)
| output: sum(partial_sum(qty)[#70])[#74]
| group by: ent_id[#67], ent_order_no[#68], bar_code[#69]
| sortByGroupKey:false
| cardinality=9,197,542
| final projections: ent_id[#71], ent_order_no[#72], 'TO C', bar_code[#73], CAST(qty[#74] AS int), 0, 0
| final project output tuple id: 8
| distribute expr lists: ent_id[#67], ent_order_no[#68], bar_code[#69]
|
5:VEXCHANGE
offset: 0
distribute expr lists:
PLAN FRAGMENT 2
PARTITION: HASH_PARTITIONED: id[#18]
HAS_COLO_PLAN_NODE: false
STREAM DATA SINK
EXCHANGE ID: 05
HASH_PARTITIONED: ent_id[#67], ent_order_no[#68], bar_code[#69]
4:VAGGREGATE (update serialize)(625)
| STREAMING
| output: partial_sum(qty[#66])[#70]
| group by: ent_id[#63], ent_order_no[#64], bar_code[#65]
| sortByGroupKey:false
| cardinality=9,197,542
| distribute expr lists:
|
3:VHASH JOIN(619)
| join op: INNER JOIN(BUCKET_SHUFFLE)[]
| equal join conjunct: (id[#54] = sale_order_to_c_id[#15])
| runtime filters: RF000[in_or_bloom] <- sale_order_to_c_id[#15](5054499/8388608/8388608)
| cardinality=9,197,542
| vec output tuple id: 5
| output tuple id: 5
| vIntermediate tuple ids: 4
| hash output slot ids: 16 17 55 56
| final projections: ent_id[#58], ent_order_no[#59], bar_code[#61], qty[#62]
| final project output tuple id: 5
| distribute expr lists: id[#54]
| distribute expr lists: sale_order_to_c_id[#15]
|
|----1:VEXCHANGE
| offset: 0
| distribute expr lists:
|
2:VOlapScanNode(600)
TABLE: ods_gel_pro.doc_sale_order_to_c(doc_sale_order_to_c), PREAGGREGATION: OFF. Reason: __DORIS_DELETE_SIGN__ is used as conjuncts.
PREDICATES: (__DORIS_DELETE_SIGN__[#51] = 0)
runtime filters: RF000[in_or_bloom] -> id[#18]
partitions=1/1 (doc_sale_order_to_c)
tablets=32/32, tabletList=21454901,21454903,21454905 ...
cardinality=5822327, avgRowSize=321.74512, numNodes=3
pushAggOp=NONE
final projections: id[#18], ent_id[#19], ent_order_no[#20]
final project output tuple id: 3
PLAN FRAGMENT 3
PARTITION: HASH_PARTITIONED: id[#0]
HAS_COLO_PLAN_NODE: false
STREAM DATA SINK
EXCHANGE ID: 01
BUCKET_SHFFULE_HASH_PARTITIONED: sale_order_to_c_id[#15]
0:VOlapScanNode(607)
TABLE: ods_gel_pro.doc_sale_order_to_c_goods(doc_sale_order_to_c_goods), PREAGGREGATION: ON
PREDICATES: (__DORIS_DELETE_SIGN__[#12] = 0)
partitions=1/1 (doc_sale_order_to_c_goods)
tablets=32/32, tabletList=21455270,21455272,21455274 ...
cardinality=8083347, avgRowSize=67.15573, numNodes=3
pushAggOp=NONE
final projections: sale_order_to_c_id[#1], bar_code[#3], qty[#4]
final project output tuple id: 1
========== MATERIALIZATIONS ==========
Statistics
planed with unknown column statistics