Hi,

I am seeing a broadcast failure when doing a join, as follows. Assume I have a DataFrame df with ~80 million records. I do:

    from pyspark.sql.functions import broadcast

    df2 = df.filter(cond)  # reduces to ~50 million records
    grouped = broadcast(df.groupby(df2.colA).count())
    total = df2.join(grouped, df2.colA == grouped.colA, "inner")
    total.filter(total["count"] > 10).show()
This fails with an exception:

    org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:120)
        at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:197)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:82)
        at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
        at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:30)
        at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:62)
        at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
        at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:79)

However, if I do the following before the join, everything works fine:

    grouped.cache()
    grouped.count()

(By the way, the grouped DataFrame is only ~1.5 MB when cached in memory, and I have more than 4 GB per executor with 8 executors; the full DataFrame is ~8 GB.)

Thanks,
Assaf.
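
For anyone who wants to try this pattern end to end, here is a minimal self-contained sketch of the same broadcast-then-join shape plus the cache()/count() workaround described above. The SparkSession setup, the synthetic data, the colA column, and the filter condition are illustrative stand-ins, not the original ~8 GB job, so on this small data it will succeed either way; it only mirrors the plan shape:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import broadcast

    spark = SparkSession.builder.appName("broadcast-join-sketch").getOrCreate()

    # Synthetic stand-in for the real ~80M-record DataFrame.
    df = spark.range(0, 1000000).withColumnRenamed("id", "colA")

    # Stand-in filter; the real cond reduces ~80M rows to ~50M.
    df2 = df.filter(df["colA"] % 2 == 0)

    # Hint that the small aggregated side should be broadcast to the join.
    grouped = broadcast(df2.groupby(df2.colA).count())

    # The workaround from the message: materialize the small side
    # before the join is executed.
    grouped.cache()
    grouped.count()

    total = df2.join(grouped, df2.colA == grouped.colA, "inner")
    total.filter(total["count"] > 10).show()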