Ordering problem with the dense_rank function

My requirement is to assign the same sequence number, order_id, to all source rows that share the same debtor_id, channel_distribution, and order_no. The SQL is as follows:

CREATE TABLE test.sales_order_item
(	
	id                          BIGINT (32) COMMENT 'Line item ID',
	order_id                    BIGINT (32) COMMENT 'Order ID',
	debtor_id                   BIGINT (32) COMMENT 'Enterprise ID',
	order_no                    VARCHAR(256) COMMENT 'Order number',
	channel_distribution        VARCHAR(256) COMMENT 'Distribution channel',
	goods_no                    VARCHAR(256) COMMENT 'Goods code',
	qty                         INT ( 11 ) COMMENT 'Goods quantity'
) UNIQUE KEY(id)
COMMENT 'Sales line items'
DISTRIBUTED BY HASH(id) BUCKETS 32
PROPERTIES ('replication_num' = '1');

insert overwrite table test.sales_order_item(id, order_id, debtor_id, order_no, channel_distribution, goods_no, qty)
select
	row_number() over (order by debtor_id, channel_distribution, order_no, goods_no) as id,
	dense_rank() over (order by debtor_id, channel_distribution, order_no) as order_id,
	debtor_id, order_no, channel_distribution, goods_no, qty
from (
	select
		a.ent_id as debtor_id
		,a.ent_order_no as order_no
		,'TO C' as channel_distribution
		,b.bar_code as goods_no
		,sum(b.qty) as qty
	from ods_gel_pro.doc_sale_order_to_c a
	inner join ods_gel_pro.doc_sale_order_to_c_goods b on a.id = b.sale_order_to_c_id
	group by 1, 2, 3, 4
) t;
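
To make the intended result concrete, here is a minimal sketch on four made-up rows (the literal values are invented for illustration and do not come from the real tables). Rows that share debtor_id, channel_distribution, and order_no should receive the same order_id, and each new combination should receive the next number with no gaps.

```sql
-- Illustrative data only: the inline rows below are hypothetical.
select
	debtor_id, channel_distribution, order_no, goods_no,
	dense_rank() over (order by debtor_id, channel_distribution, order_no) as order_id
from (
	select 1 as debtor_id, 'TO C' as channel_distribution, 'A001' as order_no, 'G1' as goods_no
	union all select 1, 'TO C', 'A001', 'G2'
	union all select 1, 'TO C', 'A002', 'G1'
	union all select 2, 'TO C', 'A001', 'G1'
) t
order by debtor_id, channel_distribution, order_no, goods_no;
-- Expected order_id values, top to bottom: 1, 1, 2, 3
```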

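The source tables hold more than 7 million rows. After running the SQL to load the target table, I validated the data and found that the dense_rank ordering is not right: rows with the same debtor_id, channel_distribution, and order_no end up with more than one order_id. When the source had only tens of thousands of rows I did not see any problem; it only shows up once the data volume grows. The following query returns the affected groups: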

select debtor_id, order_no, channel_distribution, group_concat(distinct cast(order_id as string)) as order_ids from test.sales_order_item
group by debtor_id, order_no, channel_distribution
having count(distinct order_id) > 1
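
One way to narrow this down, offered as a sketch that was not run for this post, is to apply the same check directly to the output of the SELECT instead of to the target table. If this also returns rows, the inconsistency comes from the query itself rather than from the insert overwrite step. Note that wrapping the query this way may produce a different plan from the one shown in the EXPLAIN output, so an empty result would not rule the query out.

```sql
-- Sketch: same window functions as the insert, checked without writing to the target table.
select debtor_id, order_no, channel_distribution,
	count(distinct order_id) as order_id_cnt
from (
	select
		row_number() over (order by debtor_id, channel_distribution, order_no, goods_no) as id,
		dense_rank() over (order by debtor_id, channel_distribution, order_no) as order_id,
		debtor_id, order_no, channel_distribution, goods_no, qty
	from (
		select
			a.ent_id as debtor_id
			,a.ent_order_no as order_no
			,'TO C' as channel_distribution
			,b.bar_code as goods_no
			,sum(b.qty) as qty
		from ods_gel_pro.doc_sale_order_to_c a
		inner join ods_gel_pro.doc_sale_order_to_c_goods b on a.id = b.sale_order_to_c_id
		group by 1, 2, 3, 4
	) t
) r
group by debtor_id, order_no, channel_distribution
having count(distinct order_id) > 1;
```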

Here is the EXPLAIN output:

PLAN FRAGMENT 0
  OUTPUT EXPRS:
    id[#89]
    order_id[#90]
    debtor_id[#82]
    order_no[#83]
    channel_distribution[#84]
    goods_no[#85]
    qty[#86]
    __DORIS_DELETE_SIGN__[#87]
    __DORIS_VERSION_COL__[#88]
  PARTITION: UNPARTITIONED

  HAS_COLO_PLAN_NODE: false

  OLAP TABLE SINK
    TUPLE ID: 12
    RANDOM
    IS_PARTIAL_UPDATE: false
  10:VANALYTIC(649)
  |  functions: [dense_rank()]
  |  order by: debtor_id[#82] ASC NULLS FIRST, order_no[#83] ASC NULLS FIRST
  |  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
  |  distribute expr lists: 
  |  
  9:VANALYTIC(646)
  |  functions: [row_number()]
  |  order by: debtor_id[#82] ASC NULLS FIRST, order_no[#83] ASC NULLS FIRST, goods_no[#85] ASC NULLS FIRST
  |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
  |  distribute expr lists: 
  |  
  8:VMERGING-EXCHANGE
     offset: 0
     distribute expr lists: debtor_id[#82], order_no[#83], goods_no[#85]

PLAN FRAGMENT 1

  PARTITION: HASH_PARTITIONED: ent_id[#67], ent_order_no[#68], bar_code[#69]

  HAS_COLO_PLAN_NODE: true

  STREAM DATA SINK
    EXCHANGE ID: 08
    UNPARTITIONED

  7:VSORT(637)
  |  order by: debtor_id[#82] ASC, order_no[#83] ASC, goods_no[#85] ASC
  |  algorithm: full sort
  |  offset: 0
  |  distribute expr lists: 
  |  
  6:VAGGREGATE (merge finalize)(631)
  |  output: sum(partial_sum(qty)[#70])[#74]
  |  group by: ent_id[#67], ent_order_no[#68], bar_code[#69]
  |  sortByGroupKey:false
  |  cardinality=9,197,542
  |  final projections: ent_id[#71], ent_order_no[#72], 'TO C', bar_code[#73], CAST(qty[#74] AS int), 0, 0
  |  final project output tuple id: 8
  |  distribute expr lists: ent_id[#67], ent_order_no[#68], bar_code[#69]
  |  
  5:VEXCHANGE
     offset: 0
     distribute expr lists: 

PLAN FRAGMENT 2

  PARTITION: HASH_PARTITIONED: id[#18]

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 05
    HASH_PARTITIONED: ent_id[#67], ent_order_no[#68], bar_code[#69]

  4:VAGGREGATE (update serialize)(625)
  |  STREAMING
  |  output: partial_sum(qty[#66])[#70]
  |  group by: ent_id[#63], ent_order_no[#64], bar_code[#65]
  |  sortByGroupKey:false
  |  cardinality=9,197,542
  |  distribute expr lists: 
  |  
  3:VHASH JOIN(619)
  |  join op: INNER JOIN(BUCKET_SHUFFLE)[]
  |  equal join conjunct: (id[#54] = sale_order_to_c_id[#15])
  |  runtime filters: RF000[in_or_bloom] <- sale_order_to_c_id[#15](5054499/8388608/8388608)
  |  cardinality=9,197,542
  |  vec output tuple id: 5
  |  output tuple id: 5
  |  vIntermediate tuple ids: 4 
  |  hash output slot ids: 16 17 55 56 
  |  final projections: ent_id[#58], ent_order_no[#59], bar_code[#61], qty[#62]
  |  final project output tuple id: 5
  |  distribute expr lists: id[#54]
  |  distribute expr lists: sale_order_to_c_id[#15]
  |  
  |----1:VEXCHANGE
  |       offset: 0
  |       distribute expr lists: 
  |    
  2:VOlapScanNode(600)
     TABLE: ods_gel_pro.doc_sale_order_to_c(doc_sale_order_to_c), PREAGGREGATION: OFF. Reason: __DORIS_DELETE_SIGN__ is used as conjuncts.
     PREDICATES: (__DORIS_DELETE_SIGN__[#51] = 0)
     runtime filters: RF000[in_or_bloom] -> id[#18]
     partitions=1/1 (doc_sale_order_to_c)
     tablets=32/32, tabletList=21454901,21454903,21454905 ...
     cardinality=5822327, avgRowSize=321.74512, numNodes=3
     pushAggOp=NONE
     final projections: id[#18], ent_id[#19], ent_order_no[#20]
     final project output tuple id: 3

PLAN FRAGMENT 3

  PARTITION: HASH_PARTITIONED: id[#0]

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 01
    BUCKET_SHFFULE_HASH_PARTITIONED: sale_order_to_c_id[#15]

  0:VOlapScanNode(607)
     TABLE: ods_gel_pro.doc_sale_order_to_c_goods(doc_sale_order_to_c_goods), PREAGGREGATION: ON
     PREDICATES: (__DORIS_DELETE_SIGN__[#12] = 0)
     partitions=1/1 (doc_sale_order_to_c_goods)
     tablets=32/32, tabletList=21455270,21455272,21455274 ...
     cardinality=8083347, avgRowSize=67.15573, numNodes=3
     pushAggOp=NONE
     final projections: sale_order_to_c_id[#1], bar_code[#3], qty[#4]
     final project output tuple id: 1


========== MATERIALIZATIONS ==========


Statistics
 planed with unknown column statistics
1 Answer

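From this description, the problem is that the query results are inconsistent, is that right? Could you provide a minimal case that reproduces it?
Alternatively, please provide the profile and explain output for the runs whose results differ, so that we can analyze this further.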
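
A possible starting point for such a minimal case is sketched below. The tables test.repro_order and test.repro_order_goods are hypothetical stand-ins that copy only the columns the query uses; their column types, key model, and bucket count are assumptions, since the real ODS table definitions are not shown in the question. Because the question reports the problem only at several million rows, these few rows are unlikely to trigger it on their own, and the data volume would probably need to be scaled up.

```sql
-- Hypothetical tables: names, types, and bucket counts are assumptions.
create table test.repro_order (
	id           bigint,
	ent_id       bigint,
	ent_order_no varchar(256)
) duplicate key(id)
distributed by hash(id) buckets 4
properties ('replication_num' = '1');

create table test.repro_order_goods (
	id                 bigint,
	sale_order_to_c_id bigint,
	bar_code           varchar(256),
	qty                int
) duplicate key(id)
distributed by hash(id) buckets 4
properties ('replication_num' = '1');

-- Made-up rows: three orders, one of them with two goods lines.
insert into test.repro_order values
	(1, 100, 'A001'), (2, 100, 'A002'), (3, 200, 'A001');

insert into test.repro_order_goods values
	(1, 1, 'G1', 2), (2, 1, 'G2', 1), (3, 2, 'G1', 5), (4, 3, 'G1', 3);

-- Same shape as the original query, pointed at the stand-in tables.
select
	row_number() over (order by debtor_id, channel_distribution, order_no, goods_no) as id,
	dense_rank() over (order by debtor_id, channel_distribution, order_no) as order_id,
	debtor_id, order_no, channel_distribution, goods_no, qty
from (
	select
		a.ent_id as debtor_id
		,a.ent_order_no as order_no
		,'TO C' as channel_distribution
		,b.bar_code as goods_no
		,sum(b.qty) as qty
	from test.repro_order a
	inner join test.repro_order_goods b on a.id = b.sale_order_to_c_id
	group by 1, 2, 3, 4
) t;
```

Attaching the profile and explain output of this statement next to those of the failing run would give a like-for-like comparison.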