Doris sql 语法上使用 where not in 嵌套子查询结果异常

Viewed 88

背景:
我们的一款数据产品从doris 1.2.4 升级到2.1.6 。升级后发现有个模板拼接逻辑异常。如下sql1 所示(异常为多次执行结果不唯一,且结果不正确):

 select count(*) from 
(
 select
        F0.userid,
        F0.deviceid_recommend as deviceid_recommend,
        F0.openid_recommend as openid_recommend,
        F0.phone_recommend
    from
        database.table_a F0
        join (
            select
                distinct F1.userid
            from
                database.table_b F1
            where
                1 = 1
                and (F1.visit_circleid_list_30d is not null)
        ) T on F0.userid = T.userid
    where
        1 = 1
        and F0.userid not in (
            select
                E0.userid
            from
                (
                    select
                        distinct F1.userid
                    from
                        database.table_b F1
                    where
                        1 = 1
                        and (F1.visit_circleid_list_30d = '') 
                ) E0
        )
) tmp

上面sql1 在1.2.4 版本执行正常,升级到2.1.6 后执行异常.且我在测试环境搭建了2.1.7以及在另一个正式的2.1.6 上执行sql 也发现了异常。
现在的处理逻辑是修改了拼接逻辑,如下sql2:

 select count(*) from 
(
 select
        F0.userid,
        F0.deviceid_recommend as deviceid_recommend,
        F0.openid_recommend as openid_recommend,
        F0.phone_recommend
    from
        bi_label.label_dim_usr_oneid_user_d F0
        join (
            select
                distinct F1.userid
            from
                bi_label.label_stc_usr_p5_d F1
            where
                1 = 1
                and (F1.visit_circleid_list_30d is not null)
        ) T on F0.userid = T.userid 
        where 1 = 1  
        and  not exists (
            select
                1
            from
                (
                    select
                        distinct F1.userid
                    from
                        bi_label.label_stc_usr_p5_d F1
                    where
                        1 = 1
                        and (F1.visit_circleid_list_30d = '')
                ) E0
                where F0.userid = E0.userid 
        )
) tmp 

修改后结果正常。且若逻辑修改为sql3 也是正常,如下图:

  select count(*) from 
(
 select
        F0.userid,
        F0.deviceid_recommend as deviceid_recommend,
        F0.openid_recommend as openid_recommend,
        F0.phone_recommend
    from
        database.table_a F0
        join (
            select
                distinct F1.userid
            from
                database.table_b F1
            where
                1 = 1
                and (F1.visit_circleid_list_30d is not null)
        ) T on F0.userid = T.userid
    where
        1 = 1
        and F0.userid  in (
            select
                E0.userid
            from
                (
                    select
                        distinct F1.userid
                    from
                        database.table_b F1
                    where
                        1 = 1
                        and (F1.visit_circleid_list_30d != '') 
                ) E0
        )
) tmp 

现在不知道这个问题到底是什么,是否是当前doris 存在bug.
已经做过的措施:
1.1 删除表重新建立
1.2 确认表中关联与过滤字段没有null 值。
附带 sql1,sql2,sql3 的执行计划 ,请各位大佬帮我看下:

sql_plan1:

Explain String(Old Planner)
PLAN FRAGMENT 0
  OUTPUT EXPRS:
    count(*)
  PARTITION: UNPARTITIONED

  HAS_COLO_PLAN_NODE: false

  VRESULT SINK
     MYSQL_PROTOCAL

  21:VAGGREGATE (merge finalize)
  |  output: count(count(*))
  |  group by: 
  |  cardinality=-1
  |  
  20:VEXCHANGE
     offset: 0

