[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-10-14 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16951165#comment-16951165
 ] 

Min Shen commented on SPARK-21492:
--

Want to further clarify the scope of the fix in PR 
[https://github.com/apache/spark/pull/25888].

Based on previous work by [~taoluo], this PR further fixes the issue for SMJ 
codegen.

[~hvanhovell] raised two concerns about [~taoluo]'s PR in 
[https://github.com/apache/spark/pull/23762]:
 # This only works for an SMJ with Sorts as its direct inputs.
 # It is not clear that it is safe to assume you can close an underlying child 
like this.

The fix in PR [https://github.com/apache/spark/pull/25888] should address 
concern #2, i.e. it guarantees that closing a Sort operator's iterator early is 
safe.

This fix does not yet propagate the request to close the iterators of an SMJ's 
child operators down the plan tree until it reaches the Sort operators.

However, in our experience operating Spark workloads at LinkedIn, an SMJ 
usually only lacks a Sort as its direct input when multiple SMJs are stacked 
together.

Even in that case, although we do not yet propagate the close requests, each 
SMJ can still properly close its own direct child operators, which still helps 
release resources early.
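To make the early-close idea concrete, here is a small hypothetical Python sketch (none of these names are Spark's): a partially consumed, page-backed iterator pins its remaining memory pages until something explicitly closes it.

```python
# Hypothetical illustration (not Spark code): a page-backed iterator
# keeps its remaining pages allocated until it is either exhausted
# or explicitly closed.
class PageBackedIterator:
    def __init__(self, pages):
        self.pages = list(pages)   # simulated memory pages

    def records(self):
        while self.pages:
            for record in self.pages[0]:
                yield record
            if self.pages:         # may have been closed mid-page
                self.pages.pop(0)  # a fully consumed page is released

    def close(self):
        self.pages.clear()         # early close releases every page at once

it = PageBackedIterator([[1, 2], [3, 4]])
gen = it.records()
next(gen)                    # consume only part of the input, as an SMJ may do
assert len(it.pages) == 2    # unconsumed pages are still held -> the "leak"
it.close()
assert it.pages == []        # an explicit close frees them early
```

Without the `close()` call, the pages in this sketch stay held until the object is discarded, which mirrors how the Sort's memory is only reclaimed at task end.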

> Memory leak in SortMergeJoin
> 
>
> Key: SPARK-21492
> URL: https://issues.apache.org/jira/browse/SPARK-21492
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0, 2.3.1, 3.0.0
>Reporter: Zhan Zhang
>Priority: Major
>
> In SortMergeJoin, if the iterator is not exhausted, there will be memory leak 
> caused by the Sort. The memory is not released until the task end, and cannot 
> be used by other operators causing performance drop or OOM.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-10-14 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16951154#comment-16951154
 ] 

Min Shen commented on SPARK-21492:
--

We have deployed the latest version of the PR in 
[https://github.com/apache/spark/pull/25888] in LinkedIn's production clusters 
for a week now.

With the most recent changes, all corner cases seem to have been handled.

We are seeing jobs previously failing due to this issue now able to complete.

We have also observed a general reduction of spills during join in our cluster.

Want to see if the community is also working on a fix for this issue and, if 
so, whether there's a timeline for it.

[~cloud_fan] [~jiangxb1987] [~taoluo]




[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-09-21 Thread Min Shen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935176#comment-16935176
 ] 

Min Shen commented on SPARK-21492:
--

We also saw this issue happening in our cluster.

Based on [~taoluo]'s patch, we worked on a patch that fixes this issue when 
codegen is enabled.

[https://github.com/apache/spark/pull/25888]

Would appreciate comments on this.

[~taoluo] [~cloud_fan] [~jiangxb1987]




[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-09-11 Thread zhoukang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16928204#comment-16928204
 ] 

zhoukang commented on SPARK-21492:
--

Any progress on this issue? [~jiangxb1987]
We also encountered this problem.




[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-05-20 Thread Xingbo Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16844313#comment-16844313
 ] 

Xingbo Jiang commented on SPARK-21492:
--

I'm working on this, will post a simple design doc in a few days.




[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-04-29 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16829545#comment-16829545
 ] 

Tao Luo commented on SPARK-21492:
-

The problem is that the task won't complete, because memory is being leaked 
(you can see this in the simple example above).
Secondly, it's not just the last page: it's every page holding records from 
unused iterators.
Can we increase the priority of this bug? SMJ is a pretty integral part of 
Spark SQL, and it seems like no progress is being made on this bug, which is 
causing jobs to fail and has no workaround.

I don't think that it's a hack: the argument seems to be that limit also needs 
to be fixed, so let's not fix this bug until that is fixed too; meanwhile, this 
issue has been lingering since at least July 2017.
This change would fix a memory leak and improve performance by not spilling 
unnecessarily. Why don't we target the fix for SMJ first, since it's pretty 
isolated to UnsafeExternalRowIterator in SMJ, run it through all the test 
cases, and make additional changes as necessary in the future?

I've been porting [this PR|https://github.com/apache/spark/pull/23762] onto my 
production Spark cluster for the last 3 months, but I'm hoping we can get some 
sort of fix into 3.0 at least.

I started a discussion thread here, hopefully people can jump in:
http://apache-spark-developers-list.1001551.n3.nabble.com/Memory-leak-in-SortMergeJoin-td27152.html





[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-03-18 Thread Xiaoju Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794962#comment-16794962
 ] 

Xiaoju Wu commented on SPARK-21492:
---

Any updates? Is there any discussion of a general fix, instead of a hack in 
SMJ?




[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-02-27 Thread Guruprasad (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16779460#comment-16779460
 ] 

Guruprasad commented on SPARK-21492:


I ran the sample code provided on Spark 2.0.2; it used SortMergeJoin but did 
not lead to memory leaks. So I'm wondering: what changed between Spark 2.0.2 
and Spark 2.2 that leads to this OOM error?




[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-02-21 Thread Wenchen Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16773861#comment-16773861
 ] 

Wenchen Fan commented on SPARK-21492:
-

[~taoluo] Thanks for the detailed explanation! I kind of agree that this is a 
memory leak, although the memory can be released when the task completes.

The problematic pattern is: an operator consumes only a part of records from 
its child, so that the child can't release the last page which stores the last 
record it outputs. The child has no idea if the last record has been consumed 
by the parent or not, so it's not safe to release the last page, as doing so 
would make the last record corrupted. SMJ and limit are 2 places that I can 
think of that have this pattern.

So we need a mechanism that allows the parent to tell the child that it can 
release all of its resources. A closeable iterator is an option here, but we 
should not hack it into SMJ only.
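The closeable-iterator idea above can be sketched in a few lines of Python (illustrative names only, not Spark's API): when the parent stops consuming early, `close()` frees that operator's resources and propagates the request to its child, so the release can travel down the plan tree.

```python
# Hedged sketch of a "closeable iterator" protocol. A parent that
# stops consuming early calls close(); each wrapper frees its own
# resources, then forwards the request to its child.
class CloseableIterator:
    def __init__(self, source, on_close=None):
        self._source = iter(source)
        self._on_close = on_close
        self.closed = False

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._source)

    def close(self):
        if self.closed:
            return
        self.closed = True
        if self._on_close is not None:
            self._on_close()             # free this operator's memory
        if hasattr(self._source, "close"):
            self._source.close()         # propagate down the plan tree

released = []
sort = CloseableIterator(range(10), on_close=lambda: released.append("sort"))
join = CloseableIterator(sort, on_close=lambda: released.append("join"))
next(join)    # the join consumes only part of its input...
join.close()  # ...yet both levels still release their resources early
assert released == ["join", "sort"]
```

The point of the propagation step is exactly the concern discussed here: the mechanism should be generic to any operator chain, not special-cased inside SMJ.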




[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-02-12 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16766598#comment-16766598
 ] 

Tao Luo commented on SPARK-21492:
-

cc [~tejasp], [~kiszk] for input on code generation to address the memory leak. 

[https://github.com/apache/spark/pull/23762] 

 




[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-02-11 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765378#comment-16765378
 ] 

Tao Luo commented on SPARK-21492:
-

I'll take a stab at this JIRA; I should have something to review today or 
tomorrow.




[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-02-08 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16763825#comment-16763825
 ] 

Tao Luo commented on SPARK-21492:
-

Can someone add 'affects version' 2.4.0 as well? 




[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2019-02-08 Thread Tao Luo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16763820#comment-16763820
 ] 

Tao Luo commented on SPARK-21492:
-

If SortMergeJoinScanner doesn't consume the iterator from 
UnsafeExternalRowSorter entirely, the memory that UnsafeExternalSorter acquired 
from TaskMemoryManager will never be released. This leads to a memory leak, 
spills, and OOME. A page will be held per partition of the unused iterator.



{code:java}
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
 
{noformat}
== Physical Plan ==
Coalesce 1
+- *(5) SortMergeJoin [timestamp1#52L], [timestamp2#59L], Inner
   :- *(2) Sort [timestamp1#52L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(timestamp1#52L, 200)
   :     +- *(1) Project [id#50L AS timestamp1#52L, rand(-4732263137869282482) AS value#54]
   :        +- *(1) Range (1, 1001, step=1, splits=4)
   +- *(4) Sort [timestamp2#59L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(timestamp2#59L, 200)
         +- *(3) Project [id#57L AS timestamp2#59L, rand(-3625198886289022666) AS value2#61]
            +- *(3) Range (1000, 2001, step=1, splits=4)
{noformat}

{noformat}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 21, 10.100.100.10, executor 0): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
{noformat}
 

Using broadcast succeeds:
{code:java}
# broadcast workaround ('broadcast' needs to be imported)
from pyspark.sql.functions import broadcast
joined = r1.join(broadcast(r2), r1.timestamp1 == r2.timestamp2, "inner"){code}
Running on Spark 2.4. 

 

Or if you prefer Scala:
{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, rand}

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

var r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn("value", rand())
var r2 = spark.range(1000, 2001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn("value2", rand())
var joined = r1.join(r2, col("timestamp1") === col("timestamp2"), "inner")
joined = joined.coalesce(1)
joined.explain()
joined.show(){code}
 

 

Just reproduced it in standalone mode using 
[https://www.apache.org/dyn/closer.lua/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz]
 and Python 3.7. Same code as above.

 

Succeeds with 1 thread: ./bin/pyspark --master local[1]

Fails with >1 thread: ./bin/pyspark --master local[4]
{code:java}
SparkSession available as 'spark'.

>>> from pyspark.sql.functions import rand, col

>>>

>>> spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")

>>> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

>>>

>>> r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))

>>> r1 = r1.withColumn('value', rand())

>>> r2 = spark.range(1000, 1001).select(col("id").alias("timestamp2"))

>>> r2 = r2.withColumn('value2', rand())

>>> joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")

>>> joined = joined.coalesce(1)

>>> joined.show(){code}
 
{code:java}
[Stage 2:>                                                          (0 + 1) / 
1]2019-02-06 17:12:27 WARN  TaskMemoryManager:304 - Failed to allocate a page 
(1900544 bytes), try again.

2019-02-06 17:12:27 ERROR Executor:91 - Exception in task 0.0 in stage 2.0 (TID 
6)

org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 28 bytes of 
memory, got 0

at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)

at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:119)

at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:383)

at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:407)

at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135){code}
 

[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2017-10-20 Thread JIRA

[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212522#comment-16212522
 ] 

Bartosz Mścichowski commented on SPARK-21492:
-

Here's a script that exposes the memory leak during SortMergeJoin in Spark 
2.2.0; maybe it will be helpful.
The memory leak happens when the following code is executed in a local 
spark-shell. {{--conf spark.sql.autoBroadcastJoinThreshold=-1}} may be needed 
to ensure the proper join type.

{noformat}
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val table1Key = "t1_key"
val table1Value = "t1_value"

val table2Key = "t2_key"
val table2Value = "t2_value"

val table1Schema = StructType(List(
  StructField(table1Key, IntegerType),
  StructField(table1Value, DoubleType)
))

val table2Schema = StructType(List(
  StructField(table2Key, IntegerType),
  StructField(table2Value, DoubleType)
))

val table1 = spark.sqlContext.createDataFrame(
  rowRDD = spark.sparkContext.parallelize(Seq(
    Row(1, 2.0)
  )),
  schema = table1Schema
)

val table2 = spark.sqlContext.createDataFrame(
  rowRDD = spark.sparkContext.parallelize(Seq(
    Row(1, 4.0)
  )),
  schema = table2Schema
)

val t1 = table1.repartition(col(table1Key)).groupBy(table1Key).avg()
val t2 = table2.repartition(col(table2Key)).groupBy(table2Key).avg()

val joinedDF = t1 join t2 where t1(table1Key) === t2(table2Key)

joinedDF.explain()
// == Physical Plan ==
// *SortMergeJoin [t1_key#2], [t2_key#9], Inner
// :- *Sort [t1_key#2 ASC NULLS FIRST], false, 0
// :  +- *HashAggregate(keys=[t1_key#2], functions=[avg(cast(t1_key#2 as 
bigint)), avg(t1_value#3)])
// : +- *HashAggregate(keys=[t1_key#2], 
functions=[partial_avg(cast(t1_key#2 as bigint)), partial_avg(t1_value#3)])
// :+- Exchange hashpartitioning(t1_key#2, 200)
// :   +- *Filter isnotnull(t1_key#2)
// :  +- Scan ExistingRDD[t1_key#2,t1_value#3]
// +- *Sort [t2_key#9 ASC NULLS FIRST], false, 0
//+- *HashAggregate(keys=[t2_key#9], functions=[avg(cast(t2_key#9 as 
bigint)), avg(t2_value#10)])
//   +- *HashAggregate(keys=[t2_key#9], 
functions=[partial_avg(cast(t2_key#9 as bigint)), partial_avg(t2_value#10)])
//  +- Exchange hashpartitioning(t2_key#9, 200)
// +- *Filter isnotnull(t2_key#9)
//+- Scan ExistingRDD[t2_key#9,t2_value#10]

joinedDF.show()
// The 'show' action yields a lot of:
// 17/10/19 08:17:39 WARN executor.Executor: Managed memory leak detected; size 
= 4194304 bytes, TID = 8
// 17/10/19 08:17:39 WARN executor.Executor: Managed memory leak detected; size 
= 4194304 bytes, TID = 9
// 17/10/19 08:17:39 WARN executor.Executor: Managed memory leak detected; size 
= 4194304 bytes, TID = 11
{noformat}





[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2017-07-20 Thread Zhan Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16095425#comment-16095425
 ] 

Zhan Zhang commented on SPARK-21492:


Root cause: in SortMergeJoin (inner/leftOuter/rightOuter), one side's sorted 
iterator may not be exhausted; that chunk of memory thus cannot be released, 
causing a memory leak and performance degradation.
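As a simplified illustration of that root cause (plain Python, not Spark's implementation): an inner merge join stops as soon as either sorted input runs out, abandoning the other iterator mid-stream, which is exactly the unexhausted-iterator case described above.

```python
# Simplified inner merge join over two sorted sequences of unique
# keys. When one side is exhausted, the loop exits and the other
# iterator is abandoned with unread input still buffered (in Spark,
# with memory pages still held).
def merge_join(left, right):
    left, right = iter(left), iter(right)
    l, r = next(left, None), next(right, None)
    out = []
    while l is not None and r is not None:
        if l < r:
            l = next(left, None)
        elif l > r:
            r = next(right, None)
        else:
            out.append((l, r))
            l, r = next(left, None), next(right, None)
    # Here one iterator may still hold unread records; nothing tells
    # its producer that it can release the memory backing them.
    return out

assert merge_join([1, 2, 3], [2, 3, 4, 5, 6]) == [(2, 2), (3, 3)]
```

After the call above, the right side's records 5 and 6 are never read; in Spark the Sort backing that side keeps its pages until the task ends.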




[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin

2017-07-20 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16095418#comment-16095418
 ] 

Apache Spark commented on SPARK-21492:
--

User 'zhzhan' has created a pull request for this issue:
https://github.com/apache/spark/pull/18694
