maropu commented on a change in pull request #27246:
URL: https://github.com/apache/spark/pull/27246#discussion_r434925293
##########
File path:
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
##########
@@ -703,6 +702,7 @@ public boolean hasNext() {
@Override
public void loadNext() throws IOException {
Review comment:
```
@Override
public void loadNext() throws IOException {
  this.hasNext();
}
```
##########
File path:
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
##########
@@ -720,5 +720,14 @@ public void loadNext() throws IOException {
@Override
public long getKeyPrefix() { return current.getKeyPrefix(); }
+
+ private void initializeNumRecords() throws IOException {
+ if (numRecords == 0) {
+ for (UnsafeSorterIterator iter: iterators) {
+ numRecords += iter.getNumRecords();
+ }
+ numRecords += current.getNumRecords();
+ }
+ }
Review comment:
nit: How about this? (Semantically, `this.current` should be null before
`initializeNumRecords` is called.)
```
ChainedIterator(Queue<UnsafeSorterIterator> iterators) {
  ...
  this.current = null;
}

private void initializeNumRecords() throws IOException {
  if (numRecords == 0) {
    for (UnsafeSorterIterator iter: iterators) {
      numRecords += iter.getNumRecords();
    }
    this.current = iterators.remove();
  }
}
```
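For reference, a minimal, self-contained sketch of the lazy-initialization idea in the suggestion above. `CountedIterator` and `LazyCountChain` are hypothetical stand-ins, not Spark classes, and the explicit `initialized` flag is my own assumption: it replaces the `numRecords == 0` check so that a chain whose iterators are all empty is not initialized twice.

```java
import java.util.Queue;

// Hypothetical stand-in for UnsafeSorterIterator; only the record count matters here.
interface CountedIterator {
  int getNumRecords();
}

// Simplified sketch (not the Spark class): the total is computed on first use.
final class LazyCountChain {
  private final Queue<CountedIterator> iterators;
  private CountedIterator current = null; // stays null until initialization
  private boolean initialized = false;    // assumption: flag instead of numRecords == 0
  private int numRecords = 0;

  LazyCountChain(Queue<CountedIterator> iterators) {
    this.iterators = iterators;
  }

  private void initializeNumRecords() {
    if (!initialized) {
      // Sum over every queued iterator, including the one that becomes `current`.
      for (CountedIterator iter : iterators) {
        numRecords += iter.getNumRecords();
      }
      // Assumes a non-empty queue; remove() throws NoSuchElementException otherwise.
      current = iterators.remove();
      initialized = true;
    }
  }

  int getNumRecords() {
    initializeNumRecords();
    return numRecords;
  }

  // Number of iterators still queued behind `current`.
  int remainingIterators() {
    return iterators.size();
  }
}
```

Because `current` is only taken from the queue after the loop, its records are counted exactly once, which is why the separate `numRecords += current.getNumRecords()` line is no longer needed.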
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
##########
@@ -168,7 +159,11 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
if (spillableArray == null) {
new InMemoryBufferIterator(startIndex)
} else {
- new SpillableArrayIterator(spillableArray.getIterator(startIndex), numFieldsPerRow)
+ new MergerIterator(spillableArray.getIterator(if (startIndex > numRowBufferedInMemory) {
+ startIndex - numRowBufferedInMemory
+ } else 0),
+ numFieldsPerRow,
+ startIndex)
Review comment:
format:
```
if (spillableArray == null) {
  new InMemoryBufferIterator(startIndex)
} else {
  val offsetIndex = if (startIndex > numRowBufferedInMemory) {
    startIndex - numRowBufferedInMemory
  } else {
    0
  }
  new MergerIterator(
    spillableArray.getIterator(offsetIndex),
    numFieldsPerRow,
    startIndex)
}
```
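The offset computation above is a clamp at zero. A small Java sketch of the same arithmetic, assuming non-negative `int` indices; `SpillOffset` and `offsetIndex` are hypothetical names used only for illustration:

```java
// Hypothetical helper, not part of Spark.
final class SpillOffset {
  private SpillOffset() {}

  // Translates a logical start index into an index within the spilled rows.
  // Rows that are still buffered in memory come first, so they are subtracted;
  // a start index that falls inside the in-memory part maps to spilled row 0.
  static int offsetIndex(int startIndex, int numRowsBufferedInMemory) {
    return Math.max(startIndex - numRowsBufferedInMemory, 0);
  }
}
```

With 4 rows buffered in memory, a start index of 10 maps to spilled row 6, while start indices 3 and 4 both map to spilled row 0.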
##########
File path:
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
##########
@@ -47,55 +47,48 @@
private int numRecords;
private int numRecordsRemaining;
- private byte[] arr = new byte[1024 * 1024];
+ private byte[] arr = new byte[1024];
Review comment:
If this number is performance-sensitive, could we parameterize it?
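A rough sketch of what parameterizing could look like, outside of Spark's config system. Everything here is an assumption for illustration: the property name is made up (it is not a Spark config key), a JVM system property stands in for a proper config entry, and the default is simply the pre-change value of 1024 * 1024 bytes.

```java
// Hypothetical sketch, not the Spark implementation.
final class SpillReaderBuffer {
  // Made-up property name, for illustration only.
  static final String BUFFER_SIZE_PROPERTY = "example.spillReader.bufferSize";
  // Assumed default: the value used before this change.
  static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;

  private SpillReaderBuffer() {}

  // Parses the configured size in bytes; falls back to the default when unset.
  static int resolveBufferSize(String configured) {
    if (configured == null) {
      return DEFAULT_BUFFER_SIZE;
    }
    int size = Integer.parseInt(configured.trim());
    if (size <= 0) {
      throw new IllegalArgumentException("Buffer size must be positive: " + size);
    }
    return size;
  }

  // Allocates the read buffer using the size taken from the system property.
  static byte[] allocate() {
    return new byte[resolveBufferSize(System.getProperty(BUFFER_SIZE_PROPERTY))];
  }
}
```

Validating the value at the point of use keeps a misconfigured size from surfacing later as a confusing allocation or read error.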
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
##########
@@ -204,20 +199,37 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
}
}
- private[this] class SpillableArrayIterator(
+ private[this] class MergerIterator(
Review comment:
nit: How about `MergerIterator` -> `SpilledArrayMergeIterator`?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
##########
@@ -82,6 +82,8 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
private var numFieldsPerRow = 0
+ private var numRowBufferedInMemory = 0
Review comment:
Do we need this variable? Could we use `inMemoryBuffer.length` instead?
##########
File path:
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
##########
@@ -148,4 +141,34 @@ public void close() throws IOException {
}
}
}
+
+ private void readFile() throws IOException {
Review comment:
nit: How about `readFile` -> `readSpilledFile`?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
##########
@@ -124,29 +129,15 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
numRowsSpillThreshold,
false)
- // populate with existing in-memory buffered rows
- if (inMemoryBuffer != null) {
- inMemoryBuffer.foreach(existingUnsafeRow =>
- spillableArray.insertRecord(
- existingUnsafeRow.getBaseObject,
- existingUnsafeRow.getBaseOffset,
- existingUnsafeRow.getSizeInBytes,
- 0,
- false)
- )
- inMemoryBuffer.clear()
- }
numFieldsPerRow = unsafeRow.numFields()
}
-
Review comment:
nit: could you avoid the unnecessary change?
##########
File path:
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
##########
@@ -148,4 +141,34 @@ public void close() throws IOException {
}
}
}
+
+ private void readFile() throws IOException {
+ assert (dataFile.length() > 0);
+ final ConfigEntry<Object> bufferSizeConfigEntry =
+ package$.MODULE$.UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE();
Review comment:
Your copy-and-paste probably led to wrong indentation in this method.
Could you check it again?