PLAN FRAGMENT 1

  PARTITION: HASH_PARTITIONED: `database`.`table_a`.`userid`

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 20
    UNPARTITIONED

  10:VAGGREGATE (update serialize)
  |  output: count(*)
  |  group by: 
  |  cardinality=1
  |  
  9:VHASH JOIN
  |  join op: NULL AWARE LEFT ANTI JOIN(BROADCAST)[Build side of null aware left anti join must be broadcast]
  |  equal join conjunct: (<slot 26> <slot 24> <slot 3> `F1`.`userid` = `F1`.`userid`)
  |  cardinality=135,532,419
  |  vec output tuple id: 16
  |  output tuple id: 16
  |  vIntermediate tuple ids: 21 22 
  |  output slot ids: 27 
  |  hash output slot ids: 25 
  |  
  |----19:VEXCHANGE
  |       offset: 0
  |    
  6:VHASH JOIN
  |  join op: NULL AWARE LEFT ANTI JOIN(BROADCAST)[Build side of null aware left anti join must be broadcast]
  |  equal join conjunct: (<slot 23> <slot 4> = `F1`.`userid`)
  |  cardinality=135,532,419
  |  vec output tuple id: 15
  |  output tuple id: 15
  |  vIntermediate tuple ids: 19 20 
  |  output slot ids: 25 26 
  |  hash output slot ids: 23 24 
  |  
  |----16:VEXCHANGE
  |       offset: 0
  |    
  3:VHASH JOIN
  |  join op: INNER JOIN(BUCKET_SHUFFLE)[The src data has been redistributed]
  |  equal join conjunct: (`F0`.`userid` = `F1`.`userid`)
  |  runtime filters: RF000[in_or_bloom] <- `F1`.`userid`(-1/0/2097152)
  |  cardinality=135,532,419
  |  vec output tuple id: 14
  |  output tuple id: 14
  |  vIntermediate tuple ids: 17 18 
  |  output slot ids: 23 24 
  |  hash output slot ids: 2 4 
  |  
  |----13:VEXCHANGE
  |       offset: 0
  |    
  0:VOlapScanNode
     TABLE: database.table_a(table_a), PREAGGREGATION: ON
     runtime filters: RF000[in_or_bloom] -> `F0`.`userid`
     partitions=1/1 (table_a)
     tablets=10/10, tabletList=177676438,177676442,177676446 ...
     cardinality=135532419, avgRowSize=2139.754, numNodes=8
     pushAggOp=NONE

PLAN FRAGMENT 2

  PARTITION: HASH_PARTITIONED: `F1`.`userid`

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 19
    UNPARTITIONED

  18:VAGGREGATE (merge finalize)
  |  group by: `F1`.`userid`
  |  cardinality=-1
  |  
  17:VEXCHANGE
     offset: 0

PLAN FRAGMENT 3

  PARTITION: HASH_PARTITIONED: `database`.`table_b`.`userid`

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 17
    HASH_PARTITIONED: `F1`.`userid`

  8:VAGGREGATE (update serialize)
  |  STREAMING
  |  group by: `F1`.`userid`
  |  cardinality=-1
  |  
  7:VOlapScanNode
     TABLE: database.table_b(table_b), PREAGGREGATION: ON
     PREDICATES: (`F1`.`visit_circleid_list_30d` = '')
     partitions=1/1 (table_b)
     tablets=10/10, tabletList=177754743,177754747,177754751 ...
     cardinality=135532419, avgRowSize=374.96548, numNodes=8
     pushAggOp=NONE

PLAN FRAGMENT 4

  PARTITION: HASH_PARTITIONED: `F1`.`userid`

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 16
    UNPARTITIONED

  15:VAGGREGATE (merge finalize)
  |  group by: `F1`.`userid`
  |  cardinality=-1
  |  
  14:VEXCHANGE
     offset: 0

PLAN FRAGMENT 5

  PARTITION: HASH_PARTITIONED: `database`.`table_b`.`userid`

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 14
    HASH_PARTITIONED: `F1`.`userid`

  5:VAGGREGATE (update serialize)
  |  STREAMING
  |  group by: `F1`.`userid`
  |  cardinality=-1
  |  
  4:VOlapScanNode
     TABLE: database.table_b(table_b), PREAGGREGATION: ON
     PREDICATES: (`F1`.`visit_circleid_list_30d` = '')
     partitions=1/1 (table_b)
     tablets=10/10, tabletList=177754743,177754747,177754751 ...
     cardinality=135532419, avgRowSize=374.96548, numNodes=8
     pushAggOp=NONE

PLAN FRAGMENT 6

  PARTITION: HASH_PARTITIONED: `F1`.`userid`

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 13
    BUCKET_SHFFULE_HASH_PARTITIONED: `F1`.`userid`

  12:VAGGREGATE (merge finalize)
  |  group by: `F1`.`userid`
  |  cardinality=-1
  |  
  11:VEXCHANGE
     offset: 0

