jackyjfhu opened a new issue, #6943:
URL: https://github.com/apache/kyuubi/issues/6943

   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   
   
   ### Search before asking
   
   - [x] I have searched in the 
[issues](https://github.com/apache/kyuubi/issues?q=is%3Aissue) and found no 
similar issues.
   
   
   ### Describe the bug
   
   kyuuiby:1.9.0
   Spark:3.4.2
   
   Currently, when connecting to hivemetastore and Spark through 
kyuubi-hive-connector and testing performance with TPC-DS, it is found that 
some SQL cannot be dynamically partitioned.
   
   q33.sql
   `with ss as (
    select
             i_manufact_id,sum(ss_ext_sales_price) total_sales
    from
       store_sales,
       date_dim,
            customer_address,
            item
    where
            i_manufact_id in (select
     i_manufact_id
   from
    item
   where i_category in ('Home'))
    and     ss_item_sk              = i_item_sk
    and     ss_sold_date_sk         = d_date_sk
    and     d_year                  = 1998
    and     d_moy                   = 5
    and     ss_addr_sk              = ca_address_sk
    and     ca_gmt_offset           = -6 
    group by i_manufact_id),
    cs as (
    select
             i_manufact_id,sum(cs_ext_sales_price) total_sales
    from
       catalog_sales,
       date_dim,
            customer_address,
            item
    where
            i_manufact_id               in (select
     i_manufact_id
   from
    item
   where i_category in ('Home'))
    and     cs_item_sk              = i_item_sk
    and     cs_sold_date_sk         = d_date_sk
    and     d_year                  = 1998
    and     d_moy                   = 5
    and     cs_bill_addr_sk         = ca_address_sk
    and     ca_gmt_offset           = -6 
    group by i_manufact_id),
    ws as (
    select
             i_manufact_id,sum(ws_ext_sales_price) total_sales
    from
       web_sales,
       date_dim,
            customer_address,
            item
    where
            i_manufact_id               in (select
     i_manufact_id
   from
    item
   where i_category in ('Home'))
    and     ws_item_sk              = i_item_sk
    and     ws_sold_date_sk         = d_date_sk
    and     d_year                  = 1998
    and     d_moy                   = 5
    and     ws_bill_addr_sk         = ca_address_sk
    and     ca_gmt_offset           = -6
    group by i_manufact_id)
     select  i_manufact_id ,sum(total_sales) total_sales
    from  (select * from ss 
           union all
           select * from cs 
           union all
           select * from ws) tmp1
    group by i_manufact_id
    order by total_sales
   limit 100`
   
   If kyuubi-hive-connecotor is not used, the execution plan will have dynamic 
partition pruning
   
   <img width="802" alt="Image" 
src="https://github.com/user-attachments/assets/e2bca6a2-bcee-4b96-9e79-11642c29800c";
 />
   
   If kyuubi-hive-connecotor is used,
   
   there will be no dynamic partition pruning in the execution plan
   
   <img width="925" alt="Image" 
src="https://github.com/user-attachments/assets/ceffe5b5-10de-4861-b5da-8f86cbb17305";
 />
   
   ### Affects Version(s)
   
   1.9.0
   
   ### Kyuubi Server Log Output
   
   ```logtalk
   
   ```
   
   ### Kyuubi Engine Log Output
   
   ```logtalk
   
   ```
   
   ### Kyuubi Server Configurations
   
   ```yaml
   
   ```
   
   ### Kyuubi Engine Configurations
   
   ```yaml
   spark.plugins 
com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin
   spark.sql.catalog.cos8lcu4e_hive.hadoop.hive.metastore.token.signature 
thrift://xxx:7004,thrift://xxxx:7004
   spark.sql.catalog.cos8lcu4e_iceberg.hadoop.hive.metastore.token.signature 
thrift://xxxx:7004,thrift://xxxx:7004
   
spark.sql.catalog.default_catalog_hive_0o29qw56.hadoop.hive.metastore.token.signature
 thrift://xxx:40016
   
spark.sql.catalog.default_catalog_iceberg_0o29qw56.hadoop.hive.metastore.token.signature
 thrift://xxx:40016
   spark.sql.catalog.kyuubi_cos8lcu4e_hive 
org.apache.kyuubi.spark.connector.hive.HiveTableCatalog
   
spark.sql.catalog.kyuubi_cos8lcu4e_hive.hadoop.hive.metastore.token.signature 
thrift://xxx:7004,thrift://xxx:7004
   spark.sql.catalog.kyuubi_cos8lcu4e_hive.hive.metastore.kerberos.principal 
hadoop/_HOST@TBDS-0O29QW56
   spark.sql.catalog.kyuubi_cos8lcu4e_hive.hive.metastore.uris 
thrift://xxx:7004,thrift://xxxx:7004
   spark.sql.catalog.kyuubi_default_catalog_hive_0o29qw56 
org.apache.kyuubi.spark.connector.hive.HiveTableCatalog
   
spark.sql.catalog.kyuubi_default_catalog_hive_0o29qw56.hadoop.hive.metastore.token.signature
 thrift://xxx:40016
   
spark.sql.catalog.kyuubi_default_catalog_hive_0o29qw56.hive.metastore.kerberos.principal
 hadoop/xxxx@xxx
   spark.sql.catalog.kyuubi_default_catalog_hive_0o29qw56.hive.metastore.uris 
thrift://xxx:xxx
   spark.sql.catalog.origin_cos8lcu4e_iceberg 
org.apache.iceberg.spark.SparkCatalog
   
spark.sql.catalog.origin_cos8lcu4e_iceberg.hadoop.hive.metastore.token.signature
 thrift://xxxx:7004,thrift://xxx:7004
   spark.sql.catalog.origin_cos8lcu4e_iceberg.type hive
   spark.sql.catalog.origin_cos8lcu4e_iceberg.uri 
thrift://xxx:7004,thrift://xxx:7004
   spark.sql.catalog.origin_default_catalog_iceberg_0o29qw56 
org.apache.iceberg.spark.SparkCatalog
   ```
   
   ### Additional context
   
   Dynamic partition cutting core logic:
   
   private def prune(plan: LogicalPlan): LogicalPlan = {
       plan transformUp {
         .......
               var filterableScan = **getFilterableTableScan**(l, left)
               if (filterableScan.isDefined && canPruneLeft(joinType) &&
                   hasPartitionPruningFilter(right)) {
                 newLeft = insertPredicate(l, newLeft, r, right, rightKeys, 
filterableScan.get)
               } else {
                 filterableScan = getFilterableTableScan(r, right)
                 if (filterableScan.isDefined && canPruneRight(joinType) &&
                     hasPartitionPruningFilter(left) ) {
                   newRight = insertPredicate(r, newRight, l, left, leftKeys, 
filterableScan.get)
                 }
               }
             case _ =>
           }
   .......
           Join(newLeft, newRight, joinType, Some(condition), hint)
       }
     }
   
   def getFilterableTableScan(a: Expression, plan: LogicalPlan): 
