Hi all,
Does anyone know why Spark SQL is not using Hive bucket pruning when reading from a
bucketed Hive table?
[SPARK-40206] Spark SQL Predict Pushdown for Hive Bucketed Table - ASF JIRA:
https://issues.apache.org/jira/browse/SPARK-40206

Details are also provided at the end of this mail.

Regards,
Raymond


Hi team,

I was testing out the Hive bucketed table features. One of the benefits, as most
documentation suggests, is that a bucketed Hive table can be used for query
filter/predicate pushdown to improve query performance.

However, through my exploration, that doesn't seem to be the case. Can you please
help clarify whether Spark SQL supports these query optimizations when reading a
Hive bucketed table?
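
In case it's relevant: the snippet below is only a minimal sketch of how the
bucketing-related session settings can be inspected (both config keys are listed in
the Spark SQL configuration docs; spark is the same SparkSession built in the
scripts further down).

# Sketch: inspect bucketing-related settings in the current session.
# Assumes an active SparkSession named spark, as in the scripts below.
print(spark.conf.get("spark.sql.sources.bucketing.enabled"))
print(spark.conf.get("spark.sql.hive.convertMetastoreOrc"))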



How to reproduce the issue:

Create a Hive 3 table using the following DDL:

create table test_db.bucket_table(user_id int, key string)
comment 'A bucketed table'
partitioned by(country string)
clustered by(user_id) sorted by (key) into 10 buckets
stored as ORC;

And then insert into this table using the following PySpark script:

from pyspark.sql import SparkSession

appName = "PySpark Hive Bucketing Example"
master = "local"

# Create Spark session with Hive support.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

# Prepare sample data for inserting into the Hive table.
data = []
countries = ['CN', 'AU']
for i in range(0, 1000):
    data.append([int(i), 'U' + str(i), countries[i % 2]])

df = spark.createDataFrame(data, ['user_id', 'key', 'country'])
df.show()

# Save df to Hive table test_db.bucket_table.
df.write.mode('append').insertInto('test_db.bucket_table')
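
To double-check the bucket metadata recorded in the metastore after the insert, a
small sketch (my understanding is that DESCRIBE FORMATTED reports "Num Buckets" and
"Bucket Columns" for the table):

# Sketch: confirm the bucket spec the metastore holds for test_db.bucket_table.
spark.sql("DESCRIBE FORMATTED test_db.bucket_table").show(100, truncate=False)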

Then query the table using the following script:

from pyspark.sql import SparkSession

appName = "PySpark Hive Bucketing Example"
master = "local"

# Create Spark session with Hive support.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

df = spark.sql("""select * from test_db.bucket_table
where country='AU' and user_id=101
""")
df.show()
df.explain(extended=True)
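
As a side note, the formatted explain mode (available since Spark 3.0) breaks the
scan details out per node; my understanding is that for tables bucketed through
Spark's DataSource API, bucket pruning would show up there as a
"SelectedBucketsCount" entry on the scan node, so it is a quick way to see whether
pruning kicked in:

# Sketch: formatted explain lists scan details node by node (Spark 3.0+).
df.explain(mode="formatted")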

I expected Spark to read only one bucket file from HDFS, but instead it scanned all
bucket files in the partition folder country=AU.

== Parsed Logical Plan ==
'Project [*]
+- 'Filter (('country = AU) AND ('t1.user_id = 101))
   +- 'SubqueryAlias t1
      +- 'UnresolvedRelation [test_db, bucket_table], [], false

== Analyzed Logical Plan ==
user_id: int, key: string, country: string
Project [user_id#20, key#21, country#22]
+- Filter ((country#22 = AU) AND (user_id#20 = 101))
   +- SubqueryAlias t1
      +- SubqueryAlias spark_catalog.test_db.bucket_table
         +- Relation test_db.bucket_table[user_id#20,key#21,country#22] orc

== Optimized Logical Plan ==
Filter (((isnotnull(country#22) AND isnotnull(user_id#20)) AND (country#22 = AU)) AND (user_id#20 = 101))
+- Relation test_db.bucket_table[user_id#20,key#21,country#22] orc

== Physical Plan ==
*(1) Filter (isnotnull(user_id#20) AND (user_id#20 = 101))
+- *(1) ColumnarToRow
   +- FileScan orc test_db.bucket_table[user_id#20,key#21,country#22] Batched: true, DataFilters: [isnotnull(user_id#20), (user_id#20 = 101)], Format: ORC, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/bucket_table/coun..., PartitionFilters: [isnotnull(country#22), (country#22 = AU)], PushedFilters: [IsNotNull(user_id), EqualTo(user_id,101)], ReadSchema: struct<user_id:int,key:string>

Am I doing something wrong, or is it that Spark doesn't support this? Your guidance
and help would be appreciated.
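
For comparison, my understanding is that Spark's own bucket pruning (SPARK-23803)
applies to tables bucketed through the DataSource API (DataFrameWriter.bucketBy)
rather than to tables bucketed via Hive DDL. A rough sketch of what I mean, reusing
the same df from the insert script; the table name test_db.spark_bucket_table is
just a placeholder:

# Sketch: write the same data as a Spark (DataSource) bucketed table via bucketBy,
# which is the form of bucketing Spark's bucket pruning is documented to act on.
df.write \
    .mode("overwrite") \
    .partitionBy("country") \
    .bucketBy(10, "user_id") \
    .sortBy("key") \
    .format("orc") \
    .saveAsTable("test_db.spark_bucket_table")

# Same filter as before; any bucket pruning ("SelectedBucketsCount") should be
# visible on the scan node of the formatted plan, as I understand it.
spark.sql("""select * from test_db.spark_bucket_table
where country='AU' and user_id=101
""").explain(mode="formatted")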



