[jira] [Commented] (SPARK-19476) Running threads in Spark DataFrame foreachPartition() causes NullPointerException

2017-03-27 Thread Lucy Yu (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15944163#comment-15944163 ]

Lucy Yu commented on SPARK-19476:
---------------------------------

bq. I don't think in general you're expected to be able to do this safely. Why would you do this asynchronously or with more partitions, simply?

Sorry, my simplified example had a mistake in it, but a user ran into a NullPointerException in our actual code. In my simplified example the lambda function may return before the thread completes, but the actual code ensures that the lambda function cannot return until the thread has finished; i.e., we have

{code}
df.foreachPartition(partition => {
  ...
  numRowsAccumulator += ingestStrategy.loadPartition(targetNode, partition)
})
{code}

and loadPartition is defined at https://github.com/memsql/memsql-spark-connector/blob/master/src/main/scala/com/memsql/spark/connector/LoadDataStrategy.scala#L18 . Basically, the thread finishes once it has read all of the partition's data into a stream, at which point it closes the stream; stmt.executeUpdate(query.sql.toString), which is part of the function passed to foreachPartition, waits until the stream is closed.

We do this to load the partition into the database in constant memory -- by writing to a pipe and consuming from it at the same time. Without this approach, the executor may run out of memory materializing the partition.
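
For illustration, here is a minimal sketch of that pattern (hypothetical names and a String row type to keep it self-contained; the real implementation is in the LoadDataStrategy.scala file linked above): a writer thread streams the partition's rows into a pipe and closes it when done, while the calling thread blocks consuming the other end until EOF.

{code}
import java.io.{PipedInputStream, PipedOutputStream}
import java.util.concurrent.atomic.AtomicLong

// Minimal sketch of the constant-memory load pattern; names are
// hypothetical -- see LoadDataStrategy.scala (linked above) for the real code.
def loadPartition(partition: Iterator[String]): Long = {
  val out  = new PipedOutputStream()
  val in   = new PipedInputStream(out)
  val rows = new AtomicLong(0)

  // Writer thread: stream rows into the pipe one at a time, then close it.
  // Note that the partition iterator is consumed on this thread, not the
  // thread that called foreachPartition -- which is what trips the NPE.
  val writer = new Thread(new Runnable {
    override def run(): Unit = {
      try {
        for (row <- partition) {
          out.write((row + "\n").getBytes("UTF-8"))
          rows.incrementAndGet()
        }
      } finally {
        out.close() // EOF unblocks the consumer below
      }
    }
  })
  writer.start()

  // Calling thread: block until the writer closes its end. In the real code
  // this is stmt.executeUpdate(query.sql.toString) reading the stream, so
  // only one buffer's worth of data is in memory at a time.
  val buf = new Array[Byte](8192)
  while (in.read(buf) != -1) { /* drain */ }
  writer.join()
  rows.get()
}
{code}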

> Running threads in Spark DataFrame foreachPartition() causes 
> NullPointerException
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-19476
>                 URL: https://issues.apache.org/jira/browse/SPARK-19476
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>            Reporter: Gal Topper
>            Priority: Minor
>
> First reported on [Stack overflow|http://stackoverflow.com/questions/41674069/running-threads-in-spark-dataframe-foreachpartition].
> I use multiple threads inside foreachPartition(), which works great for me 
> except for when the underlying iterator is TungstenAggregationIterator. Here 
> is a minimal code snippet to reproduce:
> {code:title=Reproduce.scala|borderStyle=solid}
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.concurrent.duration.Duration
> import scala.concurrent.{Await, Future}
> import org.apache.spark.SparkContext
> import org.apache.spark.sql.SQLContext
> object Reproduce extends App {
>   val sc = new SparkContext("local", "reproduce")
>   val sqlContext = new SQLContext(sc)
>   import sqlContext.implicits._
>   val df = sc.parallelize(Seq(1)).toDF("number").groupBy("number").count()
>   df.foreachPartition { iterator =>
>     val f = Future(iterator.toVector)
>     Await.result(f, Duration.Inf)
>   }
> }
> {code}
> When I run this, I get:
> {noformat}
> java.lang.NullPointerException
> at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:751)
> at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:84)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> {noformat}
> I believe I actually understand why this happens - 
> TungstenAggregationIterator uses a ThreadLocal variable that returns null 
> when called from a thread other than the original thread that got the 
> iterator from Spark. From examining the code, this does not appear to differ 
> between recent Spark versions.
> However, this limitation is specific to TungstenAggregationIterator, and not 
> documented, as far as I'm aware.
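
The ThreadLocal behavior described above is easy to demonstrate outside Spark. A minimal sketch (not Spark code; the String value is a stand-in for the per-task state that the Tungsten iterator looks up internally):

{code}
// A ThreadLocal set on one thread reads back as null from any other thread,
// which is why an iterator that resolves task state through a ThreadLocal
// fails when it is consumed from a secondary thread.
object ThreadLocalDemo extends App {
  val taskState = new ThreadLocal[String]()
  taskState.set("task state") // set on the "task" thread

  println(taskState.get()) // prints: task state

  val t = new Thread(new Runnable {
    override def run(): Unit =
      println(taskState.get()) // prints: null -- different thread
  })
  t.start()
  t.join()
}
{code}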





[jira] [Commented] (SPARK-19476) Running threads in Spark DataFrame foreachPartition() causes NullPointerException

2017-03-24 Thread Lucy Yu (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15941079#comment-15941079 ]

Lucy Yu commented on SPARK-19476:
---------------------------------

I believe we've seen a similar issue raised here:
https://github.com/memsql/memsql-spark-connector/issues/31

It seems that

{code}
sqlDf.foreachPartition(partition => {
  new Thread(new Runnable {
    override def run(): Unit = {
      for (row <- partition) {
        // do nothing here, just force the partition to be fully iterated over
      }
    }
  }).start()
})
{code}

results in

{code}
java.lang.NullPointerException
at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortedIterator.loadNext(UnsafeInMemorySorter.java:287)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.loadNext(UnsafeExternalSorter.java:573)
at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillMerger$2.loadNext(UnsafeSorterSpillMerger.java:86)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:161)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:148)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.fetchNextRow(WindowExec.scala:301)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.fetchNextPartition(WindowExec.scala:361)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:391)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:290)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at com.cainc.data.etl.lake.TestMartOutput$$anonfun$main$3$$anon$1.run(TestMartOutput.scala:42)
at java.lang.Thread.run(Thread.java:745)
{code}
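
One workaround consistent with the ThreadLocal explanation above is to drain the iterator on the calling thread and hand only the materialized rows to the worker -- at the cost of holding the whole partition in memory, which is exactly what the pipe-based approach was avoiding. A sketch:

{code}
// Sketch of a workaround: consume the Spark iterator on the task's own
// thread, then process the materialized rows on a secondary thread. This
// avoids the off-thread ThreadLocal lookup but gives up constant memory.
sqlDf.foreachPartition(partition => {
  val rows = partition.toVector // drained on the calling (task) thread
  val t = new Thread(new Runnable {
    override def run(): Unit = {
      for (row <- rows) {
        // safe: rows is a plain Vector, not the Spark iterator
      }
    }
  })
  t.start()
  t.join()
})
{code}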