PLAN FRAGMENT 7

  PARTITION: HASH_PARTITIONED: `database`.`table_b`.`userid`

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 11
    HASH_PARTITIONED: `F1`.`userid`

  2:VAGGREGATE (update serialize)
  |  STREAMING
  |  group by: `F1`.`userid`
  |  cardinality=-1
  |  
  1:VOlapScanNode
     TABLE: database.table_b(table_b), PREAGGREGATION: ON
     PREDICATES: (`F1`.`visit_circleid_list_30d` IS NOT NULL)
     partitions=1/1 (table_b)
     tablets=10/10, tabletList=177754743,177754747,177754751 ...
     cardinality=135532419, avgRowSize=374.96548, numNodes=8
     pushAggOp=NONE

sql_plan2:

Explain String(Old Planner)
PLAN FRAGMENT 0
  OUTPUT EXPRS:
    count(*)
  PARTITION: UNPARTITIONED

  HAS_COLO_PLAN_NODE: false

  VRESULT SINK
     MYSQL_PROTOCAL

  15:VAGGREGATE (merge finalize)
  |  output: count(count(*))
  |  group by: 
  |  cardinality=-1
  |  
  14:VEXCHANGE
     offset: 0

PLAN FRAGMENT 1

  PARTITION: HASH_PARTITIONED: `database`.`table_a`.`userid`

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 14
    UNPARTITIONED

  7:VAGGREGATE (update serialize)
  |  output: count(*)
  |  group by: 
  |  cardinality=1
  |  
  6:VHASH JOIN
  |  join op: LEFT ANTI JOIN(BUCKET_SHUFFLE)[The src data has been redistributed]
  |  equal join conjunct: (<slot 19> <slot 4> = `F1`.`userid`)
  |  cardinality=135,532,419
  |  vec output tuple id: 11
  |  output tuple id: 11
  |  vIntermediate tuple ids: 14 15 
  |  output slot ids: 21 
  |  hash output slot ids: 19 
  |  
  |----13:VEXCHANGE
  |       offset: 0
  |    
  3:VHASH JOIN
  |  join op: INNER JOIN(BUCKET_SHUFFLE)[The src data has been redistributed]
  |  equal join conjunct: (`F0`.`userid` = `F1`.`userid`)
  |  runtime filters: RF000[in_or_bloom] <- `F1`.`userid`(-1/0/2097152)
  |  cardinality=135,532,419
  |  vec output tuple id: 10
  |  output tuple id: 10
  |  vIntermediate tuple ids: 12 13 
  |  output slot ids: 19 
  |  hash output slot ids: 4 
  |  
  |----10:VEXCHANGE
  |       offset: 0
  |    
  0:VOlapScanNode
     TABLE: database.table_a(table_a), PREAGGREGATION: ON
     runtime filters: RF000[in_or_bloom] -> `F0`.`userid`
     partitions=1/1 (table_a)
     tablets=10/10, tabletList=177676438,177676442,177676446 ...
     cardinality=135532419, avgRowSize=2139.754, numNodes=8
     pushAggOp=NONE

PLAN FRAGMENT 2

  PARTITION: HASH_PARTITIONED: `F1`.`userid`

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 13
    BUCKET_SHFFULE_HASH_PARTITIONED: `F1`.`userid`

  12:VAGGREGATE (merge finalize)
  |  group by: `F1`.`userid`
  |  cardinality=-1
  |  
  11:VEXCHANGE
     offset: 0

PLAN FRAGMENT 3

  PARTITION: HASH_PARTITIONED: `database`.`table_b`.`userid`

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 11
    HASH_PARTITIONED: `F1`.`userid`

  5:VAGGREGATE (update serialize)
  |  STREAMING
  |  group by: `F1`.`userid`
  |  cardinality=-1
  |  
  4:VOlapScanNode
     TABLE: database.table_b(table_b), PREAGGREGATION: ON
     PREDICATES: (`F1`.`visit_circleid_list_30d` = '')
     partitions=1/1 (table_b)
     tablets=10/10, tabletList=177754743,177754747,177754751 ...
     cardinality=135532419, avgRowSize=374.96548, numNodes=8
     pushAggOp=NONE