Option[LogicalPlan] = {
       val srcInfo: Option[(Expression, LogicalPlan)] = 
findExpressionAndTrackLineageDown(a, plan)
       srcInfo.flatMap {
         case (resExp, l: LogicalRelation) =>
           l.relation match {
             case fs: HadoopFsRelation =>
               val partitionColumns = AttributeSet(
                 l.resolve(fs.partitionSchema, 
fs.sparkSession.sessionState.analyzer.resolver))
               if (resExp.references.subsetOf(partitionColumns)) {
                 return Some(l)
               } else {
                 None
               }
             case _ => None
           }
         case (resExp, l: HiveTableRelation) =>
           if (resExp.references.subsetOf(AttributeSet(l.partitionCols))) {
             return Some(l)
           } else {
             None
           }
         case (resExp, **r @ DataSourceV2ScanRelation(_, scan: 
SupportsRuntimeV2Filtering, _, _, _)**) =>
           val filterAttrs = 
V2ExpressionUtils.resolveRefs[Attribute](scan.filterAttributes, r)
           if (resExp.references.subsetOf(AttributeSet(filterAttrs))) {
             Some(r)
           } else {
             None
           }
         case _ => None
       }
     }
   
   
![Image](https://github.com/user-attachments/assets/9cc1c78f-047a-430b-9364-8bd81c51525b)
   
   
![Image](https://github.com/user-attachments/assets/01391b94-9403-4bde-980a-eac004d4b14e)
   
   
![Image](https://github.com/user-attachments/assets/7f292e7e-b672-46c2-8755-924fb18d3d94)
   
   
![Image](https://github.com/user-attachments/assets/1d15ad0f-8233-4136-be71-7d89fb35467c)
   
   ### Are you willing to submit PR?
   
   - [ ] Yes. I would be willing to submit a PR with guidance from the Kyuubi 
community to fix.
   - [ ] No. I cannot submit a PR at this time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@kyuubi.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@kyuubi.apache.org
For additional commands, e-mail: notifications-h...@kyuubi.apache.org

Reply via email to