[jira] [Commented] (SPARK-19609) Broadcast joins should pushdown join constraints as Filter to the larger relation
[ https://issues.apache.org/jira/browse/SPARK-19609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17565392#comment-17565392 ]

Victor Delépine commented on SPARK-19609:
-----------------------------------------

Hey folks. Given that this issue was bulk-closed a while ago but still exists, I've taken the liberty of opening a new ticket for it, to make sure it can be triaged again and hopefully fixed :) Here's the new one: https://issues.apache.org/jira/browse/SPARK-39753

> Broadcast joins should pushdown join constraints as Filter to the larger relation
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-19609
>                 URL: https://issues.apache.org/jira/browse/SPARK-19609
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: Nick Dimiduk
>            Priority: Major
>              Labels: bulk-closed
>
> For broadcast inner joins, where the smaller relation is known to be small enough to materialize on a worker, the set of values for all join columns is known and fits in memory. Spark should translate these values into a {{Filter}} pushed down to the datasource. The common join condition of equality, i.e. {{lhs.a == rhs.a}}, can be written as an {{a in ...}} clause. An example of pushing such filters is already present in the form of {{IsNotNull}} filters via [~sameerag]'s work on the SPARK-12957 subtasks.
> This optimization could even work when the smaller relation does not fit entirely in memory. This could be done by partitioning the smaller relation into N pieces, applying this predicate pushdown for each piece, and unioning the results.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
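The rewrite the issue description proposes can be illustrated with a small, engine-agnostic sketch in plain Python (the relation contents are invented for illustration; this is not Spark internals): the distinct join-key values of the small relation become an IN-style membership filter applied during the scan of the large relation, before the join itself runs.

```python
# Plain-Python sketch of the proposed optimization: for an equality join
# lhs.a == rhs.a where rhs is small, push the filter "a IN <rhs keys>"
# into the scan of the large lhs.

small = [{"a": 1, "name": "x"}, {"a": 2, "name": "y"}]       # fits in memory
large = [{"a": k, "payload": k * 10} for k in range(1_000)]  # pretend datasource

# Step 1: the set of join-key values is fully known from the small relation.
keys = {row["a"] for row in small}

# Step 2: push "a IN keys" down to the scan, so only matching rows are read.
scanned = [row for row in large if row["a"] in keys]

# Step 3: the join now runs over far fewer rows (2 instead of 1,000).
joined = [{**l, **s} for l in scanned for s in small if l["a"] == s["a"]]
```

The point of the pushdown is Step 2: without it, all 1,000 rows of the large relation flow into the join, only for 998 of them to be discarded.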
[jira] [Commented] (SPARK-19609) Broadcast joins should pushdown join constraints as Filter to the larger relation
[ https://issues.apache.org/jira/browse/SPARK-19609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17542881#comment-17542881 ]

Victor Delépine commented on SPARK-19609:
-----------------------------------------

[~yumwang] happy to help, but could you clarify what you would like to see in the benchmark?
[jira] [Commented] (SPARK-19609) Broadcast joins should pushdown join constraints as Filter to the larger relation
[ https://issues.apache.org/jira/browse/SPARK-19609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539884#comment-17539884 ]

Yuming Wang commented on SPARK-19609:
-------------------------------------

[~devict] Do you have benchmark numbers for pushing down the join constraints as a Filter?
[jira] [Commented] (SPARK-19609) Broadcast joins should pushdown join constraints as Filter to the larger relation
[ https://issues.apache.org/jira/browse/SPARK-19609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539527#comment-17539527 ]

Victor Delépine commented on SPARK-19609:
-----------------------------------------

Hey [~hyukjin.kwon], I can confirm that this is still an issue in Spark 3.2.1, as my team just ran into it while doing a stream/static broadcast join. Would it make sense to reopen this ticket or create a new one, so that it's back on the radar? This could have a pretty large impact on a lot of pipelines.

And thanks [~jwesteen] for the workaround! That helped us a lot.
[jira] [Commented] (SPARK-19609) Broadcast joins should pushdown join constraints as Filter to the larger relation
[ https://issues.apache.org/jira/browse/SPARK-19609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233785#comment-17233785 ]

Jackson Westeen commented on SPARK-19609:
-----------------------------------------

For what it's worth, I think we're running into this in Spark 3.0 right now. The inner-join column between two DataFrames is not getting pushed down to the datasource, even when the broadcast() hint is used. Manually collecting the results of the smaller DataFrame's .select(joinColumn) back to the driver and filtering with .isInCollection seems to be an effective workaround, but it would be nice if this weren't necessary.
[jira] [Commented] (SPARK-19609) Broadcast joins should pushdown join constraints as Filter to the larger relation
[ https://issues.apache.org/jira/browse/SPARK-19609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16947019#comment-16947019 ]

Hyukjin Kwon commented on SPARK-19609:
--------------------------------------

It was resolved as Incomplete because the ticket targets an EOL release and had been inactive for more than a year, as discussed on the dev mailing list. Please reopen it if you can verify it is still an issue in Spark 2.4+.
[jira] [Commented] (SPARK-19609) Broadcast joins should pushdown join constraints as Filter to the larger relation
[ https://issues.apache.org/jira/browse/SPARK-19609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16947001#comment-16947001 ]

Nick Dimiduk commented on SPARK-19609:
--------------------------------------

Hi [~hyukjin.kwon], would you mind adding a comment as to why this issue was closed? Has the functionality been implemented elsewhere? How about a link to the relevant JIRA, so I know what fix version to look for? Thanks!
[jira] [Commented] (SPARK-19609) Broadcast joins should pushdown join constraints as Filter to the larger relation
[ https://issues.apache.org/jira/browse/SPARK-19609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511746#comment-16511746 ]

David McLennan commented on SPARK-19609:
----------------------------------------

This feature would be extremely useful in making external data lookups more efficient. For example, say you have a stream of data coming in with a window of 10,000 messages, and you need to join each message with reference data on external services to enrich it (for example, accounts and products). Today you would either have to pull the entire external data sources into the executors (expensive on all sides; even the "small" datasets are many tens of gigabytes), or look up the external datasets key by key on a per-message basis, which is very chatty from a communication perspective.

If this feature were implemented, it could reduce the amount of data transfer significantly when the cardinality of the join keys is low (i.e. you might have 10,000 messages, but they reference only 15 unique accounts and 50 unique products). It would also relieve the author of the burden of implementing something like this themselves: they could just register the dataframes, run a SQL context on top of them, and go.
[jira] [Commented] (SPARK-19609) Broadcast joins should pushdown join constraints as Filter to the larger relation
[ https://issues.apache.org/jira/browse/SPARK-19609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16156341#comment-16156341 ]

ivorzhou commented on SPARK-19609:
----------------------------------

This is a very important issue; is there any plan to add this feature?
[jira] [Commented] (SPARK-19609) Broadcast joins should pushdown join constraints as Filter to the larger relation
[ https://issues.apache.org/jira/browse/SPARK-19609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020287#comment-16020287 ]

Jayesh Lalwani commented on SPARK-19609:
----------------------------------------

I'd just like to point out that this would immensely help applications using the Structured Streaming capabilities of Spark, especially the applications that [~matei] refers to as "continuous applications."

One of the selling points of Structured Streaming is that you can join streaming data with data from batch sources, like databases. In these cases, it is frequently required that streaming data be joined with database tables that contain millions (or even billions) of rows. This meets the exact profile of data that this ticket describes: a smaller set of data being joined with a larger set.

The problem is that without this change, there is no good way of joining a streaming data frame with a large database table. You can either a) use spark.read.jdbc to read the huge table into a data frame and then join it with the streaming data frame, or b) call df.map/df.mapPartitions on the streaming data frame and run a database query yourself.

The problem with a) is that the huge table is read at every execution of a micro batch. In fact, if your Spark application has multiple sinks, it will be read once for every sink, because Spark doesn't have a good mechanism for specifying data frames that need to be cached for the micro batch. The problem with b) is that it adds complexity to the application-level code, and you are really leaving the Dataframe abstraction when you call the map function. For example, what if your SQL query returns 1000x the records? Suddenly you have millions of records being created in each partition, and you have to tell Spark to repartition the output, which incurs cost.

Ideally, we would like to use spark.read.jdbc followed by a join with the streaming dataframe, and have Spark push the join predicates down to the database. This would result in less data being pulled into Spark in every micro batch.
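Pending such an optimizer change, one approximation of the desired pushdown is to fold the collected join keys into the query handed to spark.read.jdbc as a derived table. The helper below is hypothetical (the table and column names are invented), and a real implementation would need to escape values for the target database and cap the size of the key set:

```python
# Hypothetical helper: build a pushdown subquery for spark.read.jdbc from the
# join keys observed in a micro batch. Illustration only: values are formatted
# with repr(), which is NOT safe quoting for an arbitrary SQL dialect, and
# very large key sets would need a different strategy (e.g. a temp table).

def build_pushdown_query(table: str, key_col: str, keys) -> str:
    """Return a derived-table expression selecting only rows whose key
    column matches one of `keys`, suitable for the jdbc `table` argument."""
    formatted = ", ".join(
        repr(k) if isinstance(k, str) else str(k) for k in sorted(keys)
    )
    return f"(SELECT * FROM {table} WHERE {key_col} IN ({formatted})) AS pushed"

dbtable = build_pushdown_query("accounts", "account_id", {42, 7, 42})
# The result would then be passed along the lines of:
#   spark.read.jdbc(url, table=dbtable, properties=props)
# so the database returns only the rows the micro batch actually references.
```

This keeps the per-batch data transfer proportional to the batch's key cardinality rather than to the size of the reference table, which is exactly the effect the comment asks the optimizer to produce automatically.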
[jira] [Commented] (SPARK-19609) Broadcast joins should pushdown join constraints as Filter to the larger relation
[ https://issues.apache.org/jira/browse/SPARK-19609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872797#comment-15872797 ]

gagan taneja commented on SPARK-19609:
--------------------------------------

This can be further extended to joins on columns that are also partition columns. For example, suppose table Address is partitioned on postal_code, and table Location contains location_name and postal_code, with the query:

SELECT * FROM address JOIN location ON address.postal_code = location.postal_code WHERE location_name = 'San Jose'

If the join is rewritten as an IN clause on postal_code, the optimizer will be able to prune the partitions of Address, which would be significantly faster.
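The proposed rewrite can be sanity-checked for semantic equivalence on any SQL engine; below is a sketch using Python's built-in sqlite3 with an invented schema (sqlite has no partition pruning, so this only demonstrates that the join form and the IN-subquery form return the same rows):

```python
import sqlite3

# Invented miniature schema mirroring the comment's example.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE address (id INTEGER, postal_code TEXT)")
cur.execute("CREATE TABLE location (location_name TEXT, postal_code TEXT)")
cur.executemany("INSERT INTO address VALUES (?, ?)",
                [(1, "95101"), (2, "95113"), (3, "10001")])
cur.executemany("INSERT INTO location VALUES (?, ?)",
                [("San Jose", "95101"), ("San Jose", "95113"),
                 ("New York", "10001")])

# Original join form.
join_rows = cur.execute(
    "SELECT a.id FROM address a JOIN location l "
    "ON a.postal_code = l.postal_code "
    "WHERE l.location_name = 'San Jose' ORDER BY a.id").fetchall()

# Rewritten IN-subquery form: a partition-aware engine could prune address
# partitions using the small set of postal codes the subquery produces.
in_rows = cur.execute(
    "SELECT id FROM address WHERE postal_code IN "
    "(SELECT postal_code FROM location WHERE location_name = 'San Jose') "
    "ORDER BY id").fetchall()
```

In the rewritten form, the filter on postal_code is visible to the scan of address before the join, which is what makes partition pruning possible on engines that support it.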