PLAN FRAGMENT 4

  PARTITION: HASH_PARTITIONED: `F1`.`userid`

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 10
    BUCKET_SHFFULE_HASH_PARTITIONED: `F1`.`userid`

  9:VAGGREGATE (merge finalize)
  |  group by: `F1`.`userid`
  |  cardinality=-1
  |  
  8:VEXCHANGE
     offset: 0

PLAN FRAGMENT 5

  PARTITION: HASH_PARTITIONED: `database`.`table_b`.`userid`

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 08
    HASH_PARTITIONED: `F1`.`userid`

  2:VAGGREGATE (update serialize)
  |  STREAMING
  |  group by: `F1`.`userid`
  |  cardinality=-1
  |  
  1:VOlapScanNode
     TABLE: database.table_b(table_b), PREAGGREGATION: ON
     PREDICATES: (`F1`.`visit_circleid_list_30d` IS NOT NULL)
     partitions=1/1 (table_b)
     tablets=10/10, tabletList=177754743,177754747,177754751 ...
     cardinality=135532419, avgRowSize=374.96548, numNodes=8
     pushAggOp=NONE

sql_plan3:

Explain String(Old Planner)
PLAN FRAGMENT 0
  OUTPUT EXPRS:
    count(*)
  PARTITION: UNPARTITIONED

  HAS_COLO_PLAN_NODE: false

  VRESULT SINK
     MYSQL_PROTOCAL

  21:VAGGREGATE (merge finalize)
  |  output: count(count(*))
  |  group by: 
  |  cardinality=-1
  |  
  20:VEXCHANGE
     offset: 0

PLAN FRAGMENT 1

  PARTITION: HASH_PARTITIONED: `database`.`table_a`.`userid`

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 20
    UNPARTITIONED

  10:VAGGREGATE (update serialize)
  |  output: count(*)
  |  group by: 
  |  cardinality=1
  |  
  9:VHASH JOIN
  |  join op: LEFT SEMI JOIN(BROADCAST)[The src data has been redistributed]
  |  equal join conjunct: (<slot 26> <slot 24> <slot 3> `F1`.`userid` = `F1`.`userid`)
  |  cardinality=135,532,419
  |  vec output tuple id: 16
  |  output tuple id: 16
  |  vIntermediate tuple ids: 21 22 
  |  output slot ids: 27 
  |  hash output slot ids: 25 
  |  
  |----19:VEXCHANGE
  |       offset: 0
  |    
  6:VHASH JOIN
  |  join op: LEFT SEMI JOIN(BUCKET_SHUFFLE)[The src data has been redistributed]
  |  equal join conjunct: (<slot 23> <slot 4> = `F1`.`userid`)
  |  runtime filters: RF000[in_or_bloom] <- `F1`.`userid`(-1/0/2097152)
  |  cardinality=135,532,419
  |  vec output tuple id: 15
  |  output tuple id: 15
  |  vIntermediate tuple ids: 19 20 
  |  output slot ids: 25 26 
  |  hash output slot ids: 23 24 
  |  
  |----16:VEXCHANGE
  |       offset: 0
  |    
  3:VHASH JOIN
  |  join op: INNER JOIN(BUCKET_SHUFFLE)[The src data has been redistributed]
  |  equal join conjunct: (`F0`.`userid` = `F1`.`userid`)
  |  runtime filters: RF001[in_or_bloom] <- `F1`.`userid`(-1/0/2097152)
  |  cardinality=135,532,419
  |  vec output tuple id: 14
  |  output tuple id: 14
  |  vIntermediate tuple ids: 17 18 
  |  output slot ids: 23 24 
  |  hash output slot ids: 2 4 
  |  
  |----13:VEXCHANGE
  |       offset: 0
  |    
  0:VOlapScanNode
     TABLE: database.table_a(table_a), PREAGGREGATION: ON
     runtime filters: RF000[in_or_bloom] -> <slot 4>, RF001[in_or_bloom] -> `F0`.`userid`
     partitions=1/1 (table_a)
     tablets=10/10, tabletList=177676438,177676442,177676446 ...
     cardinality=135532419, avgRowSize=2139.7542, numNodes=8
     pushAggOp=NONE

