[jira] [Commented] (SPARK-23614) Union produces incorrect results when caching is used

2018-05-18 Thread Morten Hornbech (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480454#comment-16480454
 ] 

Morten Hornbech commented on SPARK-23614:
-

In the example provided, caching is required to produce the bug, and I'm pretty 
sure aggregation is required as well.
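
A minimal sketch of the no-cache variant, for reference (this assumes the same TestData case class and a SparkSession value named "session" as in the reproduction below); per the issue description, the results come out correct when the frame is not cached:

{code:java}
import org.apache.spark.sql.functions.{col, min}
import session.implicits._

// Same reproduction as in the issue, but without .cache().
val frame = session.createDataset(Seq(TestData(1, 2, 3), TestData(4, 5, 6)))
val group1 = frame.groupBy("x").agg(min(col("y")) as "value")
val group2 = frame.groupBy("x").agg(min(col("z")) as "value")
// Per the issue description, both orderings of the union then return the expected values.
group1.union(group2).show()
group2.union(group1).show()
{code}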

> Union produces incorrect results when caching is used
> -
>
> Key: SPARK-23614
> URL: https://issues.apache.org/jira/browse/SPARK-23614
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Morten Hornbech
>Assignee: Liang-Chi Hsieh
>Priority: Major
>  Labels: correctness
> Fix For: 2.3.1, 2.4.0
>
>
> We just upgraded from 2.2 to 2.3 and our test suite caught this error:
> {code:java}
> case class TestData(x: Int, y: Int, z: Int)
> val frame = session.createDataset(Seq(TestData(1, 2, 3), TestData(4, 5, 6))).cache()
> val group1 = frame.groupBy("x").agg(min(col("y")) as "value")
> val group2 = frame.groupBy("x").agg(min(col("z")) as "value")
> group1.union(group2).show()
> // +---+-----+
> // |  x|value|
> // +---+-----+
> // |  1|    2|
> // |  4|    5|
> // |  1|    2|
> // |  4|    5|
> // +---+-----+
> group2.union(group1).show()
> // +---+-----+
> // |  x|value|
> // +---+-----+
> // |  1|    3|
> // |  4|    6|
> // |  1|    3|
> // |  4|    6|
> // +---+-----+
> {code}
> The error disappears if the first data frame is not cached or if the two 
> group-bys use separate copies. I'm not sure exactly what happens inside 
> Spark, but errors that produce incorrect results rather than exceptions 
> always concern me.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23614) Union produces incorrect results when caching is used

2018-03-06 Thread Morten Hornbech (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Morten Hornbech updated SPARK-23614:

Description: 
We just upgraded from 2.2 to 2.3 and our test suite caught this error:
{code:java}
case class TestData(x: Int, y: Int, z: Int)

val frame = session.createDataset(Seq(TestData(1, 2, 3), TestData(4, 5, 6))).cache()
val group1 = frame.groupBy("x").agg(min(col("y")) as "value")
val group2 = frame.groupBy("x").agg(min(col("z")) as "value")
group1.union(group2).show()
// +---+-----+
// |  x|value|
// +---+-----+
// |  1|    2|
// |  4|    5|
// |  1|    2|
// |  4|    5|
// +---+-----+
group2.union(group1).show()
// +---+-----+
// |  x|value|
// +---+-----+
// |  1|    3|
// |  4|    6|
// |  1|    3|
// |  4|    6|
// +---+-----+
{code}
The error disappears if the first data frame is not cached or if the two 
group-bys use separate copies. I'm not sure exactly what happens inside Spark, 
but errors that produce incorrect results rather than exceptions always 
concern me.
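
(For illustration, a minimal sketch of the "separate copies" workaround mentioned above, assuming the same TestData, session and imports as in the snippet above; per the above, the error disappears in this case:)
{code:java}
// Each aggregate is built from its own cached dataset instead of sharing one.
val frameY = session.createDataset(Seq(TestData(1, 2, 3), TestData(4, 5, 6))).cache()
val frameZ = session.createDataset(Seq(TestData(1, 2, 3), TestData(4, 5, 6))).cache()
val group1 = frameY.groupBy("x").agg(min(col("y")) as "value")
val group2 = frameZ.groupBy("x").agg(min(col("z")) as "value")
group2.union(group1).show() // values now come from the expected columns
{code}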

  was:
We just upgraded from 2.2 to 2.3 and our test suite caught this error:

{code:java}
val frame = session.createDataset(Seq(TestData(1, 2, 3), TestData(4, 5, 6))).cache()
val group1 = frame.groupBy("x").agg(min(col("y")) as "value")
val group2 = frame.groupBy("x").agg(min(col("z")) as "value")
group1.union(group2).show()
// +---+-----+
// |  x|value|
// +---+-----+
// |  1|    2|
// |  4|    5|
// |  1|    2|
// |  4|    5|
// +---+-----+
group2.union(group1).show()
// +---+-----+
// |  x|value|
// +---+-----+
// |  1|    3|
// |  4|    6|
// |  1|    3|
// |  4|    6|
// +---+-----+
{code}

The error disappears if the first data frame is not cached or if the two 
group-bys use separate copies. I'm not sure exactly what happens inside Spark, 
but errors that produce incorrect results rather than exceptions always 
concern me.


> Union produces incorrect results when caching is used
> -
>
> Key: SPARK-23614
> URL: https://issues.apache.org/jira/browse/SPARK-23614
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Morten Hornbech
>Priority: Major
>
> We just upgraded from 2.2 to 2.3 and our test suite caught this error:
> {code:java}
> case class TestData(x: Int, y: Int, z: Int)
> val frame = session.createDataset(Seq(TestData(1, 2, 3), TestData(4, 5, 6))).cache()
> val group1 = frame.groupBy("x").agg(min(col("y")) as "value")
> val group2 = frame.groupBy("x").agg(min(col("z")) as "value")
> group1.union(group2).show()
> // +---+-----+
> // |  x|value|
> // +---+-----+
> // |  1|    2|
> // |  4|    5|
> // |  1|    2|
> // |  4|    5|
> // +---+-----+
> group2.union(group1).show()
> // +---+-----+
> // |  x|value|
> // +---+-----+
> // |  1|    3|
> // |  4|    6|
> // |  1|    3|
> // |  4|    6|
> // +---+-----+
> {code}
> The error disappears if the first data frame is not cached or if the two 
> group-bys use separate copies. I'm not sure exactly what happens inside 
> Spark, but errors that produce incorrect results rather than exceptions 
> always concern me.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23614) Union produces incorrect results when caching is used

2018-03-06 Thread Morten Hornbech (JIRA)
Morten Hornbech created SPARK-23614:
---

 Summary: Union produces incorrect results when caching is used
 Key: SPARK-23614
 URL: https://issues.apache.org/jira/browse/SPARK-23614
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: Morten Hornbech


We just upgraded from 2.2 to 2.3 and our test suite caught this error:

{code:java}
val frame = session.createDataset(Seq(TestData(1, 2, 3), TestData(4, 5, 6))).cache()
val group1 = frame.groupBy("x").agg(min(col("y")) as "value")
val group2 = frame.groupBy("x").agg(min(col("z")) as "value")
group1.union(group2).show()
// +---+-----+
// |  x|value|
// +---+-----+
// |  1|    2|
// |  4|    5|
// |  1|    2|
// |  4|    5|
// +---+-----+
group2.union(group1).show()
// +---+-----+
// |  x|value|
// +---+-----+
// |  1|    3|
// |  4|    6|
// |  1|    3|
// |  4|    6|
// +---+-----+
{code}

The error disappears if the first data frame is not cached or if the two 
group-bys use separate copies. I'm not sure exactly what happens inside Spark, 
but errors that produce incorrect results rather than exceptions always 
concern me.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22438) OutOfMemoryError on very small data sets

2017-11-03 Thread Morten Hornbech (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237553#comment-16237553
 ] 

Morten Hornbech commented on SPARK-22438:
-

I honestly can't see whether those are duplicates. I have seen these OOM errors 
occur in a lot of very different situations, and I am not an expert on Spark 
Core. Since the linked issue is resolved, you could run my 10 lines of code on 
your 2.3.0 development build to know for sure.

> OutOfMemoryError on very small data sets
> 
>
> Key: SPARK-22438
> URL: https://issues.apache.org/jira/browse/SPARK-22438
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Morten Hornbech
>Priority: Critical
>
> We have a customer that uses Spark as an engine for running SQL on a 
> collection of small datasets, typically no greater than a few thousand rows. 
> Recently we started observing out-of-memory errors on some new workloads. 
> Even though the datasets were only a few kilobytes, the job would almost 
> immediately spike to > 10GB of memory usage, producing an out-of-memory error 
> on the modest hardware (2 CPUs, 16 GB RAM) that is used. Using larger hardware 
> and allocating more memory to Spark (4 CPUs, 32 GB RAM) made the job complete, 
> but still with an unreasonably high memory usage.
> The query involved was a left join on two datasets. In some, but not all, 
> cases we were able to remove or reduce the problem by rewriting the query to 
> use an exists sub-select instead. After a lot of debugging we were able to 
> reproduce the problem locally with the following test:
> {code:java}
> case class Data(value: String)
> val session = SparkSession.builder.master("local[1]").getOrCreate()
> import session.implicits._
> val foo = session.createDataset((1 to 500).map(i => Data(i.toString)))
> val bar = session.createDataset((1 to 1).map(i => Data(i.toString)))
> foo.persist(StorageLevel.MEMORY_ONLY)
> foo.createTempView("foo")
> bar.createTempView("bar")
> val result = session.sql("select * from bar left join foo on bar.value = foo.value")
> result.coalesce(2).collect()
> {code}
> Running this produces the error below:
> {code:java}
> java.lang.OutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
>at 
> org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:127)
>at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:372)
>at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
>at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109)
>at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
>  Source)
>at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>at 
> org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
>at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:774)
>at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.<init>(SortMergeJoinExec.scala:649)
>at 
> org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:198)
>at 
> org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:136)
>at 
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
>at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>at 
> org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
>at 
> org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
>at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
>at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
>at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>at 

[jira] [Updated] (SPARK-22438) OutOfMemoryError on very small data sets

2017-11-03 Thread Morten Hornbech (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Morten Hornbech updated SPARK-22438:

Description: 
We have a customer that uses Spark as an engine for running SQL on a collection 
of small datasets, typically no greater than a few thousand rows. Recently we 
started observing out-of-memory errors on some new workloads. Even though the 
datasets were only a few kilobytes, the job would almost immediately spike to > 
10GB of memory usage, producing an out-of-memory error on the modest hardware 
(2 CPUs, 16 GB RAM) that is used. Using larger hardware and allocating more 
memory to Spark (4 CPUs, 32 GB RAM) made the job complete, but still with an 
unreasonably high memory usage.

The query involved was a left join on two datasets. In some, but not all, cases 
we were able to remove or reduce the problem by rewriting the query to use an 
exists sub-select instead. After a lot of debugging we were able to reproduce 
the problem locally with the following test:

{code:java}
case class Data(value: String)

val session = SparkSession.builder.master("local[1]").getOrCreate()
import session.implicits._

val foo = session.createDataset((1 to 500).map(i => Data(i.toString)))
val bar = session.createDataset((1 to 1).map(i => Data(i.toString)))

foo.persist(StorageLevel.MEMORY_ONLY)

foo.createTempView("foo")
bar.createTempView("bar")

val result = session.sql("select * from bar left join foo on bar.value = foo.value")
result.coalesce(2).collect()
{code}

Running this produces the error below:

{code:java}
java.lang.OutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
   at 
org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:127)
   at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:372)
   at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
   at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109)
   at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
 Source)
   at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
   at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
   at 
org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
   at 
org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:774)
   at 
org.apache.spark.sql.execution.joins.SortMergeJoinScanner.<init>(SortMergeJoinExec.scala:649)
   at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:198)
   at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:136)
   at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
   at 
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
   at 
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
   at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
   at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
   at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
   at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
   at org.apache.spark.scheduler.Task.run(Task.scala:108)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   at java.lang.Thread.run(Thread.java:745)
{code}

The exact failure point varies with the number of threads given to Spark, the 
"coalesce" value and the number of rows in "foo". Using an inner join, removing 
the call to persist, or removing the call to coalesce (or using repartition) 
will each independently make the error go away.
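
(A minimal sketch of one of those variations, using the same session, foo and bar as in the reproduction above; only coalesce is swapped for repartition:)
{code:java}
// Per the description, this variation completes without the OutOfMemoryError.
val result = session.sql("select * from bar left join foo on bar.value = foo.value")
result.repartition(2).collect()
{code}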

The reason persist and 

[jira] [Created] (SPARK-22438) OutOfMemoryError on very small data sets

2017-11-03 Thread Morten Hornbech (JIRA)
Morten Hornbech created SPARK-22438:
---

 Summary: OutOfMemoryError on very small data sets
 Key: SPARK-22438
 URL: https://issues.apache.org/jira/browse/SPARK-22438
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.2.0
Reporter: Morten Hornbech
Priority: Critical


We have a customer that uses Spark as an engine for running SQL on a collection 
of small datasets, typically no greater than a few thousand rows. Recently we 
started observing out-of-memory errors on some new workloads. Even though the 
datasets were only a few kilobytes, the job would almost immediately spike to > 
10GB of memory usage, producing an out-of-memory error on the modest hardware 
(2 CPUs, 16 GB RAM) that is used. Using larger hardware and allocating more 
memory to Spark (4 CPUs, 32 GB RAM) made the job complete, but still with an 
unreasonably high memory usage.

The query involved was a left join on two datasets. In some, but not all, cases 
we were able to remove or reduce the problem by rewriting the query to use an 
exists sub-select instead. After a lot of debugging we were able to reproduce 
the problem locally with the following test:

{{
case class Data(value: String)

val session = SparkSession.builder.master("local[1]").getOrCreate()
import session.implicits._

val foo = session.createDataset((1 to 500).map(i => Data(i.toString)))
val bar = session.createDataset((1 to 1).map(i => Data(i.toString)))

foo.persist(StorageLevel.MEMORY_ONLY)

foo.createTempView("foo")
bar.createTempView("bar")

val result = session.sql("select * from bar left join foo on bar.value = foo.value")
result.coalesce(2).collect()
}}

Running this produces the error below:

{{
java.lang.OutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
   at 
org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:127)
   at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:372)
   at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
   at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109)
   at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
 Source)
   at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
   at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
   at 
org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
   at 
org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:774)
   at 
org.apache.spark.sql.execution.joins.SortMergeJoinScanner.<init>(SortMergeJoinExec.scala:649)
   at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:198)
   at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:136)
   at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
   at 
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
   at 
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
   at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
   at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
   at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
   at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
   at org.apache.spark.scheduler.Task.run(Task.scala:108)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   at java.lang.Thread.run(Thread.java:745)
}}

The exact failure point varies with the number of threads given to spark, the 
"coalesce" value and the number of rows 

[jira] [Commented] (SPARK-17875) Remove unneeded direct dependence on Netty 3.x

2017-05-19 Thread Morten Hornbech (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16017059#comment-16017059
 ] 

Morten Hornbech commented on SPARK-17875:
-

Sorry, I have now. If the class files are indeed in the flume assembly, my best 
guess is that this occurs because of binary compatibility issues between 4.0 
and 3.8, triggered by static members upon load of ChannelPipelineFactory. I can 
see that ChannelPipelineFactory does not exist in 4.0, but it references 
ChannelPipeline in its class definition, which does exist there. So if that 
class was loaded from 4.0, things could go wrong. If an upgrade of Flume to 
Netty 4.0 is a major task, a simpler solution would be to shade Netty 3.8 in 
the flume assembly. That way you should be able to get rid of it in spark-core.

> Remove unneeded direct dependence on Netty 3.x
> --
>
> Key: SPARK-17875
> URL: https://issues.apache.org/jira/browse/SPARK-17875
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 2.0.1
>Reporter: Sean Owen
>Assignee: Sean Owen
>Priority: Trivial
>
> The Spark build declares a dependency on Netty 3.x and 4.x, but only 4.x is 
> used. It's best to remove the 3.x dependency (and while we're at it, update a 
> few things like license info)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17875) Remove unneeded direct dependence on Netty 3.x

2017-05-19 Thread Morten Hornbech (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16017001#comment-16017001
 ] 

Morten Hornbech commented on SPARK-17875:
-

We were just hit by a runtime error caused by this apparently obsolete 
dependency. More specifically, the version of SslHandler used by Netty 3.8 is 
not binary compatible with the one we use (and the one spark-core uses in Netty 
4.0).

We can get around this by shading our own dependency, but I think it's a bit 
nasty having this floating around risking unnecessary runtime errors - 
dependency management is difficult enough as it is :-) Could we reopen the 
issue?
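
(For anyone else working around this: a sketch of what such shading can look like with sbt-assembly; this assumes an sbt build, and the target package name is illustrative. Netty 3.x classes live under org.jboss.netty and 4.x under io.netty, so only the 3.x classes are relocated.)
{code}
// build.sbt fragment, sbt-assembly plugin
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("org.jboss.netty.**" -> "shaded.netty3.@1").inAll
)
{code}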

> Remove unneeded direct dependence on Netty 3.x
> --
>
> Key: SPARK-17875
> URL: https://issues.apache.org/jira/browse/SPARK-17875
> Project: Spark
>  Issue Type: Improvement
>  Components: Build
>Affects Versions: 2.0.1
>Reporter: Sean Owen
>Assignee: Sean Owen
>Priority: Trivial
>
> The Spark build declares a dependency on Netty 3.x and 4.x, but only 4.x is 
> used. It's best to remove the 3.x dependency (and while we're at it, update a 
> few things like license info)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-14560) Cooperative Memory Management for Spillables

2017-03-27 Thread Morten Hornbech (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944094#comment-15944094
 ] 

Morten Hornbech commented on SPARK-14560:
-

No. We worried it might just trigger some other bad behaviour, and it wasn't a 
production issue.

> Cooperative Memory Management for Spillables
> 
>
> Key: SPARK-14560
> URL: https://issues.apache.org/jira/browse/SPARK-14560
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.1
>Reporter: Imran Rashid
>Assignee: Lianhui Wang
> Fix For: 2.0.0
>
>
> SPARK-10432 introduced cooperative memory management for SQL operators that 
> can spill; however, {{Spillable}} s used by the old RDD api still do not 
> cooperate.  This can lead to memory starvation, in particular on a 
> shuffle-to-shuffle stage, eventually resulting in errors like:
> {noformat}
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: Memory used in task 3081
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: Acquired by 
> org.apache.spark.shuffle.sort.ShuffleExternalSorter@69ab0291: 32.0 KB
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: 1317230346 bytes of memory 
> were used by task 3081 but are not associated with specific consumers
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: 1317263114 bytes of memory 
> are used for execution and 1710484 bytes of memory are used for storage
> 16/03/28 08:59:54 ERROR executor.Executor: Managed memory leak detected; size 
> = 1317230346 bytes, TID = 3081
> 16/03/28 08:59:54 ERROR executor.Executor: Exception in task 533.0 in stage 
> 3.0 (TID 3081)
> java.lang.OutOfMemoryError: Unable to acquire 75 bytes of memory, got 0
> at 
> org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
> at 
> org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:346)
> at 
> org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:367)
> at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
> at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> This can happen anytime the shuffle read side requires more memory than what 
> is available for the task.  Since the shuffle-read side doubles its memory 
> request each time, it can easily end up acquiring all of the available 
> memory, even if it does not use it.  Eg., say that after the final spill, the 
> shuffle-read side requires 10 MB more memory, and there is 15 MB of memory 
> available.  But if it starts at 2 MB, it will double to 4, 8, and then 
> request 16 MB of memory, and in fact get all available 15 MB.  Since the 15 
> MB of memory is sufficient, it will not spill, and will continue holding on 
> to all available memory.  But this leaves *no* memory available for the 
> shuffle-write side.  Since the shuffle-write side cannot request the 
> shuffle-read side to free up memory, this leads to an OOM.
> The simple solution is to make {{Spillable}} implement {{MemoryConsumer}} as 
> well, so RDDs can benefit from the cooperative memory management introduced 
> by SPARK-10342.
> Note that an additional improvement would be for the shuffle-read side to 
> simply release unused memory, without spilling, in case that would leave 
> enough memory, and only spill if that was inadequate.  However that can come 
> as a later improvement.
> *Workaround*:  You can set 
> {{spark.shuffle.spill.numElementsForceSpillThreshold=N}} to force spilling to 
> occur every {{N}} elements, thus preventing the shuffle-read side from ever 
> grabbing all of the available memory.  However, this requires careful tuning 
> of {{N}} to specific workloads: too big, and you will still get an OOM; too 
> small, and there will be so much spilling that performance will suffer 
> drastically.  Furthermore, this workaround uses an *undocumented* 
> configuration with *no compatibility guarantees* for future versions of spark.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To 

[jira] [Commented] (SPARK-14560) Cooperative Memory Management for Spillables

2017-03-27 Thread Morten Hornbech (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944080#comment-15944080
 ] 

Morten Hornbech commented on SPARK-14560:
-

No, not really. We were able to work around it by rewriting the job, but it was 
never clear what made the difference.

> Cooperative Memory Management for Spillables
> 
>
> Key: SPARK-14560
> URL: https://issues.apache.org/jira/browse/SPARK-14560
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.1
>Reporter: Imran Rashid
>Assignee: Lianhui Wang
> Fix For: 2.0.0
>
>
> SPARK-10432 introduced cooperative memory management for SQL operators that 
> can spill; however, {{Spillable}} s used by the old RDD api still do not 
> cooperate.  This can lead to memory starvation, in particular on a 
> shuffle-to-shuffle stage, eventually resulting in errors like:
> {noformat}
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: Memory used in task 3081
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: Acquired by 
> org.apache.spark.shuffle.sort.ShuffleExternalSorter@69ab0291: 32.0 KB
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: 1317230346 bytes of memory 
> were used by task 3081 but are not associated with specific consumers
> 16/03/28 08:59:54 INFO memory.TaskMemoryManager: 1317263114 bytes of memory 
> are used for execution and 1710484 bytes of memory are used for storage
> 16/03/28 08:59:54 ERROR executor.Executor: Managed memory leak detected; size 
> = 1317230346 bytes, TID = 3081
> 16/03/28 08:59:54 ERROR executor.Executor: Exception in task 533.0 in stage 
> 3.0 (TID 3081)
> java.lang.OutOfMemoryError: Unable to acquire 75 bytes of memory, got 0
> at 
> org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
> at 
> org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:346)
> at 
> org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:367)
> at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
> at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> This can happen anytime the shuffle read side requires more memory than what 
> is available for the task.  Since the shuffle-read side doubles its memory 
> request each time, it can easily end up acquiring all of the available 
> memory, even if it does not use it.  Eg., say that after the final spill, the 
> shuffle-read side requires 10 MB more memory, and there is 15 MB of memory 
> available.  But if it starts at 2 MB, it will double to 4, 8, and then 
> request 16 MB of memory, and in fact get all available 15 MB.  Since the 15 
> MB of memory is sufficient, it will not spill, and will continue holding on 
> to all available memory.  But this leaves *no* memory available for the 
> shuffle-write side.  Since the shuffle-write side cannot request the 
> shuffle-read side to free up memory, this leads to an OOM.
> The simple solution is to make {{Spillable}} implement {{MemoryConsumer}} as 
> well, so RDDs can benefit from the cooperative memory management introduced 
> by SPARK-10342.
> Note that an additional improvement would be for the shuffle-read side to 
> simply release unused memory, without spilling, in case that would leave 
> enough memory, and only spill if that was inadequate.  However that can come 
> as a later improvement.
> *Workaround*:  You can set 
> {{spark.shuffle.spill.numElementsForceSpillThreshold=N}} to force spilling to 
> occur every {{N}} elements, thus preventing the shuffle-read side from ever 
> grabbing all of the available memory.  However, this requires careful tuning 
> of {{N}} to specific workloads: too big, and you will still get an OOM; too 
> small, and there will be so much spilling that performance will suffer 
> drastically.  Furthermore, this workaround uses an *undocumented* 
> configuration with *no compatibility guarantees* for future versions of spark.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (SPARK-14560) Cooperative Memory Management for Spillables

2017-01-15 Thread Morten Hornbech (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823296#comment-15823296
 ] 

Morten Hornbech commented on SPARK-14560:
-

I have also observed this error sporadically on Spark 2.0.2. Does the 
spark.shuffle.spill.numElementsForceSpillThreshold=N workaround work on 2.0.2? 
Any experience with robust values? 
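
(For reference, a sketch of where such a setting would go; the value of N is purely illustrative and, as the issue notes, would need tuning per workload:)
{code:java}
import org.apache.spark.sql.SparkSession

val session = SparkSession.builder()
  .config("spark.shuffle.spill.numElementsForceSpillThreshold", "500000")
  .getOrCreate()
{code}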

Stack trace:

java.lang.OutOfMemoryError: Unable to acquire 36 bytes of memory, got 0
at 
org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:129)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:377)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:399)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at 
org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
at 
org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:730)
at 
org.apache.spark.sql.execution.joins.SortMergeJoinScanner.<init>(SortMergeJoinExec.scala:605)
at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:162)
at 
org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:100)
at 
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at 
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96)
at 
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at 
org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:106)
at 
org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:98)
at 
org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:214)
at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

> Cooperative Memory Management for Spillables
> 
>
> Key: SPARK-14560
> URL: https://issues.apache.org/jira/browse/SPARK-14560
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.1
>Reporter: Imran Rashid
>Assignee: Lianhui Wang
> Fix For: 2.0.0
>
>
> SPARK-10432 introduced cooperative memory management for SQL operators that 
> can spill; however, {{Spillable}} s used by the old RDD api still do not 
> 

[jira] [Commented] (SPARK-16087) Spark Hangs When Using Union With Persisted Hadoop RDD

2016-11-11 Thread Morten Hornbech (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15656768#comment-15656768
 ] 

Morten Hornbech commented on SPARK-16087:
-

Hmm. I can't see that I have that option. I can attach files and comment, but 
not edit anything.

> Spark Hangs When Using Union With Persisted Hadoop RDD
> --
>
> Key: SPARK-16087
> URL: https://issues.apache.org/jira/browse/SPARK-16087
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.4.1, 1.6.1, 2.0.1
>Reporter: Kevin Conaway
>Priority: Critical
> Attachments: SPARK-16087.dump.log, SPARK-16087.log, Screen Shot 
> 2016-06-21 at 4.27.26 PM.png, Screen Shot 2016-06-21 at 4.27.35 PM.png, 
> part-0, part-1, spark-16087.tar.gz
>
>
> Spark hangs when materializing a persisted RDD that was built from a Hadoop 
> sequence file and then union-ed with a similar RDD.
> Below is a small file that exhibits the issue:
> {code:java}
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.serializer.KryoSerializer;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
> public class SparkBug {
> public static void main(String [] args) throws Exception {
> JavaSparkContext sc = new JavaSparkContext(
> new SparkConf()
> .set("spark.serializer", KryoSerializer.class.getName())
> .set("spark.master", "local[*]")
> .setAppName(SparkBug.class.getName())
> );
> JavaPairRDD<LongWritable, BytesWritable> rdd1 = sc.sequenceFile(
> "hdfs://localhost:9000/part-0",
> LongWritable.class,
> BytesWritable.class
> ).mapToPair(new PairFunction<Tuple2<LongWritable, BytesWritable>, 
> LongWritable, BytesWritable>() {
> @Override
> public Tuple2<LongWritable, BytesWritable> 
> call(Tuple2<LongWritable, BytesWritable> tuple) throws Exception {
> return new Tuple2<>(
> new LongWritable(tuple._1.get()),
> new BytesWritable(tuple._2.copyBytes())
> );
> }
> }).persist(
> StorageLevel.MEMORY_ONLY()
> );
> System.out.println("Before union: " + rdd1.count());
> JavaPairRDD<LongWritable, BytesWritable> rdd2 = sc.sequenceFile(
> "hdfs://localhost:9000/part-1",
> LongWritable.class,
> BytesWritable.class
> );
> JavaPairRDD<LongWritable, BytesWritable> joined = rdd1.union(rdd2);
> System.out.println("After union: " + joined.count());
> }
> }
> {code}
> You'll need to upload the attached part-0 and part-1 to a local hdfs 
> instance (I'm just using a dummy [Single Node 
> Cluster|http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html]
>  locally).
> Some things to note:
> - It does not hang if rdd1 is not persisted
> - It does not hang if rdd1 is not materialized (via calling rdd1.count()) 
> before the union-ed RDD is materialized
> - It does not hang if the mapToPair() transformation is removed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16087) Spark Hangs When Using Union With Persisted Hadoop RDD

2016-11-11 Thread Morten Hornbech (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15656752#comment-15656752
 ] 

Morten Hornbech commented on SPARK-16087:
-

Could someone update "Affects versions" to include 2.0.1 so the issue isn't 
binned by accident?

> Spark Hangs When Using Union With Persisted Hadoop RDD
> --
>
> Key: SPARK-16087
> URL: https://issues.apache.org/jira/browse/SPARK-16087
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.4.1, 1.6.1
>Reporter: Kevin Conaway
>Priority: Critical
> Attachments: SPARK-16087.dump.log, SPARK-16087.log, Screen Shot 
> 2016-06-21 at 4.27.26 PM.png, Screen Shot 2016-06-21 at 4.27.35 PM.png, 
> part-0, part-1, spark-16087.tar.gz
>
>
> Spark hangs when materializing a persisted RDD that was built from a Hadoop 
> sequence file and then union-ed with a similar RDD.
> Below is a small file that exhibits the issue:
> {code:java}
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.serializer.KryoSerializer;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
> public class SparkBug {
> public static void main(String [] args) throws Exception {
> JavaSparkContext sc = new JavaSparkContext(
> new SparkConf()
> .set("spark.serializer", KryoSerializer.class.getName())
> .set("spark.master", "local[*]")
> .setAppName(SparkBug.class.getName())
> );
> JavaPairRDD<LongWritable, BytesWritable> rdd1 = sc.sequenceFile(
> "hdfs://localhost:9000/part-0",
> LongWritable.class,
> BytesWritable.class
> ).mapToPair(new PairFunction<Tuple2<LongWritable, BytesWritable>, 
> LongWritable, BytesWritable>() {
> @Override
> public Tuple2<LongWritable, BytesWritable> 
> call(Tuple2<LongWritable, BytesWritable> tuple) throws Exception {
> return new Tuple2<>(
> new LongWritable(tuple._1.get()),
> new BytesWritable(tuple._2.copyBytes())
> );
> }
> }).persist(
> StorageLevel.MEMORY_ONLY()
> );
> System.out.println("Before union: " + rdd1.count());
> JavaPairRDD<LongWritable, BytesWritable> rdd2 = sc.sequenceFile(
> "hdfs://localhost:9000/part-1",
> LongWritable.class,
> BytesWritable.class
> );
> JavaPairRDD<LongWritable, BytesWritable> joined = rdd1.union(rdd2);
> System.out.println("After union: " + joined.count());
> }
> }
> {code}
> You'll need to upload the attached part-0 and part-1 to a local hdfs 
> instance (I'm just using a dummy [Single Node 
> Cluster|http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html]
>  locally).
> Some things to note:
> - It does not hang if rdd1 is not persisted
> - It does not hang if rdd1 is not materialized (via calling rdd1.count()) 
> before the union-ed RDD is materialized
> - It does not hang if the mapToPair() transformation is removed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16087) Spark Hangs When Using Union With Persisted Hadoop RDD

2016-11-09 Thread Morten Hornbech (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15650809#comment-15650809
 ] 

Morten Hornbech commented on SPARK-16087:
-

This is on 2.0.1, and the linked issue is marked as resolved in 2.0.1.

> Spark Hangs When Using Union With Persisted Hadoop RDD
> --
>
> Key: SPARK-16087
> URL: https://issues.apache.org/jira/browse/SPARK-16087
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.4.1, 1.6.1
>Reporter: Kevin Conaway
>Priority: Critical
> Attachments: SPARK-16087.dump.log, SPARK-16087.log, Screen Shot 
> 2016-06-21 at 4.27.26 PM.png, Screen Shot 2016-06-21 at 4.27.35 PM.png, 
> part-0, part-1, spark-16087.tar.gz
>
>
> Spark hangs when materializing a persisted RDD that was built from a Hadoop 
> sequence file and then union-ed with a similar RDD.
> Below is a small file that exhibits the issue:
> {code:java}
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.serializer.KryoSerializer;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
> public class SparkBug {
> public static void main(String [] args) throws Exception {
> JavaSparkContext sc = new JavaSparkContext(
> new SparkConf()
> .set("spark.serializer", KryoSerializer.class.getName())
> .set("spark.master", "local[*]")
> .setAppName(SparkBug.class.getName())
> );
> JavaPairRDD<LongWritable, BytesWritable> rdd1 = sc.sequenceFile(
> "hdfs://localhost:9000/part-0",
> LongWritable.class,
> BytesWritable.class
> ).mapToPair(new PairFunction<Tuple2<LongWritable, BytesWritable>, 
> LongWritable, BytesWritable>() {
> @Override
> public Tuple2<LongWritable, BytesWritable> 
> call(Tuple2<LongWritable, BytesWritable> tuple) throws Exception {
> return new Tuple2<>(
> new LongWritable(tuple._1.get()),
> new BytesWritable(tuple._2.copyBytes())
> );
> }
> }).persist(
> StorageLevel.MEMORY_ONLY()
> );
> System.out.println("Before union: " + rdd1.count());
> JavaPairRDD<LongWritable, BytesWritable> rdd2 = sc.sequenceFile(
> "hdfs://localhost:9000/part-1",
> LongWritable.class,
> BytesWritable.class
> );
> JavaPairRDD<LongWritable, BytesWritable> joined = rdd1.union(rdd2);
> System.out.println("After union: " + joined.count());
> }
> }
> {code}
> You'll need to upload the attached part-0 and part-1 to a local hdfs 
> instance (I'm just using a dummy [Single Node 
> Cluster|http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html]
>  locally).
> Some things to note:
> - It does not hang if rdd1 is not persisted
> - It does not hang if rdd1 is not materialized (via calling rdd1.count()) 
> before the union-ed RDD is materialized
> - It does not hang if the mapToPair() transformation is removed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-16087) Spark Hangs When Using Union With Persisted Hadoop RDD

2016-11-09 Thread Morten Hornbech (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15650788#comment-15650788
 ] 

Morten Hornbech edited comment on SPARK-16087 at 11/9/16 12:25 PM:
---

I can reproduce this issue on Spark 2.0.1 using the Cassandra connector. Double 
unions trigger a hang on collect - single unions are OK. Test below:

{code}
import com.datastax.driver.core.Cluster
import com.datastax.spark.connector._
import com.websudos.phantom.connectors.KeySpaceDef
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.scalatest.FunSuite

case class TestData(id: String)

class ReproTest extends FunSuite {

  test("error reproduction scenario") {

// SETUP: Build key space and tables.
val keySpaceName = "test"
val tableA = "test_table_a"
val tableB = "test_table_b"
val tableC = "test_table_c"
val builder = Cluster.builder().addContactPoint("127.0.0.1").withPort(9142)
val keySpaceDef = new KeySpaceDef(
  keySpaceName,
  _ => builder,
  true,
  Some((ses, ks) => s"CREATE KEYSPACE IF NOT EXISTS $keySpaceName WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}"))

keySpaceDef.session.execute(s"DROP TABLE IF EXISTS ${keySpaceName}.${tableA}")
keySpaceDef.session.execute(s"DROP TABLE IF EXISTS ${keySpaceName}.${tableB}")
keySpaceDef.session.execute(s"DROP TABLE IF EXISTS ${keySpaceName}.${tableC}")

// SETUP: Create spark session.
val config = new SparkConf()
  .setMaster("local[*]")
  .set("spark.cassandra.connection.port", "9142")
  .setAppName("test")
  .set("spark.cassandra.connection.host", "127.0.0.1")
  .set("spark.sql.warehouse.dir", "file:///C:/temp")

val session = SparkSession.builder().config(config).getOrCreate()

// SETUP: Create and persist data.
val data = List(TestData("Foo"))
val frame = session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))

frame.createCassandraTable(keySpaceName, tableA)
frame.createCassandraTable(keySpaceName, tableB)
frame.createCassandraTable(keySpaceName, tableC)

frame
  .write
  .format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Append)
  .options(Map("table" -> tableA, "keyspace" -> keySpaceName))
  .save()

frame
  .write
  .format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Append)
  .options(Map("table" -> tableB, "keyspace" -> keySpaceName))
  .save()

frame
  .write
  .format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Append)
  .options(Map("table" -> tableC, "keyspace" -> keySpaceName))
  .save()

// TEST: Load and transform frames:
val loadedA = session.sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> tableA, "keyspace" -> keySpaceName))
  .load()
  .createOrReplaceTempView("A")

val loadedB = session.sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> tableB, "keyspace" -> keySpaceName))
  .load()
  .createOrReplaceTempView("B")

val loadedC = session.sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> tableC, "keyspace" -> keySpaceName))
  .load()
  .createOrReplaceTempView("C")

val rowsOK = session.sql("select id from A union select id from B").collect()
val rowsHang = session.sql("select id from A union select id from B union select id from C").collect()
  }
}
{code}



was (Author: mhornbech):
I can reproduce this issue on Spark 2.0.1 using cassandra connector. Double 
unions trigger a hang on collect - single unions are ok. Test below:

import com.datastax.driver.core.Cluster
import com.datastax.spark.connector._
import com.websudos.phantom.connectors.KeySpaceDef
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.scalatest.FunSuite

case class TestData(id: String)

class ReproTest extends FunSuite {

  test("error reproduction scenario") {

// SETUP: Build key space and tables.
val keySpaceName = "test"
val tableA = "test_table_a"
val tableB = "test_table_b"
val tableC = "test_table_c"
val builder = Cluster.builder().addContactPoint("127.0.0.1").withPort(9142)
val keySpaceDef = new KeySpaceDef(
  keySpaceName,
  _ => builder,
  true,
  Some((ses, ks) => s"CREATE KEYSPACE IF NOT EXISTS $keySpaceName WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 1}"))

keySpaceDef.session.execute(s"DROP TABLE IF EXISTS 
${keySpaceName}.${tableA}")
keySpaceDef.session.execute(s"DROP TABLE IF EXISTS 
${keySpaceName}.${tableB}")
keySpaceDef.session.execute(s"DROP TABLE IF EXISTS 
${keySpaceName}.${tableC}")

// SETUP: Create spark session.
val config = 

[jira] [Commented] (SPARK-16087) Spark Hangs When Using Union With Persisted Hadoop RDD

2016-11-09 Thread Morten Hornbech (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15650788#comment-15650788
 ] 

Morten Hornbech commented on SPARK-16087:
-

I can reproduce this issue on Spark 2.0.1 using the Cassandra connector. Double 
unions trigger a hang on collect - single unions are OK. Test below:

import com.datastax.driver.core.Cluster
import com.datastax.spark.connector._
import com.websudos.phantom.connectors.KeySpaceDef
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.scalatest.FunSuite

case class TestData(id: String)

class ReproTest extends FunSuite {

  test("error reproduction scenario") {

// SETUP: Build key space and tables.
val keySpaceName = "test"
val tableA = "test_table_a"
val tableB = "test_table_b"
val tableC = "test_table_c"
val builder = Cluster.builder().addContactPoint("127.0.0.1").withPort(9142)
val keySpaceDef = new KeySpaceDef(
  keySpaceName,
  _ => builder,
  true,
  Some((ses, ks) => s"CREATE KEYSPACE IF NOT EXISTS $keySpaceName WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}"))

keySpaceDef.session.execute(s"DROP TABLE IF EXISTS ${keySpaceName}.${tableA}")
keySpaceDef.session.execute(s"DROP TABLE IF EXISTS ${keySpaceName}.${tableB}")
keySpaceDef.session.execute(s"DROP TABLE IF EXISTS ${keySpaceName}.${tableC}")

// SETUP: Create spark session.
val config = new SparkConf()
  .setMaster("local[*]")
  .set("spark.cassandra.connection.port", "9142")
  .setAppName("test")
  .set("spark.cassandra.connection.host", "127.0.0.1")
  .set("spark.sql.warehouse.dir", "file:///C:/temp")

val session = SparkSession.builder().config(config).getOrCreate()

// SETUP: Create and persist data.
val data = List(TestData("Foo"))
val frame = session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))

frame.createCassandraTable(keySpaceName, tableA)
frame.createCassandraTable(keySpaceName, tableB)
frame.createCassandraTable(keySpaceName, tableC)

frame
  .write
  .format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Append)
  .options(Map("table" -> tableA, "keyspace" -> keySpaceName))
  .save()

frame
  .write
  .format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Append)
  .options(Map("table" -> tableB, "keyspace" -> keySpaceName))
  .save()

frame
  .write
  .format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Append)
  .options(Map("table" -> tableC, "keyspace" -> keySpaceName))
  .save()

// TEST: Load and transform frames:
val loadedA = session.sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> tableA, "keyspace" -> keySpaceName))
  .load()
  .createOrReplaceTempView("A")

val loadedB = session.sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> tableB, "keyspace" -> keySpaceName))
  .load()
  .createOrReplaceTempView("B")

val loadedC = session.sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> tableC, "keyspace" -> keySpaceName))
  .load()
  .createOrReplaceTempView("C")

val rowsOK = session.sql("select id from A union select id from B").collect()
val rowsHang = session.sql("select id from A union select id from B union select id from C").collect()
  }
}



> Spark Hangs When Using Union With Persisted Hadoop RDD
> --
>
> Key: SPARK-16087
> URL: https://issues.apache.org/jira/browse/SPARK-16087
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.4.1, 1.6.1
>Reporter: Kevin Conaway
>Priority: Critical
> Attachments: SPARK-16087.dump.log, SPARK-16087.log, Screen Shot 
> 2016-06-21 at 4.27.26 PM.png, Screen Shot 2016-06-21 at 4.27.35 PM.png, 
> part-0, part-1, spark-16087.tar.gz
>
>
> Spark hangs when materializing a persisted RDD that was built from a Hadoop 
> sequence file and then union-ed with a similar RDD.
> Below is a small file that exhibits the issue:
> {code:java}
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.serializer.KryoSerializer;
> import org.apache.spark.storage.StorageLevel;
> import scala.Tuple2;
> public class SparkBug {
> public static void main(String [] args) throws Exception {
> JavaSparkContext sc = new JavaSparkContext(
> new SparkConf()
>