GitHub user dongjoon-hyun commented on a diff in the pull request:
https://github.com/apache/spark/pull/22198#discussion_r213193232
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
---
@@ -47,20 +49,47 @@ object ResolveHints {
*
* This rule must happen before common table expressions.
*/
- class ResolveBroadcastHints(conf: SQLConf) extends Rule[LogicalPlan] {
+ class ResolveBroadcastHints(conf: SQLConf, catalog: SessionCatalog)
extends Rule[LogicalPlan] {
private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN",
"MAPJOIN")
def resolver: Resolver = conf.resolver
- private def applyBroadcastHint(plan: LogicalPlan, toBroadcast:
Set[String]): LogicalPlan = {
+ private def namePartsWithDatabase(nameParts: Seq[String], database:
String): Seq[String] = {
+ if (nameParts.size == 1) {
+ database +: nameParts
+ } else {
+ nameParts
+ }
+ }
+
+ private def matchedTableIdentifier(
+ nameParts: Seq[String],
+ tableIdent: IdentifierWithDatabase): Boolean = {
+ tableIdent.database match {
+ case Some(db) if resolver(catalog.globalTempViewManager.database,
db) =>
+ val identifierList = db :: tableIdent.identifier :: Nil
+ namePartsWithDatabase(nameParts,
catalog.globalTempViewManager.database)
+ .corresponds(identifierList)(resolver)
+ case _ =>
+ val db =
tableIdent.database.getOrElse(catalog.getCurrentDatabase)
+ val identifierList = db :: tableIdent.identifier :: Nil
+ namePartsWithDatabase(nameParts, catalog.getCurrentDatabase)
--- End diff --
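This part breaks the `temporary view` case. In the following example, no table
should be broadcast: a temporary view does not belong to any database, so
`default.tv` must not match the temporary view `tv`. The last query in the
example confirms this, since `default.tv` cannot be resolved. However, because
this branch fills in the current database both for an identifier without a
database and for a single-part hint name, `mapjoin(default.tv)` is applied to
`tv` anyway. Also, could you add more test cases? We need to test `table`,
`global temporary view`, `temporary view`, and `view`. We may have missed some
cases so far, such as the following.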
```scala
scala> :paste
// Entering paste mode (ctrl-D to finish)
sql("set spark.sql.autoBroadcastJoinThreshold=-1")
spark.range(10).write.mode("overwrite").saveAsTable("t")
sql("create temporary view tv as select * from t")
sql("select /*+ mapjoin(default.tv) */ * from t, tv where t.id =
tv.id").explain
sql("select * from default.tv")
// Exiting paste mode, now interpreting.
== Physical Plan ==
*(2) BroadcastHashJoin [id#7L], [id#12L], Inner, BuildRight
:- *(2) Project [id#7L]
: +- *(2) Filter isnotnull(id#7L)
: +- *(2) FileScan parquet default.t[id#7L] Batched: true, Format:
Parquet, Location:
InMemoryFileIndex[file:/Users/dongjoon/PR-22198/spark-warehouse/t],
PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema:
struct<id:bigint>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint,
true]))
+- *(1) Project [id#12L]
+- *(1) Filter isnotnull(id#12L)
+- *(1) FileScan parquet default.t[id#12L] Batched: true, Format:
Parquet, Location:
InMemoryFileIndex[file:/Users/dongjoon/PR-22198/spark-warehouse/t],
PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema:
struct<id:bigint>
org.apache.spark.sql.AnalysisException: Table or view not found:
`default`.`tv`; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation `default`.`tv`
```
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]