PLAN FRAGMENT 2

  PARTITION: HASH_PARTITIONED: `F1`.`userid`

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 19
    UNPARTITIONED

  18:VAGGREGATE (merge finalize)
  |  group by: `F1`.`userid`
  |  cardinality=-1
  |  
  17:VEXCHANGE
     offset: 0

PLAN FRAGMENT 3

  PARTITION: HASH_PARTITIONED: `database`.`table_b`.`userid`

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 17
    HASH_PARTITIONED: `F1`.`userid`

  8:VAGGREGATE (update serialize)
  |  STREAMING
  |  group by: `F1`.`userid`
  |  cardinality=-1
  |  
  7:VOlapScanNode
     TABLE: database.table_b(table_b), PREAGGREGATION: ON
     PREDICATES: (`F1`.`visit_circleid_list_30d` != '')
     partitions=1/1 (table_b)
     tablets=10/10, tabletList=177754743,177754747,177754751 ...
     cardinality=135532419, avgRowSize=374.96548, numNodes=8
     pushAggOp=NONE

PLAN FRAGMENT 4

  PARTITION: HASH_PARTITIONED: `F1`.`userid`

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 16
    BUCKET_SHFFULE_HASH_PARTITIONED: `F1`.`userid`

  15:VAGGREGATE (merge finalize)
  |  group by: `F1`.`userid`
  |  cardinality=-1
  |  
  14:VEXCHANGE
     offset: 0

PLAN FRAGMENT 5

  PARTITION: HASH_PARTITIONED: `database`.`table_b`.`userid`

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 14
    HASH_PARTITIONED: `F1`.`userid`

  5:VAGGREGATE (update serialize)
  |  STREAMING
  |  group by: `F1`.`userid`
  |  cardinality=-1
  |  
  4:VOlapScanNode
     TABLE: database.table_b(table_b), PREAGGREGATION: ON
     PREDICATES: (`F1`.`visit_circleid_list_30d` != '')
     partitions=1/1 (table_b)
     tablets=10/10, tabletList=177754743,177754747,177754751 ...
     cardinality=135532419, avgRowSize=374.96548, numNodes=8
     pushAggOp=NONE

PLAN FRAGMENT 6

  PARTITION: HASH_PARTITIONED: `F1`.`userid`

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 13
    BUCKET_SHFFULE_HASH_PARTITIONED: `F1`.`userid`

  12:VAGGREGATE (merge finalize)
  |  group by: `F1`.`userid`
  |  cardinality=-1
  |  
  11:VEXCHANGE
     offset: 0

PLAN FRAGMENT 7

  PARTITION: HASH_PARTITIONED: `database`.`table_b`.`userid`

  HAS_COLO_PLAN_NODE: false

  STREAM DATA SINK
    EXCHANGE ID: 11
    HASH_PARTITIONED: `F1`.`userid`

  2:VAGGREGATE (update serialize)
  |  STREAMING
  |  group by: `F1`.`userid`
  |  cardinality=-1
  |  
  1:VOlapScanNode
     TABLE: database.table_b(table_b), PREAGGREGATION: ON
     PREDICATES: (`F1`.`visit_circleid_list_30d` IS NOT NULL)
     partitions=1/1 (table_b)
     tablets=10/10, tabletList=177754743,177754747,177754751 ...
     cardinality=135532419, avgRowSize=374.96548, numNodes=8
     pushAggOp=NONE
2 Answers

【问题状态】已经定位
【问题处理】这里有两个join,先执行 LEFT ANTI JOIN 后执行 INNER JOIN 就是对的,反之则是错的。目前可以手动指定来规避,待修复

set enable_nereids_planner=true;
看下开启新优化器之后,执行的结果是否有错误?
enable_pipeline_engine, enable_pipeline_x_engine这两个变量结果如何?如果为false,先把enable_pipeline_engine调整为true看下结果,再把enable_pipeline_x_engine调整为true看下结果,是否变正确了?
如果sql1仍然错误,那么set parallel_pipeline_task_num=1;拿一下sql1,sql2和sql3的profile,我来分析一下