[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20555


---




[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20555#discussion_r168066120
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
@@ -230,6 +227,7 @@ private void signalAsyncReadComplete() {
 
   private void waitForAsyncReadComplete() throws IOException {
     stateChangeLock.lock();
+    isWaiting.set(true);
     try {
       while (readInProgress) {
--- End diff --

Shall we add a comment about spurious wakeups? Otherwise someone else may mistakenly remove the loop again in the future.


---




[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-13 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/20555#discussion_r168048937
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
@@ -78,9 +79,8 @@
   // whether there is a read ahead task running,
   private boolean isReading;
 
-  // If the remaining data size in the current buffer is below this threshold,
-  // we issue an async read from the underlying input stream.
-  private final int readAheadThresholdInBytes;
+  // whether there is a reader waiting for data.
+  private AtomicBoolean isWaiting = new AtomicBoolean(false);
--- End diff --

I'll leave it as is - it should compile to basically the same thing, and using `AtomicBoolean` makes the intent more readable to me.


---




[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-13 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/20555#discussion_r168048795
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
@@ -230,24 +227,32 @@ private void signalAsyncReadComplete() {
 
   private void waitForAsyncReadComplete() throws IOException {
     stateChangeLock.lock();
+    isWaiting.set(true);
     try {
-      while (readInProgress) {
+      if (readInProgress) {
--- End diff --

Good catch, thanks!


---




[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-13 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20555#discussion_r168036836
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
@@ -78,9 +79,8 @@
   // whether there is a read ahead task running,
   private boolean isReading;
 
-  // If the remaining data size in the current buffer is below this threshold,
-  // we issue an async read from the underlying input stream.
-  private final int readAheadThresholdInBytes;
+  // whether there is a reader waiting for data.
+  private AtomicBoolean isWaiting = new AtomicBoolean(false);
--- End diff --

You can just use `volatile` here
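For illustration (not part of the patch), a minimal sketch of the two options - for a flag that is only ever read and overwritten, with no compare-and-set, they give the same visibility guarantees:

```java
import java.util.concurrent.atomic.AtomicBoolean;

class WaitingFlag {
  // Option 1: AtomicBoolean. get()/set() are volatile reads and writes
  // under the hood; the object additionally offers CAS operations,
  // which a simple flag like this never needs.
  private final AtomicBoolean isWaitingAtomic = new AtomicBoolean(false);

  // Option 2: a volatile field. Same visibility guarantees for plain
  // reads and writes, without allocating a wrapper object.
  private volatile boolean isWaitingVolatile = false;

  void startWaiting() {
    isWaitingAtomic.set(true);
    isWaitingVolatile = true;
  }

  boolean readerIsWaiting() {
    // Either read observes the most recent write from any thread.
    return isWaitingAtomic.get() && isWaitingVolatile;
  }
}
```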


---




[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-13 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20555#discussion_r168007713
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
@@ -230,24 +227,32 @@ private void signalAsyncReadComplete() {
 
   private void waitForAsyncReadComplete() throws IOException {
     stateChangeLock.lock();
+    isWaiting.set(true);
     try {
-      while (readInProgress) {
+      if (readInProgress) {
--- End diff --

The while loop here is to handle [spurious wakeup](https://en.wikipedia.org/wiki/Spurious_wakeup).
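A minimal, self-contained sketch of the guarded-wait idiom - the names mirror this file, but the class is illustrative rather than the actual implementation:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class GuardedWait {
  private final ReentrantLock stateChangeLock = new ReentrantLock();
  private final Condition asyncReadComplete = stateChangeLock.newCondition();
  private boolean readInProgress = true;

  void waitForAsyncReadComplete() throws InterruptedException {
    stateChangeLock.lock();
    try {
      // await() may return without a matching signal() (a spurious wakeup),
      // so the predicate must be re-checked in a loop, not a single `if`.
      while (readInProgress) {
        asyncReadComplete.await();
      }
    } finally {
      stateChangeLock.unlock();
    }
  }

  void signalAsyncReadComplete() {
    stateChangeLock.lock();
    try {
      readInProgress = false;
      asyncReadComplete.signalAll();
    } finally {
      stateChangeLock.unlock();
    }
  }
}
```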


---




[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-13 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/20555#discussion_r167971273
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
@@ -258,54 +263,43 @@ public int read(byte[] b, int offset, int len) throws IOException {
     if (len == 0) {
       return 0;
     }
-    stateChangeLock.lock();
-    try {
-      return readInternal(b, offset, len);
-    } finally {
-      stateChangeLock.unlock();
-    }
-  }
 
-  /**
-   * flip the active and read ahead buffer
-   */
-  private void swapBuffers() {
-    ByteBuffer temp = activeBuffer;
-    activeBuffer = readAheadBuffer;
-    readAheadBuffer = temp;
-  }
-
-  /**
-   * Internal read function which should be called only from read() api. The assumption is that
-   * the stateChangeLock is already acquired in the caller before calling this function.
-   */
-  private int readInternal(byte[] b, int offset, int len) throws IOException {
-    assert (stateChangeLock.isLocked());
     if (!activeBuffer.hasRemaining()) {
-      waitForAsyncReadComplete();
-      if (readAheadBuffer.hasRemaining()) {
-        swapBuffers();
-      } else {
-        // The first read or activeBuffer is skipped.
-        readAsync();
+      // No remaining in active buffer - lock and switch to write ahead buffer.
+      stateChangeLock.lock();
+      try {
         waitForAsyncReadComplete();
-        if (isEndOfStream()) {
-          return -1;
+        if (!readAheadBuffer.hasRemaining()) {
+          // The first read or activeBuffer is skipped.
--- End diff --

Skipped using `skip()`.
I moved the comment over from a few lines above, but looking at `skip()` now, I don't think this can happen - the skip would trigger a `readAsync` read in that case.
I'll update the comment.


---




[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-13 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20555#discussion_r167824897
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
@@ -258,54 +263,43 @@ public int read(byte[] b, int offset, int len) throws IOException {
     if (len == 0) {
       return 0;
     }
-    stateChangeLock.lock();
-    try {
-      return readInternal(b, offset, len);
-    } finally {
-      stateChangeLock.unlock();
-    }
-  }
 
-  /**
-   * flip the active and read ahead buffer
-   */
-  private void swapBuffers() {
-    ByteBuffer temp = activeBuffer;
-    activeBuffer = readAheadBuffer;
-    readAheadBuffer = temp;
-  }
-
-  /**
-   * Internal read function which should be called only from read() api. The assumption is that
-   * the stateChangeLock is already acquired in the caller before calling this function.
-   */
-  private int readInternal(byte[] b, int offset, int len) throws IOException {
-    assert (stateChangeLock.isLocked());
     if (!activeBuffer.hasRemaining()) {
-      waitForAsyncReadComplete();
-      if (readAheadBuffer.hasRemaining()) {
-        swapBuffers();
-      } else {
-        // The first read or activeBuffer is skipped.
-        readAsync();
+      // No remaining in active buffer - lock and switch to write ahead buffer.
+      stateChangeLock.lock();
+      try {
         waitForAsyncReadComplete();
-        if (isEndOfStream()) {
-          return -1;
+        if (!readAheadBuffer.hasRemaining()) {
+          // The first read or activeBuffer is skipped.
--- End diff --

unrelated question: what does `activeBuffer is skipped` mean?


---




[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-12 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/20555#discussion_r167651037
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
@@ -232,7 +229,9 @@ private void waitForAsyncReadComplete() throws IOException {
     stateChangeLock.lock();
     try {
       while (readInProgress) {
+        isWaiting.set(true);
         asyncReadComplete.await();
+        isWaiting.set(false);
--- End diff --

Good catch, I added `isWaiting.set(false)` to the finally branch.
Actually, since the whole implementation assumes that there is only one reader, I removed the while() loop, since there is no other reader to race with us and trigger another read.

In practice I think not updating `isWaiting` would have been benign: after the exception the query will be going down with an `InterruptedException`, or otherwise anyone upstream handling that exception would most probably declare the stream unusable afterwards anyway.
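Roughly the resulting shape - a sketch of the method, assuming the `while` loop stays to guard against spurious wakeups (per the later review round quoted above) and that `InterruptedException` is rethrown as a `java.io.InterruptedIOException`:

```java
private void waitForAsyncReadComplete() throws IOException {
  stateChangeLock.lock();
  isWaiting.set(true);
  try {
    // Re-check in a loop: await() can wake up spuriously.
    while (readInProgress) {
      asyncReadComplete.await();
    }
  } catch (InterruptedException e) {
    InterruptedIOException iio = new InterruptedIOException(e.getMessage());
    iio.initCause(e);
    throw iio;
  } finally {
    // Reset the flag even when await() throws, so a failed wait does not
    // leave the async reader believing a reader is still waiting.
    isWaiting.set(false);
    stateChangeLock.unlock();
  }
}
```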


---




[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-12 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/20555#discussion_r167646954
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
@@ -258,54 +262,43 @@ public int read(byte[] b, int offset, int len) throws IOException {
     if (len == 0) {
       return 0;
     }
-    stateChangeLock.lock();
-    try {
-      return readInternal(b, offset, len);
-    } finally {
-      stateChangeLock.unlock();
-    }
-  }
-
-  /**
-   * flip the active and read ahead buffer
-   */
-  private void swapBuffers() {
-    ByteBuffer temp = activeBuffer;
-    activeBuffer = readAheadBuffer;
-    readAheadBuffer = temp;
-  }
 
-  /**
-   * Internal read function which should be called only from read() api. The assumption is that
-   * the stateChangeLock is already acquired in the caller before calling this function.
-   */
-  private int readInternal(byte[] b, int offset, int len) throws IOException {
-    assert (stateChangeLock.isLocked());
     if (!activeBuffer.hasRemaining()) {
-      waitForAsyncReadComplete();
-      if (readAheadBuffer.hasRemaining()) {
-        swapBuffers();
-      } else {
-        // The first read or activeBuffer is skipped.
-        readAsync();
+      // No remaining in active buffer - lock and switch to write ahead buffer.
+      stateChangeLock.lock();
+      try {
         waitForAsyncReadComplete();
-        if (isEndOfStream()) {
-          return -1;
+        if (!readAheadBuffer.hasRemaining()) {
+          // The first read or activeBuffer is skipped.
+          readAsync();
+          waitForAsyncReadComplete();
+          if (isEndOfStream()) {
+            return -1;
+          }
         }
+        // Swap the newly read read ahead buffer in place of empty active buffer.
--- End diff --

Other existing comments in this file use `read ahead`.


---




[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-09 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20555#discussion_r167391619
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
@@ -258,54 +262,43 @@ public int read(byte[] b, int offset, int len) throws IOException {
     if (len == 0) {
       return 0;
     }
-    stateChangeLock.lock();
-    try {
-      return readInternal(b, offset, len);
-    } finally {
-      stateChangeLock.unlock();
-    }
-  }
-
-  /**
-   * flip the active and read ahead buffer
-   */
-  private void swapBuffers() {
-    ByteBuffer temp = activeBuffer;
-    activeBuffer = readAheadBuffer;
-    readAheadBuffer = temp;
-  }
 
-  /**
-   * Internal read function which should be called only from read() api. The assumption is that
-   * the stateChangeLock is already acquired in the caller before calling this function.
-   */
-  private int readInternal(byte[] b, int offset, int len) throws IOException {
-    assert (stateChangeLock.isLocked());
     if (!activeBuffer.hasRemaining()) {
-      waitForAsyncReadComplete();
-      if (readAheadBuffer.hasRemaining()) {
-        swapBuffers();
-      } else {
-        // The first read or activeBuffer is skipped.
-        readAsync();
+      // No remaining in active buffer - lock and switch to write ahead buffer.
+      stateChangeLock.lock();
+      try {
         waitForAsyncReadComplete();
-        if (isEndOfStream()) {
-          return -1;
+        if (!readAheadBuffer.hasRemaining()) {
+          // The first read or activeBuffer is skipped.
+          readAsync();
+          waitForAsyncReadComplete();
+          if (isEndOfStream()) {
+            return -1;
+          }
         }
+        // Swap the newly read read ahead buffer in place of empty active buffer.
--- End diff --

Would it be better to use `read-ahead` in the comments, for ease of reading?


---




[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-09 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20555#discussion_r167391309
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
@@ -232,7 +229,9 @@ private void waitForAsyncReadComplete() throws IOException {
     stateChangeLock.lock();
     try {
       while (readInProgress) {
+        isWaiting.set(true);
         asyncReadComplete.await();
+        isWaiting.set(false);
--- End diff --

What happens if `await()` throws an exception? Is it ok not to update 
`isWaiting`?


---




[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-08 Thread juliuszsompolski
GitHub user juliuszsompolski opened a pull request:

https://github.com/apache/spark/pull/20555

[SPARK-23366] Improve hot reading path in ReadAheadInputStream

## What changes were proposed in this pull request?

`ReadAheadInputStream` was introduced in https://github.com/apache/spark/pull/18317/ to optimize reading spill files from disk.
However, from the profiles it seems that the hot path of reading small amounts of data (like readInt) is inefficient: it involves taking locks and performing multiple checks.

Optimize locking: the lock is not needed when simply accessing the active buffer. Lock only when buffers need to be swapped, an async read needs to be triggered, or information about the async state is needed.

Optimize the short path for single-byte reads, which is used e.g. by the Java library's `DataInputStream.readInt`.

The asyncReader used to call `read` only once on the underlying stream, which never filled the whole buffer when the wrapped stream was an `LZ4BlockInputStream`. When the buffer came back unfilled, the async reader would be triggered on every call to refill the read ahead buffer, because the reader would always see the active buffer below the refill threshold.

However, always filling the full buffer could introduce extra latency, so an `AtomicBoolean` flag is also added that lets the async reader return early when a reader is waiting for data.

Remove `readAheadThresholdInBytes` and instead trigger the async read immediately when swapping the buffers. This simplifies the code paths, especially the hot one, which then only has to check whether there is data available in the active buffer, without worrying about retriggering the async read. It also seems to have a positive effect on performance, as shown in the sketch below.
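The hot read path then looks roughly like this - a sketch pieced together from the diff hunks quoted above in this thread; the tail of the slow path is cut off in the hunks, so the inlined buffer swap, the immediate `readAsync()` retrigger, and the copy-out at the end are assumptions reconstructed from this description, not the exact patch:

```java
public int read(byte[] b, int offset, int len) throws IOException {
  if (len == 0) {
    return 0;
  }
  // Hot path: only the reader thread touches activeBuffer, so no lock is
  // needed while it still has remaining data.
  if (!activeBuffer.hasRemaining()) {
    // Slow path: lock only to wait for the async read, swap buffers,
    // and trigger the next read ahead.
    stateChangeLock.lock();
    try {
      waitForAsyncReadComplete();
      if (!readAheadBuffer.hasRemaining()) {
        readAsync();
        waitForAsyncReadComplete();
        if (isEndOfStream()) {
          return -1;
        }
      }
      // Swap the newly read read ahead buffer in place of empty active
      // buffer (assumption: swapBuffers() was removed, so the swap is
      // inlined), then immediately retrigger the read ahead (assumption,
      // per the description above).
      ByteBuffer temp = activeBuffer;
      activeBuffer = readAheadBuffer;
      readAheadBuffer = temp;
      readAsync();
    } finally {
      stateChangeLock.unlock();
    }
  }
  len = Math.min(len, activeBuffer.remaining());
  activeBuffer.get(b, offset, len);
  return len;
}
```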

## How was this patch tested?

The inefficiency was noticed as a regression in some workloads after upgrading to Spark 2.3.

It was particularly visible on TPCDS Q95 running on instances with fast disks (AWS i3 instances).
Running with profiling:
* Spark 2.2 - 5.2-5.3 minutes, 9.5% in `LZ4BlockInputStream.read`
* Spark 2.3 - 6.4-6.6 minutes, 31.1% in `ReadAheadInputStream.read`
* Spark 2.3 + fix - 5.3-5.4 minutes, 13.3% in `ReadAheadInputStream.read` - very slightly slower, practically within noise

We didn't see other regressions, and many workloads in general seem to be faster with Spark 2.3 (we have not investigated whether that is thanks to the async reader or unrelated).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/juliuszsompolski/apache-spark SPARK-23366

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20555.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20555


commit 987f15ccb01b6c0351fbfdd49d6930b929c50a74
Author: Juliusz Sompolski 
Date:   2018-01-30T20:54:47Z

locking tweak

commit b26ffce6780078dbc38bff658e1ef7e9c56c3dd8
Author: Juliusz Sompolski 
Date:   2018-02-01T14:27:09Z

fill the read ahead buffer




---
