Hi,
I am seeing a broadcast failure when doing a join, as follows.

Assume I have a dataframe df with ~80 million records. I do:

from pyspark.sql.functions import broadcast

df2 = df.filter(cond)  # reduces to ~50 million records
grouped = broadcast(df2.groupby(df2.colA).count())  # per-colA counts, marked for broadcast
total = df2.join(grouped, df2.colA == grouped.colA, "inner")
total.filter(total["count"] > 10).show()

This fails with an exception:
org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:120)
        at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:197)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:82)
        at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
        at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:30)
        at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:62)
        at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
        at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:79)

However, if I do:

grouped.cache()
grouped.count()

before the join, everything works fine. (For reference: the grouped dataframe is 1.5 MB when cached in memory, I have 8 executors with more than 4 GB each, and the full dataframe is ~8 GB.)

Thanks,
Assaf.




