[jira] [Commented] (SPARK-19476) Running threads in Spark DataFrame foreachPartition() causes NullPointerException
    [ https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258487#comment-16258487 ]

Sean Owen commented on SPARK-19476:
-----------------------------------

Ok. These limitations are from your app though (no batching, high overhead per slot or something). This isn't a general problem to document.

> Running threads in Spark DataFrame foreachPartition() causes NullPointerException
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-19476
>                 URL: https://issues.apache.org/jira/browse/SPARK-19476
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>            Reporter: Gal Topper
>            Priority: Minor
>
> First reported on [Stack overflow|http://stackoverflow.com/questions/41674069/running-threads-in-spark-dataframe-foreachpartition].
> I use multiple threads inside foreachPartition(), which works great for me except for when the underlying iterator is TungstenAggregationIterator. Here is a minimal code snippet to reproduce:
> {code:title=Reproduce.scala|borderStyle=solid}
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.concurrent.duration.Duration
> import scala.concurrent.{Await, Future}
> import org.apache.spark.SparkContext
> import org.apache.spark.sql.SQLContext
>
> object Reproduce extends App {
>   val sc = new SparkContext("local", "reproduce")
>   val sqlContext = new SQLContext(sc)
>   import sqlContext.implicits._
>
>   val df = sc.parallelize(Seq(1)).toDF("number").groupBy("number").count()
>   df.foreachPartition { iterator =>
>     val f = Future(iterator.toVector)
>     Await.result(f, Duration.Inf)
>   }
> }
> {code}
> When I run this, I get:
> {noformat}
> java.lang.NullPointerException
>         at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:751)
>         at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:84)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> {noformat}
> I believe I actually understand why this happens: TungstenAggregationIterator uses a ThreadLocal variable that returns null when called from a thread other than the original thread that got the iterator from Spark. From examining the code, this does not appear to differ between recent Spark versions.
> However, this limitation is specific to TungstenAggregationIterator, and not documented, as far as I'm aware.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
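The ThreadLocal failure mode described in the report can be illustrated outside Spark. The sketch below is not Spark code; all names (ThreadLocalIteratorDemo, BackedIterator) are made up for illustration. An iterator whose backing data lives in a ThreadLocal works on the thread that initialised it, but the ThreadLocal is unset on every other thread, so next() dereferences null there, mirroring the stack trace above.

```scala
// Minimal illustration (not Spark code) of the failure mode in the report:
// the iterator's backing data lives in a ThreadLocal, which is unset on any
// thread other than the one that initialised it, so next() dereferences null.
object ThreadLocalIteratorDemo {
  private val buffer = new ThreadLocal[Array[Int]]

  private class BackedIterator extends Iterator[Int] {
    private var i = 0
    def hasNext: Boolean = i < 3
    def next(): Int = { val v = buffer.get()(i); i += 1; v } // NPE off-thread
  }

  /** Returns (first element read on the owner thread, failure seen off-thread). */
  def run(): (Int, Option[Throwable]) = {
    buffer.set(Array(10, 20, 30)) // initialise on the current ("owner") thread
    val it = new BackedIterator
    val onOwnerThread = it.next() // fine here: the ThreadLocal is set

    var failure: Option[Throwable] = None
    val t = new Thread(() => {
      // On this thread buffer.get() returns null, so indexing it throws.
      try it.next() catch { case e: Throwable => failure = Some(e) }
    })
    t.start()
    t.join() // join() makes the write to `failure` visible here
    (onOwnerThread, failure)
  }
}
```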
    [ https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258485#comment-16258485 ]

Gal Topper commented on SPARK-19476:
------------------------------------

The DB supports concurrent requests, but not batching. That is, you can send 1,000 requests before any of them return, but you need to send them separately. To do this synchronously, you'd need 1,000 threads.

The limitation is that the iterator passed to foreachPartition() is effectively thread-local. The minimal snippet in the description shows how to get a NullPointerException by accessing the iterator from another thread.
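One way around the limitation this comment describes is to keep every next() call on the original executor thread and hand already-fetched rows to a worker pool for the concurrent writes. The sketch below is illustrative only (SafeFanOut, the pool size, and the write callback are all made up, not from this ticket):

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

// Sketch: only the calling (executor) thread ever advances the partition
// iterator; worker threads receive already-fetched rows, so the iterator's
// thread-local state is never touched off-thread.
object SafeFanOut {
  def processPartition[T](iterator: Iterator[T], parallelism: Int = 8)(write: T => Unit): Unit = {
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      // toVector forces the lazy map, so every next() happens right here,
      // on this thread; only the write itself runs on the pool.
      val inFlight = iterator.map(row => Future(write(row))).toVector
      inFlight.foreach(f => Await.result(f, Duration.Inf))
    } finally {
      pool.shutdown()
      pool.awaitTermination(1, TimeUnit.MINUTES)
    }
  }
}
```

Note that this keeps one Future per row in flight for the whole partition; for very large partitions, a bounded queue between the reading thread and the workers would bound memory better.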
    [ https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258482#comment-16258482 ]

Sean Owen commented on SPARK-19476:
-----------------------------------

But if the DB doesn't like more than one concurrent request, how do multiple threads help at all? I'm still not clear on what you're saying the limitation is.
    [ https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258472#comment-16258472 ]

Gal Topper commented on SPARK-19476:
------------------------------------

> why not more partitions?

Because the overhead of one slot (or even just one thread) per in-flight request is rather high.

> Why not batch more requests to the DB?

Because it does not currently support batching for the type of operation in question.

Anyway, I've posted my workaround. It doesn't look like this limitation will change, so we might as well close this and maybe document it somewhere.
    [ https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258467#comment-16258467 ]

Sean Owen commented on SPARK-19476:
-----------------------------------

Then it's just back to the question: why not more partitions? Why not batch more requests to the DB?
    [ https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258313#comment-16258313 ]

Gal Topper commented on SPARK-19476:
------------------------------------

The issue with doing this synchronously is that, in my actual application (a library, actually), I need to send a write event _per row_. Doing this synchronously results in an extremely slow ping-pong with the database. Doing it asynchronously, it's no problem to send hundreds or even thousands of concurrent writes.
    [ https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944506#comment-15944506 ]

Sean Owen commented on SPARK-19476:
-----------------------------------

Why do that in a thread instead of doing that same work synchronously? It sounds like you even need to make it synchronous externally.
    [ https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944163#comment-15944163 ]

Lucy Yu commented on SPARK-19476:
---------------------------------

bq. I don't think in general you're expected to be able to do this safely. Why would you do this asynchronously or with more partitions, simply?

Sorry, my simplified example had a mistake in it, but a user ran into a NullPointerException in our actual code. In my simplified example the lambda function may return before the thread is complete, but the actual code enforces that the lambda function cannot return until the thread has finished. I.e., we have

{code}
df.foreachPartition(partition => {
  ...
  numRowsAccumulator += ingestStrategy.loadPartition(targetNode, partition)
})
{code}

and loadPartition is defined at https://github.com/memsql/memsql-spark-connector/blob/master/src/main/scala/com/memsql/spark/connector/LoadDataStrategy.scala#L18. Basically, the thread finishes once it has read all of the partition's data into a stream, at which point it closes the stream, and stmt.executeUpdate(query.sql.toString), which is part of the function passed to foreachPartition, waits until the stream is closed.

We do this to load the partition into the database in a constant-memory way: by writing to a pipe and consuming from it at the same time. Without this approach, users may run out of memory materializing the partition.
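The constant-memory pipe pattern described above can be sketched as follows. This is a simplified illustration with hypothetical names (PipedLoadSketch, loadPartition's signature), not the connector's actual API, and the roles are arranged so that only the calling thread touches the partition iterator: the caller pumps rows into a bounded pipe while a worker thread runs the blocking consumer (e.g. a LOAD DATA statement) on the other end.

```scala
import java.io.{PipedInputStream, PipedOutputStream}

// Sketch (hypothetical names): stream a partition into a blocking consumer
// through a bounded pipe, so the partition is never materialised in memory.
// The calling thread writes; the worker thread consumes.
object PipedLoadSketch {
  def loadPartition(rows: Iterator[String], consume: PipedInputStream => Long): Long = {
    val in = new PipedInputStream(64 * 1024) // bounded buffer => constant memory
    val out = new PipedOutputStream(in)
    var consumed = 0L
    val consumer = new Thread(() => { consumed = consume(in) })
    consumer.start()
    try rows.foreach(r => out.write((r + "\n").getBytes("UTF-8")))
    finally out.close() // closing the pipe signals end-of-data to the consumer
    consumer.join() // join() makes the worker's write to `consumed` visible
    consumed
  }
}
```

Because the pipe's buffer is bounded, the writer blocks when the consumer falls behind, which is what keeps memory use constant regardless of partition size.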
    [ https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942431#comment-15942431 ]

Gal Topper commented on SPARK-19476:
------------------------------------

The example in the description does indeed fully materialize the iterator, to reproduce the issue as simply as possible. To be clear, the real code I'm running doesn't do that :-)! Instead, it pulls items from the iterator on demand. The workaround I described basically makes sure that only the original executor thread ever calls next() on the iterator, which is still done on demand, not all at once.

In my own experience, using threads works perfectly fine with the exception of this issue, and I've never read anything in the docs to discourage users from doing so. +1 for a note, though, if that's really not something the authors intended.
    [ https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942419#comment-15942419 ]

Gal Topper commented on SPARK-19476:
------------------------------------

I'm pretty sure we're talking about different things. My code running inside foreachPartition naturally doesn't need any Spark internals. It just takes the data and writes it to a database, and that writing process happens not to be single-threaded. It doesn't copy any data, and it works pretty well using the (far from trivial) workaround described and provided above.

If this thread-local is too entrenched to feasibly fix the issue, I would at least suggest that this limitation be documented (e.g. "@param iterator may only be accessed by the original executor thread", and/or otherwise in the docs). That's what I'd do, anyway.
[ https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942405#comment-15942405 ] Sean Owen commented on SPARK-19476:
---
You don't need an executor per I/O call; you need one slot. I don't believe what you're doing is generally going to work: you're building around Spark's execution model. You will probably need to restructure the code so that the async processing doesn't access these Spark objects, or copy the data off the iterator, if that's feasible, to avoid this access.
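The "copy the data off the iterator" suggestion can be sketched without Spark at all. In this minimal sketch, `ThreadBoundIterator` and `CopyThenAsync` are hypothetical stand-ins (not Spark classes) for an iterator that, like TungstenAggregationIterator, only works on the thread that created it; the point is the pattern of materializing the partition on the owning thread before any async work starts.

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object CopyThenAsync extends App {
  // Hypothetical stand-in for Spark's thread-bound iterator: it fails when
  // consumed from any thread other than the one that created it.
  class ThreadBoundIterator(data: Seq[Int]) extends Iterator[Int] {
    private val owner = Thread.currentThread()
    private val underlying = data.iterator
    def hasNext: Boolean = underlying.hasNext
    def next(): Int = {
      require(Thread.currentThread() eq owner, "accessed from wrong thread")
      underlying.next()
    }
  }

  val iterator = new ThreadBoundIterator(Seq(1, 2, 3))
  // Materialize on the current (owning) thread first...
  val copied = iterator.toVector
  // ...then the async work only ever touches the plain collection.
  val f = Future(copied.map(_ * 2).sum)
  println(Await.result(f, Duration.Inf))
}
```

Note the trade-off this implies: the whole partition is held in memory at once, which is exactly why it is only an option "if that's feasible".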
[ https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942380#comment-15942380 ] Gal Topper commented on SPARK-19476:
BTW, my workaround was quite painful: I created a single-threaded producer that uses the original executor thread to feed the data into the async stream. Here's the code, in case anyone stumbles on this and is looking for a way out:
{code:title=SingleThreadedPublisher.scala|borderStyle=solid}
import org.reactivestreams.{Publisher, Subscriber, Subscription}

/** A Reactive Streams publisher to work around SPARK-19476 by only using
  * Spark's iterator from the original thread. */
private class SingleThreadedPublisher[T] extends Publisher[T] {

  private var cancelled = false
  private var demand = 0L
  private var subscriber: Subscriber[_ >: T] = _
  private val waitForDemandObject = new Object

  override def subscribe(s: Subscriber[_ >: T]): Unit = {
    this.subscriber = s
    val subscription = new Subscription {
      override def cancel(): Unit = waitForDemandObject.synchronized {
        cancelled = true
        waitForDemandObject.notify()
      }
      override def request(n: Long): Unit = {
        waitForDemandObject.synchronized {
          demand += n
          waitForDemandObject.notify()
        }
      }
    }
    s.onSubscribe(subscription)
  }

  private def produce(element: T): Unit = {
    demand -= 1
    subscriber.onNext(element)
  }

  def push(iterator: Iterator[T]): Unit = {
    iterator.takeWhile(_ => !cancelled).foreach { element =>
      waitForDemandObject.synchronized {
        if (demand > 0L) {
          produce(element)
        } else {
          waitForDemandObject.wait()
          produce(element)
        }
      }
    }
    if (!cancelled) {
      subscriber.onComplete()
    }
  }
}
{code}
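The same hand-off idea can be demonstrated without the Reactive Streams dependency: the original (iterator-owning) thread pushes plain values into a bounded queue, and the async consumer only ever sees those values, never the Spark iterator. This is a minimal sketch of that pattern, not the workaround above; the `HandOffSketch` name and the `None` end-of-stream sentinel are illustrative choices.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future, Promise}

object HandOffSketch extends App {
  // Bounded queue provides backpressure, analogous to the demand counter
  // in SingleThreadedPublisher; None marks end-of-stream.
  val queue = new ArrayBlockingQueue[Option[Int]](16)
  val done = Promise[Int]()

  // Async consumer: drains the queue until the sentinel arrives.
  Future {
    var sum = 0
    var next = queue.take()
    while (next.isDefined) { sum += next.get; next = queue.take() }
    done.success(sum)
  }

  // Stands in for Spark's partition iterator; consumed only on this thread.
  val iterator = Iterator(1, 2, 3, 4)
  iterator.foreach(e => queue.put(Some(e)))
  queue.put(None)
  println(Await.result(done.future, Duration.Inf))
}
```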
[ https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942326#comment-15942326 ] Gal Topper commented on SPARK-19476:
In our case, we use Akka Streams to make many parallel, async I/O calls. To speed this up, we allow a large number of in-flight requests. It would not be feasible to have as many executors as we have in-flight requests, nor should the two be coupled, IMO.
One solution that came to my mind is to simply assign TaskContext.get().taskMetrics() to a private val (see [here|https://github.com/apache/spark/blob/362ee93296a0de6342b4339e941e6a11f445c5b2/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala#L107]) and use that later on when the iterator is exhausted (see [here|https://github.com/apache/spark/blob/362ee93296a0de6342b4339e941e6a11f445c5b2/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala#L419]). Generally speaking, iterators are not expected to be thread-safe, but neither are they expected to be tethered to any one specific thread, and I think the suggested change would comply with this standard contract. I could make a small PR if the idea makes sense.
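The capture-into-a-val idea can be illustrated in isolation. In this sketch, the `context` ThreadLocal is a hypothetical stand-in for `TaskContext.get()` (which is backed by a ThreadLocal), showing why a lazy read from a worker thread yields null while a value captured eagerly on the original thread survives the thread switch.

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object CaptureEarly extends App {
  // Stand-in for TaskContext.get(): only populated on the task's own thread.
  val context = new ThreadLocal[String]
  context.set("task-metrics")

  // Pitfall: reading the ThreadLocal lazily from a Future's worker thread
  // returns null, because that thread never called set().
  val lazyRead = Await.result(Future(Option(context.get())), Duration.Inf)
  assert(lazyRead.isEmpty)

  // Suggested fix: capture the value into a plain val on the original
  // thread, then close over that val from other threads.
  val captured = context.get()
  val eagerRead = Await.result(Future(captured), Duration.Inf)
  assert(eagerRead == "task-metrics")
  println("ok")
}
```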
[ https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941104#comment-15941104 ] Sean Owen commented on SPARK-19476:
---
I don't think you're expected to be able to do this safely, in general. Why would you do this asynchronously, rather than simply with more partitions?
[ https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941079#comment-15941079 ] Lucy Yu commented on SPARK-19476:
I believe we've seen a similar issue raised here: https://github.com/memsql/memsql-spark-connector/issues/31
It seems that
{code}
sqlDf.foreachPartition(partition => {
  new Thread(new Runnable {
    override def run(): Unit = {
      for (row <- partition) {
        // do nothing here, just force the partition to be fully iterated over
      }
    }
  }).start()
})
{code}
results in
{noformat}
java.lang.NullPointerException
	at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortedIterator.loadNext(UnsafeInMemorySorter.java:287)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.loadNext(UnsafeExternalSorter.java:573)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillMerger$2.loadNext(UnsafeSorterSpillMerger.java:86)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:161)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:148)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
	at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.fetchNextRow(WindowExec.scala:301)
	at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.fetchNextPartition(WindowExec.scala:361)
	at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:391)
	at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:290)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at com.cainc.data.etl.lake.TestMartOutput$$anonfun$main$3$$anon$1.run(TestMartOutput.scala:42)
	at java.lang.Thread.run(Thread.java:745)
{noformat}