[jira] [Commented] (SPARK-19476) Running threads in Spark DataFrame foreachPartition() causes NullPointerException

2017-11-19 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258487#comment-16258487
 ] 

Sean Owen commented on SPARK-19476:
---

OK. These limitations come from your app, though (no batching, high overhead 
per slot, or something similar). This isn't a general problem to document.

> Running threads in Spark DataFrame foreachPartition() causes 
> NullPointerException
> -
>
>              Key: SPARK-19476
>              URL: https://issues.apache.org/jira/browse/SPARK-19476
>          Project: Spark
>       Issue Type: Improvement
>       Components: SQL
> Affects Versions: 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>         Reporter: Gal Topper
>         Priority: Minor
>
> First reported on [Stack 
> Overflow|http://stackoverflow.com/questions/41674069/running-threads-in-spark-dataframe-foreachpartition].
> I use multiple threads inside foreachPartition(), which works great for me 
> except for when the underlying iterator is TungstenAggregationIterator. Here 
> is a minimal code snippet to reproduce:
> {code:title=Reproduce.scala|borderStyle=solid}
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.concurrent.duration.Duration
> import scala.concurrent.{Await, Future}
> import org.apache.spark.SparkContext
> import org.apache.spark.sql.SQLContext
> object Reproduce extends App {
>   val sc = new SparkContext("local", "reproduce")
>   val sqlContext = new SQLContext(sc)
>   import sqlContext.implicits._
>   val df = sc.parallelize(Seq(1)).toDF("number").groupBy("number").count()
>   df.foreachPartition { iterator =>
>     val f = Future(iterator.toVector)
>     Await.result(f, Duration.Inf)
>   }
> }
> {code}
> When I run this, I get:
> {noformat}
> java.lang.NullPointerException
>     at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:751)
>     at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:84)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> {noformat}
> I believe I actually understand why this happens - 
> TungstenAggregationIterator uses a ThreadLocal variable that returns null 
> when called from a thread other than the original thread that got the 
> iterator from Spark. From examining the code, this does not appear to differ 
> between recent Spark versions.
> However, this limitation is specific to TungstenAggregationIterator, and not 
> documented, as far as I'm aware.
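
The mechanism described above can be illustrated without Spark. This is a minimal sketch, assuming only that the per-task state lives in a plain java.lang.ThreadLocal; the names ThreadLocalDemo and taskState are hypothetical and are not Spark identifiers. A value set on one thread is simply absent (null) when read from another thread.

```scala
object ThreadLocalDemo {
  // Hypothetical stand-in for per-task state kept in a ThreadLocal.
  private val taskState = new ThreadLocal[String]

  /** Sets the value on the calling thread, then reads it back on the
    * calling thread and on a freshly started thread. */
  def readOnBothThreads(): (String, String) = {
    taskState.set("task-0")
    var seenByOtherThread: String = "unset"
    val other = new Thread(new Runnable {
      // A new thread has its own (empty) slot, so get() returns null here.
      override def run(): Unit = { seenByOtherThread = taskState.get() }
    })
    other.start()
    other.join() // join() makes the write to seenByOtherThread visible
    (taskState.get(), seenByOtherThread)
  }
}
```

Code that dereferences such a value without a null check fails with a NullPointerException when it runs on the second thread, which matches the shape of the stack trace in the description.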



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19476) Running threads in Spark DataFrame foreachPartition() causes NullPointerException

2017-11-19 Thread Gal Topper (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258485#comment-16258485
 ] 

Gal Topper commented on SPARK-19476:


The DB supports concurrent requests, but not batching. That is, you can send 
1000 requests before any of them returns, but you must send each one 
separately. To do this synchronously, you'd need 1000 threads.

The limitation is that the iterator in foreachPartition() is effectively thread 
local. The minimal snippet in the description shows how to get a 
NullPointerException by accessing the iterator from another thread.




[jira] [Commented] (SPARK-19476) Running threads in Spark DataFrame foreachPartition() causes NullPointerException

2017-11-19 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258482#comment-16258482
 ] 

Sean Owen commented on SPARK-19476:
---

But if the DB doesn't like more than 1 concurrent request, how do multiple 
threads help at all? It's still not clear what you are saying is a limitation.




[jira] [Commented] (SPARK-19476) Running threads in Spark DataFrame foreachPartition() causes NullPointerException

2017-11-19 Thread Gal Topper (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258472#comment-16258472
 ] 

Gal Topper commented on SPARK-19476:


> why not more partitions?
Because the overhead of 1 slot (or even just 1 thread) per in-flight request is 
rather high.

> Why not batch more requests to the DB?
Because it does not currently support batching for the type of operation in 
question.

Anyway, I've posted my workaround. Doesn't look like this limitation will 
change, so might as well close this and maybe document it somewhere.




[jira] [Commented] (SPARK-19476) Running threads in Spark DataFrame foreachPartition() causes NullPointerException

2017-11-19 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258467#comment-16258467
 ] 

Sean Owen commented on SPARK-19476:
---

Then it's just back to the question: why not more partitions? Why not batch 
more requests to the DB?




[jira] [Commented] (SPARK-19476) Running threads in Spark DataFrame foreachPartition() causes NullPointerException

2017-11-18 Thread Gal Topper (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258313#comment-16258313
 ] 

Gal Topper commented on SPARK-19476:


The issue with doing this synchronously is that, in my actual application 
(library, actually), I need to send a write event _per row_. Doing this 
synchronously results in an extremely slow ping pong with the database. Doing 
this asynchronously, it's no problem to send hundreds or even thousands of 
concurrent writes.




[jira] [Commented] (SPARK-19476) Running threads in Spark DataFrame foreachPartition() causes NullPointerException

2017-03-27 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944506#comment-15944506
 ] 

Sean Owen commented on SPARK-19476:
---

Why do that in a thread instead of doing that same work synchronously? It 
sounds like you even need to make it synchronous externally.




[jira] [Commented] (SPARK-19476) Running threads in Spark DataFrame foreachPartition() causes NullPointerException

2017-03-27 Thread Lucy Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944163#comment-15944163
 ] 

Lucy Yu commented on SPARK-19476:
-

> I don't think in general you're expected to be able to do this safely. Why 
> would you do this asynchronously or with more partitions, simply?

Sorry, my simplified example had a mistake in it, but a user ran into a 
NullPointerException in our actual code. In my simplified example the lambda 
function may return before the thread is complete, but the actual code enforces 
that the lambda function cannot return until the thread has finished, i.e. we 
have:

{code}
df.foreachPartition(partition => {
...
numRowsAccumulator += ingestStrategy.loadPartition(targetNode, partition)
})
{code}

and loadPartition is defined at 
https://github.com/memsql/memsql-spark-connector/blob/master/src/main/scala/com/memsql/spark/connector/LoadDataStrategy.scala#L18
Basically, the thread finishes once it has read all of the partition's data 
into a stream, at which point it closes the stream. 
stmt.executeUpdate(query.sql.toString), which is part of the function passed to 
foreachPartition, waits until the stream is closed.

We do this to load the partition into a database in a constant-memory way, by 
writing to a pipe and consuming from it at the same time. Without this 
approach, users may run out of memory materializing the partition.
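
For illustration, here is a rough sketch of the pipe idea arranged so that only the calling thread touches the iterator. It is not the connector's code: PipedLoad, load and consume are hypothetical names, rows are assumed to be already rendered as strings, and consume stands in for whatever reads the stream (for example a blocking bulk-load statement). The calling thread writes rows into the pipe while a background thread drains it, so memory use is bounded by the pipe buffer.

```scala
import java.io.{InputStream, PipedInputStream, PipedOutputStream}
import java.nio.charset.StandardCharsets.UTF_8
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object PipedLoad {
  /** Streams rows through a pipe. The calling thread is the only one that
    * pulls from `rows`; `consume` runs on another thread and sees bytes only. */
  def load(rows: Iterator[String])(consume: InputStream => Long)
          (implicit ec: ExecutionContext): Long = {
    val out = new PipedOutputStream()
    val in = new PipedInputStream(out, 64 * 1024) // bounded buffer
    // Background consumer: never touches the iterator.
    val consumer = Future { try consume(in) finally in.close() }
    try {
      // write() blocks when the buffer is full, which gives back-pressure.
      rows.foreach(row => out.write((row + "\n").getBytes(UTF_8)))
    } finally out.close() // signals end-of-stream to the consumer
    Await.result(consumer, Duration.Inf)
  }
}
```

One caveat of this sketch: if the consumer fails early, the writer sees an IOException from the closed pipe and that exception masks the consumer's original error.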




[jira] [Commented] (SPARK-19476) Running threads in Spark DataFrame foreachPartition() causes NullPointerException

2017-03-26 Thread Gal Topper (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942431#comment-15942431
 ] 

Gal Topper commented on SPARK-19476:


The example in the description does indeed fully materialize the iterator to 
very simply reproduce the issue. To be clear, the real code I'm running doesn't 
do that :-)! Instead, it pulls items from the iterator on demand. The 
workaround I described basically makes sure that only the original executor 
thread ever calls next() on the iterator, which is still done on demand, not 
all-at-once.
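
A minimal sketch of that kind of arrangement follows. It is not the workaround code posted on the issue: OnDemandDispatch, dispatch, send and maxInFlight are hypothetical names, and send is assumed to be an asynchronous per-row write that returns a Future. Only the calling thread calls hasNext/next(), rows are pulled one at a time, and a semaphore caps the number of writes in flight.

```scala
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ExecutionContext, Future}

object OnDemandDispatch {
  /** Pulls rows on the calling thread and hands each one to `send`.
    * Returns the number of rows dispatched; rethrows the first failure. */
  def dispatch[T](rows: Iterator[T], maxInFlight: Int)(send: T => Future[Unit])
                 (implicit ec: ExecutionContext): Int = {
    val permits = new Semaphore(maxInFlight)
    val firstError = new AtomicReference[Throwable](null)
    var dispatched = 0
    while (firstError.get() == null && rows.hasNext) {
      permits.acquire()     // blocks while maxInFlight writes are pending
      val row = rows.next() // the iterator is only touched on this thread
      send(row).onComplete { result =>
        result.failed.foreach(e => firstError.compareAndSet(null, e))
        permits.release()
      }
      dispatched += 1
    }
    permits.acquire(maxInFlight) // wait for every pending write to finish
    Option(firstError.get()).foreach(e => throw e)
    dispatched
  }
}
```

Inside foreachPartition this would be called as dispatch(iterator, 1000)(row => writeAsync(row)), where writeAsync is whatever asynchronous client call the application has; that name is a placeholder as well.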

In my own experience, using threads works perfectly fine with the exception of 
this issue, and I've never read anything in the docs to discourage users from 
doing so. +1 for a note though, if that's really not something the authors 
intended.




[jira] [Commented] (SPARK-19476) Running threads in Spark DataFrame foreachPartition() causes NullPointerException

2017-03-26 Thread Gal Topper (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942419#comment-15942419
 ] 

Gal Topper commented on SPARK-19476:


I'm pretty sure we're talking about different things. My code running inside 
foreachPartition naturally doesn't need any Spark internals. It just takes the 
data and writes it to a database, and that writing process happens not to be 
single-threaded. It doesn't copy any data, and it works pretty well using the 
(far from trivial) workaround described and provided above.

If this thread local is too entrenched to feasibly fix the issue, I would at 
least suggest that this limitation be documented (e.g. "@param iterator may 
only be accessed by the original executor thread", and/or otherwise in the 
docs). That's what I'd do, anyway.

> Running threads in Spark DataFrame foreachPartition() causes 
> NullPointerException
> -
>
> Key: SPARK-19476
> URL: https://issues.apache.org/jira/browse/SPARK-19476
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>Reporter: Gal Topper
>
> First reported on [Stack 
> overflow|http://stackoverflow.com/questions/41674069/running-threads-in-spark-dataframe-foreachpartition].
> I use multiple threads inside foreachPartition(), which works great for me 
> except for when the underlying iterator is TungstenAggregationIterator. Here 
> is a minimal code snippet to reproduce:
> {code:title=Reproduce.scala|borderStyle=solid}
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.concurrent.duration.Duration
> import scala.concurrent.{Await, Future}
> import org.apache.spark.SparkContext
> import org.apache.spark.sql.SQLContext
> object Reproduce extends App {
>   val sc = new SparkContext("local", "reproduce")
>   val sqlContext = new SQLContext(sc)
>   import sqlContext.implicits._
>   val df = sc.parallelize(Seq(1)).toDF("number").groupBy("number").count()
>   df.foreachPartition { iterator =>
> val f = Future(iterator.toVector)
> Await.result(f, Duration.Inf)
>   }
> }
> {code}
> When I run this, I get:
> {noformat}
> java.lang.NullPointerException
> at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:751)
> at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:84)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> {noformat}
> I believe I actually understand why this happens - 
> TungstenAggregationIterator uses a ThreadLocal variable that returns null 
> when called from a thread other than the original thread that got the 
> iterator from Spark. From examining the code, this does not appear to differ 
> between recent Spark versions.
> However, this limitation is specific to TungstenAggregationIterator, and not 
> documented, as far as I'm aware.
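
The mechanism described above can be reproduced without Spark. The following is a minimal sketch, assuming nothing beyond the JDK's ThreadLocal; the object and method names are made up for illustration and are not Spark's API. A value set on one thread is not visible from another, so a lookup from the second thread returns null:

{code:title=ThreadLocalDemo.scala|borderStyle=solid}
object ThreadLocalDemo {
  // Hypothetical stand-in for a per-task context; this is not Spark's TaskContext.
  private val context = new ThreadLocal[String]

  /** Returns (value seen on the calling thread, value seen on a new thread). */
  def observe(): (String, String) = {
    context.set("task-context")
    val onCaller = context.get()
    var onOther: String = "unset"
    val t = new Thread(new Runnable {
      // A ThreadLocal without an initial value yields null on a thread that never set it.
      override def run(): Unit = { onOther = context.get() }
    })
    t.start()
    t.join() // join() makes the write to onOther visible to the caller
    context.remove()
    (onCaller, onOther)
  }
}
{code}

Any code that dereferences the result of such a lookup on a worker thread fails in the way the stack trace above shows.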



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19476) Running threads in Spark DataFrame foreachPartition() causes NullPointerException

2017-03-26 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942405#comment-15942405
 ] 

Sean Owen commented on SPARK-19476:
---

You don't need an executor per I/O call; you need one slot. I don't believe 
what you're doing is generally going to work; you're working around Spark's 
execution model. You will probably need to restrict the async processing to 
something that doesn't access these Spark objects, or copy the data off the 
iterator first, if that's feasible, to avoid this access.
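
A minimal sketch of the "copy the data off the iterator" option, assuming the partition fits in memory. The object name, method name and the io parameter are hypothetical, and a plain Scala Iterator stands in for the one Spark passes to foreachPartition():

{code:title=CopyThenAsync.scala|borderStyle=solid}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object CopyThenAsync {
  /** Drains `rows` on the calling thread, then applies `io` to each element asynchronously. */
  def processPartition[T, R](rows: Iterator[T])(io: T => R)(implicit ec: ExecutionContext): Vector[R] = {
    // The only place the iterator is touched: the thread that called this method.
    val buffered = rows.toVector
    // From here on only the copied elements cross thread boundaries.
    val results = Future.traverse(buffered)(row => Future(io(row)))
    Await.result(results, Duration.Inf)
  }
}
{code}

The trade-off is memory: the whole partition is held at once, so this only helps when partitions are small enough or can be made smaller.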




[jira] [Commented] (SPARK-19476) Running threads in Spark DataFrame foreachPartition() causes NullPointerException

2017-03-26 Thread Gal Topper (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942380#comment-15942380
 ] 

Gal Topper commented on SPARK-19476:


BTW, my workaround was quite painful: I created a single-threaded producer that 
used the original executor thread to feed the data into the async stream. 
Here's the code, in case anyone stumbles on this and is looking for a way out:

{code:title=SingleThreadedPublisher.scala|borderStyle=solid}
import org.reactivestreams.{Publisher, Subscriber, Subscription}

/** A reactive streams publisher to work around SPARK-19476 by only using
  * Spark's iterator from the original thread. */
private class SingleThreadedPublisher[T] extends Publisher[T] {

  // Read outside the lock in push(), so it must be volatile.
  @volatile private var cancelled = false
  private var demand = 0L
  private var subscriber: Subscriber[_ >: T] = _
  private val waitForDemandObject = new Object

  override def subscribe(s: Subscriber[_ >: T]): Unit = {
    this.subscriber = s
    val subscription = new Subscription {
      override def cancel(): Unit = waitForDemandObject.synchronized {
        cancelled = true
        waitForDemandObject.notify()
      }

      override def request(n: Long): Unit = {
        waitForDemandObject.synchronized {
          demand += n
          waitForDemandObject.notify()
        }
      }
    }
    s.onSubscribe(subscription)
  }

  private def produce(element: T): Unit = {
    demand -= 1
    subscriber.onNext(element)
  }

  def push(iterator: Iterator[T]): Unit = {
    iterator.takeWhile(_ => !cancelled).foreach { element =>
      waitForDemandObject.synchronized {
        // Wait in a loop: wait() may wake spuriously, and cancel() also notifies.
        while (demand <= 0L && !cancelled) {
          waitForDemandObject.wait()
        }
        // Do not emit after cancellation or without outstanding demand.
        if (!cancelled) {
          produce(element)
        }
      }
    }
    if (!cancelled) {
      subscriber.onComplete()
    }
  }
}
}
{code}
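
For readers who do not use Reactive Streams, the same idea (only the original thread reads the iterator, other threads do the work) can be sketched with a bounded queue from java.util.concurrent. This is a different technique from the publisher above, not a drop-in replacement; the names QueueHandoff, drain and handle are hypothetical:

{code:title=QueueHandoff.scala|borderStyle=solid}
import java.util.concurrent.ArrayBlockingQueue

object QueueHandoff {
  /** Feeds `rows` to `workers` consumer threads; the iterator is read only by the caller. */
  def drain[T](rows: Iterator[T], workers: Int, capacity: Int)(handle: T => Unit): Unit = {
    // None marks end of input; one marker is enqueued per worker.
    val queue = new ArrayBlockingQueue[Option[T]](capacity)
    val threads = (1 to workers).map { _ =>
      val t = new Thread(new Runnable {
        override def run(): Unit = {
          var next = queue.take()
          while (next.isDefined) {
            handle(next.get)
            next = queue.take()
          }
        }
      })
      t.start()
      t
    }
    // put() blocks while the queue is full, which bounds the in-flight elements.
    rows.foreach(row => queue.put(Some(row)))
    threads.foreach(_ => queue.put(None))
    threads.foreach(_.join())
  }
}
{code}

The sketch does no error handling: if handle throws, that worker exits and the producer can block on a full queue, so real code would need to catch failures and signal the caller.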




[jira] [Commented] (SPARK-19476) Running threads in Spark DataFrame foreachPartition() causes NullPointerException

2017-03-26 Thread Gal Topper (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942326#comment-15942326
 ] 

Gal Topper commented on SPARK-19476:


In our case, we use Akka Streams to make many parallel, async I/O calls. To 
speed this up, we allow a large number of in-flight requests. It would not be 
feasible to have as many executors as we do in-flight requests, nor should the 
two be coupled IMO.

One solution that came to my mind is to simply assign 
TaskContext.get().taskMetrics() to a private val (see 
[here|https://github.com/apache/spark/blob/362ee93296a0de6342b4339e941e6a11f445c5b2/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala#L107])
 and use that later on when the iterator is exhausted (see 
[here|https://github.com/apache/spark/blob/362ee93296a0de6342b4339e941e6a11f445c5b2/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala#L419]).
 Generally speaking, iterators are not expected to be thread-safe, but also not 
to be tethered to any one specific thread, and I think the suggested change 
would comply with this standard contract.

I could make a small PR if the idea makes sense.
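
To illustrate the shape of the suggested change without touching Spark, here is a toy sketch. Metrics, current and CountingIterator are hypothetical stand-ins, not the real TaskContext, TaskMetrics or TungstenAggregationIterator. The thread-local is read once on the constructing thread, and the captured reference is used when the iterator is exhausted:

{code:title=CaptureAtConstruction.scala|borderStyle=solid}
object CaptureAtConstruction {
  // Hypothetical stand-ins for a task's metrics object and its thread-local holder.
  final class Metrics { @volatile var rowsSeen: Long = 0L }
  val current = new ThreadLocal[Metrics]

  /** Looks up the thread-local once, at construction, and keeps the reference. */
  final class CountingIterator[T](underlying: Iterator[T]) extends Iterator[T] {
    private val metrics: Metrics = current.get() // captured on the constructing thread
    private var count = 0L

    override def hasNext: Boolean = {
      val more = underlying.hasNext
      // No ThreadLocal lookup here, so this works from whichever thread drains the iterator.
      if (!more) metrics.rowsSeen = count
      more
    }

    override def next(): T = { count += 1; underlying.next() }
  }
}
{code}

The iterator is still not thread-safe, and it still has to be constructed on a thread where the thread-local is set; it is merely no longer tied to that thread afterwards.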




[jira] [Commented] (SPARK-19476) Running threads in Spark DataFrame foreachPartition() causes NullPointerException

2017-03-24 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941104#comment-15941104
 ] 

Sean Owen commented on SPARK-19476:
---

I don't think in general you're expected to be able to do this safely. Why do 
this asynchronously rather than simply using more partitions?




[jira] [Commented] (SPARK-19476) Running threads in Spark DataFrame foreachPartition() causes NullPointerException

2017-03-24 Thread Lucy Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941079#comment-15941079
 ] 

Lucy Yu commented on SPARK-19476:
-

I believe we've seen a similar issue raised here:
https://github.com/memsql/memsql-spark-connector/issues/31

It seems that

{code}
sqlDf.foreachPartition(partition => {
  new Thread(new Runnable {
    override def run(): Unit = {
      for (row <- partition) {
        // do nothing here, just force the partition to be fully iterated over
      }
    }
  }).start()
})
{code}

results in

{code}
java.lang.NullPointerException
at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortedIterator.loadNext(UnsafeInMemorySorter.java:287)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.loadNext(UnsafeExternalSorter.java:573)
at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillMerger$2.loadNext(UnsafeSorterSpillMerger.java:86)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:161)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:148)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.fetchNextRow(WindowExec.scala:301)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.fetchNextPartition(WindowExec.scala:361)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:391)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:290)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at com.cainc.data.etl.lake.TestMartOutput$$anonfun$main$3$$anon$1.run(TestMartOutput.scala:42)
at java.lang.Thread.run(Thread.java:745)
{code}

> Running threads in Spark DataFrame foreachPartition() causes 
> NullPointerException
> -
>
> Key: SPARK-19476
> URL: https://issues.apache.org/jira/browse/SPARK-19476
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>Reporter: Gal Topper
>
> First reported on [Stack 
> overflow|http://stackoverflow.com/questions/41674069/running-threads-in-spark-dataframe-foreachpartition].
> I use multiple threads inside foreachPartition(), which works great for me 
> except for when the underlying iterator is TungstenAggregationIterator. Here 
> is a minimal code snippet to reproduce:
> {code:title=Reproduce.scala|borderStyle=solid}
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.concurrent.duration.Duration
> import scala.concurrent.{Await, Future}
> import org.apache.spark.SparkContext
> import org.apache.spark.sql.SQLContext
> object Reproduce extends App {
>   val sc = new SparkContext("local", "reproduce")
>   val sqlContext = new SQLContext(sc)
>   import sqlContext.implicits._
>   val df = sc.parallelize(Seq(1)).toDF("number").groupBy("number").count()
>   df.foreachPartition { iterator =>
> val f = Future(iterator.toVector)
> Await.result(f, Duration.Inf)
>   }
> }
> {code}
> When I run this, I get:
> {noformat}
> java.lang.NullPointerException
> at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:751)
> at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:84)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> {noformat}
> I believe I actually understand why this happens - 
>