[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...

2018-02-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20561


---




[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...

2018-02-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20561#discussion_r167421526
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java ---
@@ -98,10 +99,22 @@ public UnsafeKVExternalSorter(
         numElementsForSpillThreshold,
         canUseRadixSort);
     } else {
-      // The array will be used to do in-place sort, which require half of the space to be empty.
-      // Note: each record in the map takes two entries in the array, one is record pointer,
-      // another is the key prefix.
-      assert(map.numKeys() * 2 <= map.getArray().size() / 2);
+      LongArray pointArray = map.getArray();
+      // `BytesToBytesMap`'s point array is only guaranteed to hold all the distinct keys, but
+      // `UnsafeInMemorySorter`'s point array need to hold all the entries. Since `BytesToBytesMap`
+      // can have duplicated keys, here we need a check to make sure the point array can hold
+      // all the entries in `BytesToBytesMap`.
+      // The point array will be used to do in-place sort, which requires half of the space to be
+      // empty. Note: each record in the map takes two entries in the point array, one is record
+      // pointer, another is key prefix. So the required size of point array is `numRecords * 4`.
+      // TODO: It's possible to change UnsafeInMemorySorter to have multiple entries with same key,
+      // so that we can always reuse the point array.
+      if (map.numValues() > pointArray.size() / 4) {
+        // Here we ask the map to allocate memory, so that the memory manager won't ask the map
+        // to spill, if the memory is not enough.
+        pointArray = map.allocateArray(map.numValues() * 4L);
+      }
+
       // During spilling, the array in map will not be used, so we can borrow that and use it
       // as the underlying array for in-memory sorter (it's always large enough).
--- End diff --

Shall we update the comment here too?


---




[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20561#discussion_r167387192
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java ---
@@ -98,10 +99,20 @@ public UnsafeKVExternalSorter(
         numElementsForSpillThreshold,
         canUseRadixSort);
     } else {
-      // The array will be used to do in-place sort, which require half of the space to be empty.
-      // Note: each record in the map takes two entries in the array, one is record pointer,
-      // another is the key prefix.
-      assert(map.numKeys() * 2 <= map.getArray().size() / 2);
+      // `BytesToBytesMap`'s point array is only guaranteed to hold all the distinct keys, but
+      // `UnsafeInMemorySorter`'s point array need to hold all the entries. Since `BytesToBytesMap`
+      // can have duplicated keys, here we need a check to make sure the point array can hold
+      // all the entries in `BytesToBytesMap`.
+      final LongArray pointArray;
+      // The point array will be used to do in-place sort, which require half of the space to be
+      // empty. Note: each record in the map takes two entries in the point array, one is record
+      // pointer, another is the key prefix.
+      if (map.numValues() > map.getArray().size() / 4) {
+        pointArray = map.allocateArray(map.numValues() * 4);
--- End diff --

`map.allocateArray` will trigger other consumers to spill if memory is not enough. If the allocation still fails, there is nothing we can do, so we just let the execution fail.
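
For context, a minimal sketch of the allocate-or-let-it-fail pattern described here; assumptions: this is not the PR's code, and the exact exception type thrown on allocation failure varies across Spark versions:

```
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.map.BytesToBytesMap;

class PointArraySizing {
  // allocateArray asks the TaskMemoryManager for memory, which may force
  // *other* consumers to spill first; if memory is still short, the call
  // throws and the exception is deliberately left to propagate.
  static LongArray pointArrayFor(BytesToBytesMap map) {
    LongArray pointArray = map.getArray();
    if (map.numValues() > pointArray.size() / 4) {
      // No try/catch on purpose: if allocation fails there is nothing
      // sensible to do here, so the task is allowed to fail.
      pointArray = map.allocateArray(map.numValues() * 4L);
    }
    return pointArray;
  }
}
```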


---




[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20561#discussion_r167387138
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala ---
@@ -205,4 +206,42 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
       spill = true
     )
   }
+
+  test("SPARK-23376: Create UnsafeKVExternalSorter with BytesToBytesMap having duplicated keys") {
+    val memoryManager = new TestMemoryManager(new SparkConf())
+    val taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
+    val map = new BytesToBytesMap(taskMemoryManager, 64, taskMemoryManager.pageSizeBytes())
+
+    // Key/value are unsafe rows with a single int column
+    val schema = new StructType().add("i", IntegerType)
+    val key = new UnsafeRow(1)
+    key.pointTo(new Array[Byte](32), 32)
+    key.setInt(0, 1)
+    val value = new UnsafeRow(1)
+    value.pointTo(new Array[Byte](32), 32)
+    value.setInt(0, 2)
+
+    for (_ <- 1 to 65) {
+      val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes)
+      loc.append(
+        key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
+        value.getBaseObject, value.getBaseOffset, value.getSizeInBytes)
+    }
+
+    // Make sure we can successfully create a UnsafeKVExternalSorter with a `BytesToBytesMap`
+    // which has duplicated keys and the number of entries exceeds its capacity.
--- End diff --

yes, we use `BytesToBytesMap` to build the broadcast join hash relation, which may have duplicated keys. I only create a new pointer array if the existing one is not big enough, so we won't have performance regression for aggregate.


---




[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20561#discussion_r167387073
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java ---
@@ -98,10 +99,20 @@ public UnsafeKVExternalSorter(
         numElementsForSpillThreshold,
         canUseRadixSort);
     } else {
-      // The array will be used to do in-place sort, which require half of the space to be empty.
-      // Note: each record in the map takes two entries in the array, one is record pointer,
-      // another is the key prefix.
-      assert(map.numKeys() * 2 <= map.getArray().size() / 2);
+      // `BytesToBytesMap`'s point array is only guaranteed to hold all the distinct keys, but
+      // `UnsafeInMemorySorter`'s point array need to hold all the entries. Since `BytesToBytesMap`
--- End diff --

yea, but it's not trivial, I'd like to do it later. The required change I can think of: `BytesToBytesMap` is actually a key -> list map, and we need to provide a way to iterate key -> list instead of key -> value.
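
For illustration only, a rough sketch of the interface change being described; every name below is hypothetical, and nothing like it exists in `BytesToBytesMap` today:

```
import java.util.Iterator;

// Hypothetical: instead of yielding one (key, value) pair per entry, the map
// would yield each distinct key once, together with all values chained under it.
interface KeyWithValues {
  Object keyBase();                  // base object holding the key bytes
  long keyOffset();                  // offset of the key within the base object
  int keyLength();                   // length of the key in bytes
  Iterator<Object> valueLocations(); // one location per duplicated value
}

interface KeyListIterator extends Iterator<KeyWithValues> {}
```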


---




[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...

2018-02-09 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20561#discussion_r167307229
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java ---
@@ -98,10 +99,20 @@ public UnsafeKVExternalSorter(
         numElementsForSpillThreshold,
         canUseRadixSort);
     } else {
-      // The array will be used to do in-place sort, which require half of the space to be empty.
-      // Note: each record in the map takes two entries in the array, one is record pointer,
-      // another is the key prefix.
-      assert(map.numKeys() * 2 <= map.getArray().size() / 2);
+      // `BytesToBytesMap`'s point array is only guaranteed to hold all the distinct keys, but
+      // `UnsafeInMemorySorter`'s point array need to hold all the entries. Since `BytesToBytesMap`
+      // can have duplicated keys, here we need a check to make sure the point array can hold
+      // all the entries in `BytesToBytesMap`.
+      final LongArray pointArray;
+      // The point array will be used to do in-place sort, which require half of the space to be
+      // empty. Note: each record in the map takes two entries in the point array, one is record
+      // pointer, another is the key prefix.
+      if (map.numValues() > map.getArray().size() / 4) {
+        pointArray = map.allocateArray(map.numValues() * 4);
--- End diff --

Since overflow may occur (e.g. `0x7000_0000 * 4`), should we use `* 4L` instead of `* 4`?
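
A small, self-contained illustration of the overflow (assuming the literal in the quote was meant to be a large count like `0x7000_0000`):

```
public class PointArrayOverflow {
  public static void main(String[] args) {
    int numValues = 0x7000_0000;   // ~1.88 billion entries (hypothetical)
    long bad  = numValues * 4;     // multiply happens in 32-bit int and wraps,
                                   // then the negative result is widened
    long good = numValues * 4L;    // operand widened to long first, no wrap
    System.out.println(bad);       // -1073741824
    System.out.println(good);      // 7516192768
  }
}
```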


---




[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...

2018-02-09 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/20561#discussion_r167300330
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala ---
@@ -205,4 +206,42 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
       spill = true
     )
   }
+
+  test("SPARK-23376: Create UnsafeKVExternalSorter with BytesToBytesMap having duplicated keys") {
+    val memoryManager = new TestMemoryManager(new SparkConf())
+    val taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
+    val map = new BytesToBytesMap(taskMemoryManager, 64, taskMemoryManager.pageSizeBytes())
+
+    // Key/value are unsafe rows with a single int column
+    val schema = new StructType().add("i", IntegerType)
+    val key = new UnsafeRow(1)
+    key.pointTo(new Array[Byte](32), 32)
+    key.setInt(0, 1)
+    val value = new UnsafeRow(1)
+    value.pointTo(new Array[Byte](32), 32)
+    value.setInt(0, 2)
+
+    for (_ <- 1 to 65) {
+      val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes)
+      loc.append(
+        key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
+        value.getBaseObject, value.getBaseOffset, value.getSizeInBytes)
+    }
+
+    // Make sure we can successfully create a UnsafeKVExternalSorter with a `BytesToBytesMap`
+    // which has duplicated keys and the number of entries exceeds its capacity.
--- End diff --

For aggregation, there are no multiple entries for the same key; that only happens for hash join (don't remember the details).


---




[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...

2018-02-09 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/20561#discussion_r167299807
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java ---
@@ -98,10 +99,20 @@ public UnsafeKVExternalSorter(
         numElementsForSpillThreshold,
         canUseRadixSort);
     } else {
-      // The array will be used to do in-place sort, which require half of the space to be empty.
-      // Note: each record in the map takes two entries in the array, one is record pointer,
-      // another is the key prefix.
-      assert(map.numKeys() * 2 <= map.getArray().size() / 2);
+      // `BytesToBytesMap`'s point array is only guaranteed to hold all the distinct keys, but
+      // `UnsafeInMemorySorter`'s point array need to hold all the entries. Since `BytesToBytesMap`
+      // can have duplicated keys, here we need a check to make sure the point array can hold
+      // all the entries in `BytesToBytesMap`.
+      final LongArray pointArray;
+      // The point array will be used to do in-place sort, which require half of the space to be
+      // empty. Note: each record in the map takes two entries in the point array, one is record
+      // pointer, another is the key prefix.
+      if (map.numValues() > map.getArray().size() / 4) {
+        pointArray = map.allocateArray(map.numValues() * 4);
--- End diff --

The allocation may fail.


---




[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...

2018-02-09 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/20561#discussion_r167299716
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java ---
@@ -98,10 +99,20 @@ public UnsafeKVExternalSorter(
         numElementsForSpillThreshold,
         canUseRadixSort);
     } else {
-      // The array will be used to do in-place sort, which require half of the space to be empty.
-      // Note: each record in the map takes two entries in the array, one is record pointer,
-      // another is the key prefix.
-      assert(map.numKeys() * 2 <= map.getArray().size() / 2);
+      // `BytesToBytesMap`'s point array is only guaranteed to hold all the distinct keys, but
+      // `UnsafeInMemorySorter`'s point array need to hold all the entries. Since `BytesToBytesMap`
--- End diff --

It's possible to change `UnsafeInMemorySorter` to have multiple entries with the same key.


---




[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...

2018-02-09 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/20561

[SPARK-23376][SQL] creating UnsafeKVExternalSorter with BytesToBytesMap may fail

## What changes were proposed in this pull request?

This is a long-standing bug in `UnsafeKVExternalSorter` and was reported on the dev list multiple times.

When creating an `UnsafeKVExternalSorter` with a `BytesToBytesMap`, we need to create an `UnsafeInMemorySorter` to sort the data in the `BytesToBytesMap`. The data format of the sorter and the map is the same, so no data movement is required. However, both the sorter and the map need a point array for some bookkeeping work.

There is an optimization in `UnsafeKVExternalSorter`: reuse the point array between the sorter and the map, to avoid an extra memory allocation. This sounds like a reasonable optimization: the length of the `BytesToBytesMap` point array is at least 4 times the number of keys (to avoid hash collisions, the hash table size should be at least 2 times the number of keys, and each key occupies 2 slots), and `UnsafeInMemorySorter` needs the point array size to be 4 times the number of entries, so it looks safe to reuse the point array.

However, the number of keys in the map doesn't equal the number of entries in the map, because `BytesToBytesMap` supports duplicated keys. This breaks the assumption of the above optimization, and we may run out of space when inserting data into the sorter, hitting this error:
```
java.lang.IllegalStateException: There is no space for new record
    at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.insertRecord(UnsafeInMemorySorter.java:239)
    at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:149)
...
```
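
To make the mismatch concrete, here is a worked sizing example; a sketch using the numbers from the new test (initial map capacity 64, 65 inserts of a single duplicated key) and the slot accounting described above:

```
public class SizingMismatch {
  public static void main(String[] args) {
    int numKeys = 1;            // distinct keys in the map
    int numValues = 65;         // total entries, duplicates included
    int mapArrayLen = 64 * 2;   // capacity 64 => 128 slots, >= 4 * numKeys
    // The per-key assumption holds: the map's array easily covers 4 * numKeys.
    System.out.println(mapArrayLen >= 4 * numKeys);   // true  (128 >= 4)
    // But the sorter needs 2 slots per *entry* plus half the array empty,
    // i.e. 4 * numValues slots, which the reused 128-slot array cannot hold.
    System.out.println(mapArrayLen >= 4 * numValues); // false (128 < 260)
  }
}
```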

This PR fixes this bug by creating a new point array if the existing one is not big enough.

## How was this patch tested?

a new test

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark bug

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20561.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20561






---
