[jira] [Updated] (SPARK-28860) Using ColumnStats of join key to get TableAccessCardinality when finding star joins in ReorderJoinRule
[ https://issues.apache.org/jira/browse/SPARK-28860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lai Zhou updated SPARK-28860:
Description:
Star-schema detection currently uses TableAccessCardinality to reorder the DimTables when the join is a selective star join ([StarSchemaDetection.scala#L341|https://github.com/apache/spark/blob/98e1a4cea44d7cb2f6d502c0202ad3cac2a1ad8d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala#L341]):
{code:java}
if (isSelectiveStarJoin(dimTables, conditions)) {
  val reorderDimTables = dimTables.map { plan =>
    TableAccessCardinality(plan, getTableAccessCardinality(plan))
  }.sortBy(_.size).map {
    case TableAccessCardinality(p1, _) => p1
  }
{code}
However, the getTableAccessCardinality method doesn't consider the ColumnStats of the equi-join key. I'm not sure whether we should compute the join cardinality for each DimTable based on its join key here instead. [~ioana-delaney]

> Using ColumnStats of join key to get TableAccessCardinality when finding star joins in ReorderJoinRule
> ---
> Key: SPARK-28860
> URL: https://issues.apache.org/jira/browse/SPARK-28860
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.4.3
> Reporter: Lai Zhou
> Priority: Minor

--
This message was sent by Atlassian Jira (v8.3.2#803003)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
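To make the question concrete, here is a minimal Scala sketch under stated assumptions. DimTable, estimateJoinCardinality, orderByAccessCardinality and orderByJoinCardinality are hypothetical names, not Spark's Catalyst API. The join estimate is the textbook inner equi-join formula |fact JOIN dim| ~= |fact| * |dim| / max(ndv(fact.key), ndv(dim.key)), which may not match what Spark's own join estimation does. The sketch contrasts ordering by access cardinality alone with ordering by an estimate that also uses the join key's distinct count.

```scala
// Hypothetical stand-in for a dimension-table plan plus its statistics.
// These are NOT Spark Catalyst classes; the names are illustrative only.
final case class DimTable(
    name: String,
    rowCount: Long,   // table access cardinality (rows after local predicates)
    joinKeyNdv: Long  // distinct count of the dimension-side equi-join key
)

// Behaviour described in the issue: sort by table access cardinality only.
def orderByAccessCardinality(dims: Seq[DimTable]): Seq[DimTable] =
  dims.sortBy(_.rowCount)

// Textbook inner equi-join estimate (containment assumption):
//   |fact JOIN dim| ~= |fact| * |dim| / max(ndv(fact.key), ndv(dim.key))
def estimateJoinCardinality(factRows: Long, factKeyNdv: Long, dim: DimTable): Long = {
  val denom = math.max(math.max(factKeyNdv, dim.joinKeyNdv), 1L) // never divide by zero
  factRows * dim.rowCount / denom
}

// Possible alternative: sort by the estimated size of fact JOIN dim instead.
// factKeyNdv maps a dimension name to the NDV of the matching fact-side key.
def orderByJoinCardinality(
    factRows: Long,
    factKeyNdv: Map[String, Long],
    dims: Seq[DimTable]): Seq[DimTable] =
  dims.sortBy(d => estimateJoinCardinality(factRows, factKeyNdv(d.name), d))

// Made-up statistics for a 1,000,000-row fact table.
val dims = Seq(
  DimTable("date_dim", rowCount = 365L, joinKeyNdv = 365L), // unfiltered
  DimTable("store", rowCount = 10L, joinKeyNdv = 10L),      // filtered
  DimTable("item", rowCount = 5000L, joinKeyNdv = 5000L)    // filtered from a larger table
)
val factKeyNdv = Map("date_dim" -> 365L, "store" -> 1000L, "item" -> 100000L)

println(orderByAccessCardinality(dims).map(_.name))                     // List(store, date_dim, item)
println(orderByJoinCardinality(1000000L, factKeyNdv, dims).map(_.name)) // List(store, item, date_dim)
```

With these made-up numbers, the filtered item table (5,000 rows) moves ahead of the unfiltered date_dim (365 rows), because joining it is estimated to shrink the fact table far more. Whether that is the right criterion for star-join ordering is exactly the open question in this issue.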
[jira] [Created] (SPARK-28860) Using ColumnStats of join key to get TableAccessCardinality when finding star joins in ReorderJoinRule
Lai Zhou created SPARK-28860:
Summary: Using ColumnStats of join key to get TableAccessCardinality when finding star joins in ReorderJoinRule
Key: SPARK-28860
URL: https://issues.apache.org/jira/browse/SPARK-28860
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 2.4.3
Reporter: Lai Zhou
[jira] [Comment Edited] (SPARK-13882) Remove org.apache.spark.sql.execution.local
[ https://issues.apache.org/jira/browse/SPARK-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862641#comment-16862641 ]

Lai Zhou edited comment on SPARK-13882 at 6/13/19 8:07 AM:
Hi [~rxin], will this iterator-based local mode be re-introduced in the future? I think a direct, iterator-based local mode would be highly efficient and could help people run real-time queries.

> Remove org.apache.spark.sql.execution.local
> ---
> Key: SPARK-13882
> URL: https://issues.apache.org/jira/browse/SPARK-13882
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Reporter: Reynold Xin
> Assignee: Reynold Xin
> Priority: Major
> Fix For: 2.0.0
>
> We introduced some local operators in the org.apache.spark.sql.execution.local package but never fully wired the engine to actually use them. We still plan to implement a full local mode, but it's probably going to be fairly different from what the current iterator-based local mode would look like. Let's just remove them for now; we can always re-introduce them in the future by looking at branch-1.6.
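For readers unfamiliar with the term, an iterator-based (pull-style) local mode chains single-threaded operators, each of which pulls rows lazily from its child's Iterator. The following is only a hypothetical Scala illustration of that idea: LocalOperator, Scan, Filter and Project are made-up names and do not reproduce the classes that used to live in org.apache.spark.sql.execution.local.

```scala
// Hypothetical pull-based local operators (names are illustrative, not Spark's).
// A row is modelled as a column-name -> value map purely for readability.
type Row = Map[String, Any]

trait LocalOperator {
  // Each call returns a lazy iterator; rows are produced only when pulled.
  def execute(): Iterator[Row]
}

final case class Scan(rows: Seq[Row]) extends LocalOperator {
  def execute(): Iterator[Row] = rows.iterator
}

final case class Filter(condition: Row => Boolean, child: LocalOperator) extends LocalOperator {
  def execute(): Iterator[Row] = child.execute().filter(condition)
}

final case class Project(columns: Seq[String], child: LocalOperator) extends LocalOperator {
  def execute(): Iterator[Row] =
    child.execute().map(row => columns.map(c => c -> row(c)).toMap)
}

// SELECT id FROM t WHERE amount > 100, evaluated in a single thread.
val plan = Project(
  Seq("id"),
  Filter(
    r => r("amount").asInstanceOf[Int] > 100,
    Scan(Seq(
      Map("id" -> 1, "amount" -> 50),
      Map("id" -> 2, "amount" -> 150),
      Map("id" -> 3, "amount" -> 300)))))

println(plan.execute().toList) // List(Map(id -> 2), Map(id -> 3))
```

Because every operator only wraps its child's iterator, no intermediate result is materialized and there is no scheduling or exchange overhead, which is why such a mode is attractive for low-latency, real-time queries.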
[jira] [Commented] (SPARK-13882) Remove org.apache.spark.sql.execution.local
[ https://issues.apache.org/jira/browse/SPARK-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16862641#comment-16862641 ]

Lai Zhou commented on SPARK-13882:
Hi [~rxin], will this iterator-based local mode be re-introduced in the future? I think a direct, iterator-based local mode would be highly efficient and could help people run real-time queries.
[jira] [Commented] (SPARK-9983) Local physical operators for query execution
[ https://issues.apache.org/jira/browse/SPARK-9983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16860976#comment-16860976 ]

Lai Zhou commented on SPARK-9983:
[~rxin], we now use Calcite to build a high-performance Hive SQL engine, which has been released as open source: [https://github.com/51nb/marble]. It works well for real-time ML scenarios in our financial business, but I don't think it is the best solution. I think adding a single-node version of DataFrame to Spark may be the best solution, because Spark SQL is naturally compatible with Hive SQL, and people could enjoy the benefits of its excellent optimizer, vectorized execution, code generation, etc.

> Local physical operators for query execution
> ---
> Key: SPARK-9983
> URL: https://issues.apache.org/jira/browse/SPARK-9983
> Project: Spark
> Issue Type: Story
> Components: SQL
> Reporter: Reynold Xin
> Assignee: Reynold Xin
> Priority: Major
>
> In distributed query execution, there are two kinds of operators:
> (1) operators that exchange data between different executors or threads; examples include broadcast and shuffle.
> (2) operators that process data in a single thread; examples include project, filter, group by, etc.
> This ticket proposes clearly differentiating them and creating local operators in Spark. This leads to a lot of benefits: easier testing, easier optimization of data exchange, better design (single responsibility), and potentially even a hyper-optimized single-node version of DataFrame.
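The two operator kinds described in this ticket can be illustrated with a small, hypothetical Scala sketch: hashExchange stands in for an exchange (shuffle) operator that moves rows between partitions, and localHashAggregate for a single-threaded local operator that runs a group-by SUM over one partition. Neither function is Spark API; the names and the (String, Long) row type are assumptions made for illustration.

```scala
import scala.collection.mutable

// Exchange operator (hypothetical): redistributes rows across partitions by
// hashing the grouping key, as a shuffle would. Output partition count = numOut.
def hashExchange(
    partitions: Seq[Seq[(String, Long)]],
    numOut: Int): Seq[Seq[(String, Long)]] = {
  val buckets = Vector.fill(numOut)(Vector.newBuilder[(String, Long)])
  for (part <- partitions; row <- part)
    buckets(Math.floorMod(row._1.hashCode, numOut)) += row
  buckets.map(_.result())
}

// Local operator (hypothetical): single-threaded hash group-by with SUM,
// consuming one partition's rows through an Iterator.
def localHashAggregate(rows: Iterator[(String, Long)]): Map[String, Long] = {
  val acc = mutable.HashMap.empty[String, Long]
  rows.foreach { case (k, v) => acc(k) = acc.getOrElse(k, 0L) + v }
  acc.toMap
}

val input = Seq(
  Seq("a" -> 1L, "b" -> 2L),
  Seq("a" -> 3L, "c" -> 4L))

val shuffled = hashExchange(input, numOut = 2)
// Every key now lives in exactly one partition, so the partial maps never overlap.
val totals = shuffled.map(p => localHashAggregate(p.iterator)).reduce(_ ++ _)

println(totals.toList.sorted) // List((a,4), (b,2), (c,4))
```

Because the exchange routes every occurrence of a key to the same output partition, the per-partition results can simply be concatenated into global totals. Keeping the data movement and the single-threaded computation in separate operators is what allows the local part to be tested and optimized on its own, as the ticket suggests.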
[jira] [Comment Edited] (SPARK-9983) Local physical operators for query execution
[ https://issues.apache.org/jira/browse/SPARK-9983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16860976#comment-16860976 ]

Lai Zhou edited comment on SPARK-9983 at 6/11/19 12:18 PM:
[~rxin], we now use Calcite to build a high-performance Hive SQL engine, which has been released as open source: [https://github.com/51nb/marble]. It works well for real-time ML scenarios in our financial business, but I don't think it is the best solution. Adding a single-node version of DataFrame to Spark may be the best solution, because Spark SQL is naturally compatible with Hive SQL, and people could enjoy the benefits of its excellent optimizer, vectorized execution, code generation, etc.
[jira] [Comment Edited] (SPARK-9983) Local physical operators for query execution
[ https://issues.apache.org/jira/browse/SPARK-9983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16860976#comment-16860976 ]

Lai Zhou edited comment on SPARK-9983 at 6/11/19 12:15 PM:
[~rxin], we now use Calcite to build a high-performance Hive SQL engine, which has been released as open source: [https://github.com/51nb/marble]. It works well for real-time ML scenarios in our financial business, but I don't think it is the best solution. I think adding a single-node version of DataFrame to Spark may be the best solution, because Spark SQL is naturally compatible with Hive SQL, and people could enjoy the benefits of its excellent optimizer, vectorized execution, code generation, etc.
[jira] [Comment Edited] (SPARK-9983) Local physical operators for query execution
[ https://issues.apache.org/jira/browse/SPARK-9983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16860959#comment-16860959 ]

Lai Zhou edited comment on SPARK-9983 at 6/11/19 11:51 AM:
[~rxin], regarding `a hyper-optimized single-node version of DataFrame`: do you have a roadmap for it? In the real world, we use Spark SQL to handle our ETL jobs on Hive. We extract many user variables with complex SQL queries, and these become the input for machine-learning models. But when we want to migrate the jobs to a real-time system, we always need to rewrite these SQL queries in another programming language, which requires a lot of work. The local mode of Spark SQL is currently not a direct, high-performance execution mode, so I think it would make great sense to have a hyper-optimized single-node version of DataFrame.
[jira] [Commented] (SPARK-9983) Local physical operators for query execution
[ https://issues.apache.org/jira/browse/SPARK-9983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16860959#comment-16860959 ]

Lai Zhou commented on SPARK-9983:
[~rxin], regarding `a hyper-optimized single-node version of DataFrame`: do you have a roadmap for it? In the real world, we use Spark SQL to handle our ETL jobs on Hive. We extract many user variables with complex SQL queries, and these become the input for machine-learning models. But when we want to migrate the jobs to a real-time system, we always need to rewrite these SQL queries in another programming language, which requires a lot of work. The local mode of Spark SQL is currently not a direct, high-performance execution mode, so I think it would make great sense to have a hyper-optimized single-node version of DataFrame.