2022-03-30

Spark Query using Inner join instead of full join

Can anyone explain the behaviour below in Spark SQL joins? It does not matter whether I use full join, full outer, left, or left outer: the physical plan always shows that an Inner join is being used. A minimal, self-contained reproduction is sketched after the four plans below.

q1 = spark.sql("select count(*) from table_t1 t1 full join table_t1 t2 on t1.anchor_page_id = t2.anchor_page_id and t1.item_id = t2.item_id and t1.store_id = t2.store_id where t1.date_id = '20220323' and t2.date_id = '20220324'")
q1.explain()

== Physical Plan ==
*(6) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *(5) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(5) Project
         +- *(5) SortMergeJoin [anchor_page_id#1, item_id#2, store_id#5], [anchor_page_id#19, item_id#20, store_id#23], Inner
            :- *(2) Sort [anchor_page_id#1 ASC NULLS FIRST, item_id#2 ASC NULLS FIRST, store_id#5 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(anchor_page_id#1, item_id#2, store_id#5, 200)
            :     +- *(1) Project [anchor_page_id#1, item_id#2, store_id#5]
            :        +- *(1) Filter ((isnotnull(item_id#2) && isnotnull(anchor_page_id#1)) && isnotnull(store_id#5))
            :           +- *(1) FileScan parquet table_t1[anchor_page_id#1,item_id#2,store_id#5,date_id#18] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[gs://abc..., PartitionCount: 1, PartitionFilters: [isnotnull(date_id#18), (date_id#18 = 20220323)], PushedFilters: [IsNotNull(item_id), IsNotNull(anchor_page_id), IsNotNull(store_id)], ReadSchema: struct<anchor_page_id:string,item_id:string,store_id:string>
            +- *(4) Sort [anchor_page_id#19 ASC NULLS FIRST, item_id#20 ASC NULLS FIRST, store_id#23 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(anchor_page_id#19, item_id#20, store_id#23, 200)
                  +- *(3) Project [anchor_page_id#19, item_id#20, store_id#23]
                     +- *(3) Filter ((isnotnull(anchor_page_id#19) && isnotnull(item_id#20)) && isnotnull(store_id#23))
                        +- *(3) FileScan parquet table_t1[anchor_page_id#19,item_id#20,store_id#23,date_id#36] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[gs://abc..., PartitionCount: 1, PartitionFilters: [isnotnull(date_id#36), (date_id#36 = 20220324)], PushedFilters: [IsNotNull(anchor_page_id), IsNotNull(item_id), IsNotNull(store_id)], ReadSchema: struct<anchor_page_id:string,item_id:string,store_id:string>
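One way to see at which stage the join type flips is to print the full plan chain; explain(True) also dumps the parsed, analyzed and optimized logical plans, and the analyzed plan should still show the outer join before the optimizer rewrites it:

q1.explain(True)
# expected: == Analyzed Logical Plan == still contains "Join FullOuter",
# while == Optimized Logical Plan == and the physical plan show Inner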


q2 = spark.sql("select count(*) from table_t1 t1 full outer join table_t1 t2 on t1.anchor_page_id = t2.anchor_page_id and t1.item_id = t2.item_id and t1.store_id = t2.store_id where t1.date_id = '20220323' and t2.date_id = '20220324'")

q2.explain()
== Physical Plan ==
*(6) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *(5) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(5) Project
         +- *(5) SortMergeJoin [anchor_page_id#1, item_id#2, store_id#5], [anchor_page_id#42, item_id#43, store_id#46], Inner
            :- *(2) Sort [anchor_page_id#1 ASC NULLS FIRST, item_id#2 ASC NULLS FIRST, store_id#5 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(anchor_page_id#1, item_id#2, store_id#5, 200)
            :     +- *(1) Project [anchor_page_id#1, item_id#2, store_id#5]
            :        +- *(1) Filter ((isnotnull(item_id#2) && isnotnull(anchor_page_id#1)) && isnotnull(store_id#5))
            :           +- *(1) FileScan parquet table_t1[anchor_page_id#1,item_id#2,store_id#5,date_id#18] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[gs://abc..., PartitionCount: 1, PartitionFilters: [isnotnull(date_id#18), (date_id#18 = 20220323)], PushedFilters: [IsNotNull(item_id), IsNotNull(anchor_page_id), IsNotNull(store_id)], ReadSchema: struct<anchor_page_id:string,item_id:string,store_id:string>
            +- *(4) Sort [anchor_page_id#42 ASC NULLS FIRST, item_id#43 ASC NULLS FIRST, store_id#46 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(anchor_page_id#42, item_id#43, store_id#46, 200)
                  +- *(3) Project [anchor_page_id#42, item_id#43, store_id#46]
                     +- *(3) Filter ((isnotnull(store_id#46) && isnotnull(anchor_page_id#42)) && isnotnull(item_id#43))
                        +- *(3) FileScan parquet table_t1[anchor_page_id#42,item_id#43,store_id#46,date_id#59] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[gs://abc..., PartitionCount: 1, PartitionFilters: [isnotnull(date_id#59), (date_id#59 = 20220324)], PushedFilters: [IsNotNull(store_id), IsNotNull(anchor_page_id), IsNotNull(item_id)], ReadSchema: struct<anchor_page_id:string,item_id:string,store_id:string>


q3 = spark.sql("select count(*) from table_t1 t1 left join table_t1 t2 on t1.anchor_page_id = t2.anchor_page_id and t1.item_id = t2.item_id and t1.store_id = t2.store_id where t1.date_id = 20220323 and t2.date_id = 20220324")

q3.explain()

== Physical Plan ==
*(6) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *(5) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(5) Project
         +- *(5) SortMergeJoin [anchor_page_id#1, item_id#2, store_id#5], [anchor_page_id#65, item_id#66, store_id#69], Inner
            :- *(2) Sort [anchor_page_id#1 ASC NULLS FIRST, item_id#2 ASC NULLS FIRST, store_id#5 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(anchor_page_id#1, item_id#2, store_id#5, 200)
            :     +- *(1) Project [anchor_page_id#1, item_id#2, store_id#5]
            :        +- *(1) Filter ((isnotnull(item_id#2) && isnotnull(anchor_page_id#1)) && isnotnull(store_id#5))
            :           +- *(1) FileScan parquet table_t1[anchor_page_id#1,item_id#2,store_id#5,date_id#18] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[gs://abc..., PartitionCount: 1, PartitionFilters: [isnotnull(date_id#18), (cast(date_id#18 as int) = 20220323)], PushedFilters: [IsNotNull(item_id), IsNotNull(anchor_page_id), IsNotNull(store_id)], ReadSchema: struct<anchor_page_id:string,item_id:string,store_id:string>
            +- *(4) Sort [anchor_page_id#65 ASC NULLS FIRST, item_id#66 ASC NULLS FIRST, store_id#69 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(anchor_page_id#65, item_id#66, store_id#69, 200)
                  +- *(3) Project [anchor_page_id#65, item_id#66, store_id#69]
                     +- *(3) Filter ((isnotnull(item_id#66) && isnotnull(store_id#69)) && isnotnull(anchor_page_id#65))
                        +- *(3) FileScan parquet table_t1[anchor_page_id#65,item_id#66,store_id#69,date_id#82] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[gs://abc..., PartitionCount: 1, PartitionFilters: [isnotnull(date_id#82), (cast(date_id#82 as int) = 20220324)], PushedFilters: [IsNotNull(item_id), IsNotNull(store_id), IsNotNull(anchor_page_id)], ReadSchema: struct<anchor_page_id:string,item_id:string,store_id:string>


q4 = spark.sql("select count(*) from table_t1 t1 left outer join table_t1 t2 on t1.anchor_page_id = t2.anchor_page_id and t1.item_id = t2.item_id and t1.store_id = t2.store_id where t1.date_id = 20220323 and t2.date_id = 20220324")

q4.explain()

== Physical Plan ==
*(6) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *(5) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(5) Project
         +- *(5) SortMergeJoin [anchor_page_id#1, item_id#2, store_id#5], [anchor_page_id#88, item_id#89, store_id#92], Inner
            :- *(2) Sort [anchor_page_id#1 ASC NULLS FIRST, item_id#2 ASC NULLS FIRST, store_id#5 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(anchor_page_id#1, item_id#2, store_id#5, 200)
            :     +- *(1) Project [anchor_page_id#1, item_id#2, store_id#5]
            :        +- *(1) Filter ((isnotnull(item_id#2) && isnotnull(anchor_page_id#1)) && isnotnull(store_id#5))
            :           +- *(1) FileScan parquet table_t1[anchor_page_id#1,item_id#2,store_id#5,date_id#18] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[gs://abc..., PartitionCount: 1, PartitionFilters: [isnotnull(date_id#18), (cast(date_id#18 as int) = 20220323)], PushedFilters: [IsNotNull(item_id), IsNotNull(anchor_page_id), IsNotNull(store_id)], ReadSchema: struct<anchor_page_id:string,item_id:string,store_id:string>
            +- *(4) Sort [anchor_page_id#88 ASC NULLS FIRST, item_id#89 ASC NULLS FIRST, store_id#92 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(anchor_page_id#88, item_id#89, store_id#92, 200)
                  +- *(3) Project [anchor_page_id#88, item_id#89, store_id#92]
                     +- *(3) Filter ((isnotnull(store_id#92) && isnotnull(item_id#89)) && isnotnull(anchor_page_id#88))
                        +- *(3) FileScan parquet table_t1[anchor_page_id#88,item_id#89,store_id#92,date_id#105] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[gs://abc..., PartitionCount: 1, PartitionFilters: [isnotnull(date_id#105), (cast(date_id#105 as int) = 20220324)], PushedFilters: [IsNotNull(store_id), IsNotNull(item_id), IsNotNull(anchor_page_id)], ReadSchema: struct<anchor_page_id:string,item_id:string,store_id:string>
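
For anyone who wants to poke at this without the original parquet table, here is a minimal, self-contained sketch; the toy rows and the app name are made up for illustration. It reproduces the same query shape against an in-memory table, and also tries the variant where the per-side date predicates are moved from the WHERE clause into the ON clause. If the optimizer is rewriting the outer join because the WHERE filters reject the NULL-padded rows (Catalyst's EliminateOuterJoin rule), the first plan should show Inner and the second should keep FullOuter.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("join-plan-check").getOrCreate()

# Toy stand-in for table_t1 with two date_id "partitions"
rows = [
    ("p1", "i1", "s1", "20220323"),
    ("p2", "i2", "s2", "20220323"),
    ("p1", "i1", "s1", "20220324"),
    ("p3", "i3", "s3", "20220324"),
]
spark.createDataFrame(rows, ["anchor_page_id", "item_id", "store_id", "date_id"]) \
     .createOrReplaceTempView("table_t1")

# Same shape as q2: full outer join, with per-side filters in the WHERE clause
spark.sql("""
    select count(*)
    from table_t1 t1
    full outer join table_t1 t2
      on t1.anchor_page_id = t2.anchor_page_id
     and t1.item_id = t2.item_id
     and t1.store_id = t2.store_id
    where t1.date_id = '20220323' and t2.date_id = '20220324'
""").explain()
# expected: the join line in the physical plan ends with Inner

# Variant: the same predicates moved into the ON clause, so nothing above the
# join filters out the rows that the outer join pads with NULLs
spark.sql("""
    select count(*)
    from table_t1 t1
    full outer join table_t1 t2
      on t1.anchor_page_id = t2.anchor_page_id
     and t1.item_id = t2.item_id
     and t1.store_id = t2.store_id
     and t1.date_id = '20220323' and t2.date_id = '20220324'
""").explain()
# expected: the join line in the physical plan keeps FullOuter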


 

