[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...

2017-09-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18875#discussion_r138228111
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
 ---
@@ -193,14 +223,35 @@ private[sql] class JacksonGenerator(
    *
    * @param row The row to convert
    */
-  def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, rootFieldWriters))
+  def write(row: InternalRow): Unit = {
+    writeObject(writeFields(
+      fieldWriters = rootFieldWriters,
+      row = row,
+      schema = dataType.asInstanceOf[StructType]))
+  }
+
+
+  /**
+   * Transforms multiple `InternalRow`s or `MapData`s to JSON array using Jackson
+   *
+   * @param array The array of rows or maps to convert
+   */
+  def write(array: ArrayData): Unit = writeArray(writeArrayData(
+    fieldWriter = arrElementWriter,
+    array = array
+  ))
--- End diff --

Let's change this one back to:

```scala
def write(array: ArrayData): Unit = writeArray(writeArrayData(array, arrElementWriter))
```




---




[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138225592
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private static final ThreadLocal oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final ByteBuffer byteB

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138201871
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private static final ThreadLocal oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final ByteBuffer byteB

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138207519
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private static final ThreadLocal oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final ByteBuffer byteB

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138193555
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private static final ThreadLocal oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final ByteBuffer byteB

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138225438
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private static final ThreadLocal oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final ByteBuffer byteB

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138198506
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private static final ThreadLocal oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
--- End diff --

nit: double spaces
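
For reference, once the spacing is fixed the helper in the quoted diff could also collapse to a single return (just a sketch of the existing `isEndOfStream()` logic, not necessarily the PR's final shape):

```java
private boolean isEndOfStream() {
  // True only when both buffers are drained and the underlying stream hit EOF.
  return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
}
```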


---


[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138207456
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private static final ThreadLocal oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final ByteBuffer byteB

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138195259
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private static final ThreadLocal oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
--- End diff --

nit: could you add the `bufferSizeInBytes` value to the error message?
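
Guava's `Preconditions.checkArgument` has a message-template overload that makes this easy; a sketch of what the checks could look like (the exact wording is the author's call):

```java
Preconditions.checkArgument(bufferSizeInBytes > 0,
    "bufferSizeInBytes should be greater than 0, but was %s", bufferSizeInBytes);
Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
        readAheadThresholdInBytes < bufferSizeInBytes,
    "readAheadThresholdInBytes (%s) should be greater than 0 and less than bufferSizeInBytes (%s)",
    readAheadThresholdInBytes, bufferSizeInBytes);
```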


---




[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138194732
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private static final ThreadLocal oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final ByteBuffer byteB

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138204793
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,292 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean isReadInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean isReadAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private final byte[] oneByte = new byte[1];
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if(activeBuffer.remaining() == 0 && readAheadBuffer.remaining() == 0 
&& endOfStream) {
+  return true;
+}
+return  false;
+  }
+
+
+  private void readAsync(final ByteBuffer byteBuffer) throws IOException {
+stateChangeLock.lock();
+if (endOfStream || isReadInProgress) {
+  stateCh

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138189593
  
--- Diff: 
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
 ---
@@ -72,10 +72,15 @@ public UnsafeSorterSpillReader(
       bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
     }
 
+    final Double readAheadFraction =
+        SparkEnv.get() == null ? 0.5 :
+            SparkEnv.get().conf().getDouble("spark.unsafe.sorter.spill.read.ahead.fraction", 0.5);
+
     final InputStream bs =
         new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
     try {
-      this.in = serializerManager.wrapStream(blockId, bs);
+      this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs),
--- End diff --

Could you add an internal conf to disable it? It would allow users to turn the feature off if it causes a regression.
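
A minimal sketch of such a guard in `UnsafeSorterSpillReader` (the conf key name and the threshold arithmetic are assumptions here, since the quoted diff is truncated; only the pattern matters):

```java
// Hypothetical internal kill switch, defaulting to enabled.
final boolean readAheadEnabled = SparkEnv.get() == null ||
    SparkEnv.get().conf().getBoolean("spark.unsafe.sorter.spill.read.ahead.enabled", true);

final InputStream wrapped = serializerManager.wrapStream(blockId, bs);
// Fall back to the plain wrapped stream when read-ahead is disabled.
this.in = readAheadEnabled
    ? new ReadAheadInputStream(wrapped, (int) bufferSizeBytes,
        (int) (bufferSizeBytes * readAheadFraction))
    : wrapped;
```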


---




[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138189733
  
--- Diff: 
core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java ---
@@ -50,17 +52,16 @@ public void tearDown() {
 inputFile.delete();
   }
 
+
--- End diff --

nit: extra empty line


---




[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138200321
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,317 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private final byte[] oneByte = new byte[1];
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean hasRemaining() {
+if(activeBuffer.remaining() == 0 && readAheadBuffer.remaining() == 0 
&& endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final ByteBuffer byteBuffer) throws IOException {
+stateCh

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138194101
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private static final ThreadLocal oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final ByteBuffer byteB

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138201264
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138188910
  
--- Diff: 
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
 ---
@@ -72,10 +72,15 @@ public UnsafeSorterSpillReader(
   bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
 }
 
+final Double readAheadFraction =
--- End diff --

nit: Double -> double


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138195292
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private static final ThreadLocal oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138200201
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138194341
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138208239
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138198579
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private static final ThreadLocal oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
--- End diff --

why not just `return !activeBuff

[GitHub] spark pull request #19136: [SPARK-15689][SQL] data source v2

2017-09-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r138224219
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.Filter
+import 
org.apache.spark.sql.sources.v2.reader.downward.{CatalystFilterPushDownSupport, 
ColumnPruningSupport, FilterPushDownSupport}
+
+object DataSourceV2Strategy extends Strategy {
+  // TODO: write path
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+case PhysicalOperation(projects, filters, DataSourceV2Relation(output, 
reader)) =>
+  val attrMap = AttributeMap(output.zip(output))
+
+  val projectSet = AttributeSet(projects.flatMap(_.references))
+  val filterSet = AttributeSet(filters.flatMap(_.references))
+
+  // Match original case of attributes.
+  // TODO: nested fields pruning
+  val requiredColumns = (projectSet ++ filterSet).toSeq.map(attrMap)
+  reader match {
+case r: ColumnPruningSupport =>
+  r.pruneColumns(requiredColumns.toStructType)
+case _ =>
+  }
+
+  val stayUpFilters: Seq[Expression] = reader match {
+case r: CatalystFilterPushDownSupport =>
+  r.pushCatalystFilters(filters.toArray)
+
+case r: FilterPushDownSupport =>
--- End diff --

yea, we can't prevent users from implementing both, and we will pick 
`CatalystFilterPushDownSupport` over `FilterPushDownSupport`. Let me document 
it.
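
For reference, the precedence comes from the ordering of the match cases. A tiny 
standalone sketch of that behavior, using simplified stand-in traits rather than 
the real data source v2 interfaces:

```scala
object PushDownPrecedenceSketch {
  // Simplified stand-ins for the two push-down interfaces in the diff above.
  trait FilterPushDownSupport {
    def pushFilters(filters: Seq[String]): Seq[String]
  }
  trait CatalystFilterPushDownSupport {
    def pushCatalystFilters(filters: Seq[String]): Seq[String]
  }

  // Because the Catalyst case is listed first, a reader that implements both
  // interfaces is always handled by pushCatalystFilters.
  def pushDown(reader: AnyRef, filters: Seq[String]): Seq[String] = reader match {
    case r: CatalystFilterPushDownSupport => r.pushCatalystFilters(filters)
    case r: FilterPushDownSupport => r.pushFilters(filters)
    case _ => filters // nothing can be pushed; all filters stay in the plan
  }
}
```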


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19136: [SPARK-15689][SQL] data source v2

2017-09-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r138224082
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader
+import org.apache.spark.sql.sources.v2.reader.upward.StatisticsSupport
+
+case class DataSourceV2Relation(
+output: Seq[AttributeReference],
+reader: DataSourceV2Reader) extends LeafNode {
+
+  override def computeStats(): Statistics = reader match {
+case r: StatisticsSupport => Statistics(sizeInBytes = 
r.getStatistics.sizeInBytes())
+case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes)
+  }
+}
+
+object DataSourceV2Relation {
+  def apply(reader: DataSourceV2Reader): DataSourceV2Relation = {
+new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
--- End diff --

I think users can write a special `ReadTask` to do it, but we can't save 
memory by doing this. When an operator (the scan operator) transfers data to 
another operator, the data must be `UnsafeRow`s. So even if users return a joined 
row in `ReadTask`, Spark needs to convert it to `UnsafeRow`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19185: [Spark-21854] Added LogisticRegressionTrainingSum...

2017-09-11 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19185#discussion_r138220213
  
--- Diff: python/pyspark/ml/tests.py ---
@@ -1473,11 +1473,59 @@ def test_logistic_regression_summary(self):
 self.assertTrue(isinstance(s.fMeasureByThreshold, DataFrame))
 self.assertTrue(isinstance(s.precisionByThreshold, DataFrame))
 self.assertTrue(isinstance(s.recallByThreshold, DataFrame))
+
+self.assertAlmostEqual(s.accuracy, 1.0, 2)
--- End diff --

Also a nit, but we should probably add tests for all the new attributes, like 
`falsePositiveRateByLabel`, as is done below.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19185: [Spark-21854] Added LogisticRegressionTrainingSum...

2017-09-11 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19185#discussion_r138219915
  
--- Diff: python/pyspark/ml/tests.py ---
@@ -1473,11 +1473,59 @@ def test_logistic_regression_summary(self):
 self.assertTrue(isinstance(s.fMeasureByThreshold, DataFrame))
 self.assertTrue(isinstance(s.precisionByThreshold, DataFrame))
 self.assertTrue(isinstance(s.recallByThreshold, DataFrame))
+
+self.assertAlmostEqual(s.accuracy, 1.0, 2)
+self.assertAlmostEqual(s.weightedTruePositiveRate, 1.0, 2)
+self.assertAlmostEqual(s.weightedFalsePositiveRate, 0.0, 2)
+self.assertAlmostEqual(s.weightedRecall, 1.0, 2)
+self.assertAlmostEqual(s.weightedPrecision, 1.0, 2)
+self.assertAlmostEqual(s.weightedFMeasure(), 1.0, 2)
 # test evaluation (with training dataset) produces a summary with 
same values
 # one check is enough to verify a summary is returned, Scala 
version runs full test
 sameSummary = model.evaluate(df)
 self.assertAlmostEqual(sameSummary.areaUnderROC, s.areaUnderROC)
 
+def test_multiclass_logistic_regression_summary(self):
+df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),
+ (0.0, 2.0, Vectors.sparse(1, [], 
[])),
+ (2.0, 2.0, Vectors.dense(2.0)),
+ (2.0, 2.0, Vectors.dense(1.9))],
+["label", "weight", "features"])
+lr = LogisticRegression(maxIter=5, regParam=0.01, 
weightCol="weight", fitIntercept=False)
+model = lr.fit(df)
+self.assertTrue(model.hasSummary)
+s = model.summary
+# test that api is callable and returns expected types
+self.assertTrue(isinstance(s.predictions, DataFrame))
+self.assertEqual(s.probabilityCol, "probability")
+self.assertEqual(s.labelCol, "label")
+self.assertEqual(s.featuresCol, "features")
+self.assertEqual(s.predictionCol, "prediction")
+objHist = s.objectiveHistory
+self.assertTrue(isinstance(objHist, list) and 
isinstance(objHist[0], float))
+self.assertGreater(s.totalIterations, 0)
+self.assertTrue(isinstance(s.labels, list))
+self.assertTrue(isinstance(s.truePositiveRateByLabel, list))
+self.assertTrue(isinstance(s.falsePositiveRateByLabel, list))
+self.assertTrue(isinstance(s.precisionByLabel, list))
+self.assertTrue(isinstance(s.recallByLabel, list))
+self.assertTrue(isinstance(s.fMeasureByLabel(), list))
+self.assertAlmostEqual(s.accuracy, 0.75, 2)
+self.assertAlmostEqual(s.weightedTruePositiveRate, 0.75, 2)
+self.assertAlmostEqual(s.weightedFalsePositiveRate, 0.25, 2)
+self.assertAlmostEqual(s.weightedRecall, 0.75, 2)
+self.assertAlmostEqual(s.weightedPrecision, 0.583, 2)
+self.assertAlmostEqual(s.weightedFMeasure(), 0.65, 2)
--- End diff --

maybe add `beta=1.0` to the methods that take beta as a parameter.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19185: [Spark-21854] Added LogisticRegressionTrainingSum...

2017-09-11 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19185#discussion_r138220005
  
--- Diff: python/pyspark/ml/tests.py ---
@@ -1473,11 +1473,59 @@ def test_logistic_regression_summary(self):
 self.assertTrue(isinstance(s.fMeasureByThreshold, DataFrame))
 self.assertTrue(isinstance(s.precisionByThreshold, DataFrame))
 self.assertTrue(isinstance(s.recallByThreshold, DataFrame))
+
+self.assertAlmostEqual(s.accuracy, 1.0, 2)
--- End diff --

care to add these to the scala unit test for binary summary as well?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19185: [Spark-21854] Added LogisticRegressionTrainingSum...

2017-09-11 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19185#discussion_r138220297
  
--- Diff: python/pyspark/ml/classification.py ---
@@ -528,9 +528,11 @@ def summary(self):
 trained on the training set. An exception is thrown if 
`trainingSummary is None`.
 """
 if self.hasSummary:
-java_blrt_summary = self._call_java("summary")
-# Note: Once multiclass is added, update this to return 
correct summary
-return 
BinaryLogisticRegressionTrainingSummary(java_blrt_summary)
+java_lrt_summary = self._call_java("summary")
+if (self.numClasses <= 2):
--- End diff --

nit: remove parentheses


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19188: [SPARK-21973][SQL] Add an new option to filter qu...

2017-09-11 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19188#discussion_r138221684
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
 ---
@@ -113,12 +114,39 @@ object TPCDSQueryBenchmark {
   "q81", "q82", "q83", "q84", "q85", "q86", "q87", "q88", "q89", "q90",
   "q91", "q92", "q93", "q94", "q95", "q96", "q97", "q98", "q99")
 
+val sparkConf = new SparkConf()
--- End diff --

ok


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18592: [SPARK-21368][SQL] TPCDSQueryBenchmark can't refe...

2017-09-11 Thread sarutak
Github user sarutak commented on a diff in the pull request:

https://github.com/apache/spark/pull/18592#discussion_r138220942
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
 ---
@@ -99,6 +95,20 @@ object TPCDSQueryBenchmark {
   }
 
   def main(args: Array[String]): Unit = {
+if (args.length < 1) {
--- End diff --

Good idea. I'll add `TPCDSQueryBenchmarkArguments`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19141: [SPARK-21384] [YARN] Spark + YARN fails with Loca...

2017-09-11 Thread devaraj-kavali
Github user devaraj-kavali commented on a diff in the pull request:

https://github.com/apache/spark/pull/19141#discussion_r138219530
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
---
@@ -565,7 +565,6 @@ private[spark] class Client(
   distribute(jarsArchive.toURI.getPath,
 resType = LocalResourceType.ARCHIVE,
 destName = Some(LOCALIZED_LIB_DIR))
-  jarsArchive.delete()
--- End diff --

Thanks @vanzin for the pointer. It was my mistake; I missed the reason for the 
change while looking at the history of the file.

I still see that SPARK-20741 fixed the issue only partially: it still leaves the 
\_\_spark_conf\_\_*.zip file to be deleted as part of the shutdown hook.

I see these approaches to fix it further:

1. Delete the \_\_spark_conf\_\_*.zip and \_\_spark_libs\_\_*.zip files after 
the application completes, similar to cleanupStagingDir. 
(Or)
2. Add a configuration controlling whether to delete the \_\_spark_conf\_\_*.zip and 
\_\_spark_libs\_\_*.zip files right after they are copied to the destination dir, so that 
users can decide whether to delete them immediately or as part of process exit. In the case 
of SPARK-20741, this new configuration could be enabled to delete the files 
immediately; a rough sketch of such a flag is below.

@vanzin & @jerryshao Please let me know your thoughts on this, or if you 
have any other way to do this. Thanks
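
If the second approach is taken, the flag could be declared with Spark's internal 
`ConfigBuilder`. A minimal sketch, assuming a made-up key name and default 
(nothing below is an agreed-upon name):

```scala
package org.apache.spark.deploy.yarn

import org.apache.spark.internal.config.ConfigBuilder

object ArchiveCleanupConfigSketch {
  // Hypothetical entry: the key, doc text and default are illustrative only.
  val DELETE_ARCHIVES_EAGERLY =
    ConfigBuilder("spark.yarn.deleteArchivesEagerly")
      .doc("When true, delete the locally generated __spark_libs__*.zip and " +
        "__spark_conf__*.zip files right after they are copied to the staging " +
        "directory, instead of leaving them for the shutdown hook.")
      .booleanConf
      .createWithDefault(false)
}
```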



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18945: Add option to convert nullable int columns to float colu...

2017-09-11 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/18945
  
Hey @logannc, have you had some time to work on this? I want to fix this 
issue ASAP. Otherwise, would anyone here be interested in submitting another PR 
for the other approach?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19188: [SPARK-21973][SQL] Add an new option to filter qu...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19188#discussion_r138217212
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
 ---
@@ -113,12 +114,39 @@ object TPCDSQueryBenchmark {
   "q81", "q82", "q83", "q84", "q85", "q86", "q87", "q88", "q89", "q90",
   "q91", "q92", "q93", "q94", "q95", "q96", "q97", "q98", "q99")
 
+val sparkConf = new SparkConf()
--- End diff --

Could we add an argument, instead of using the SQLConf? See 
https://github.com/apache/spark/pull/18592#discussion_r138217049


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19106: [SPARK-21770][ML] ProbabilisticClassificationModel fix c...

2017-09-11 Thread sethah
Github user sethah commented on the issue:

https://github.com/apache/spark/pull/19106
  
I'm confused about how this issue was discovered in the first place. Did someone 
actually train an RF/DT and receive all-zero probabilities? If so, shouldn't 
there be a unit test that recreates that scenario? 

Anyway, AFAICT the only way this could happen is if the RandomForest was 
trained on an empty DataFrame, which couldn't happen because it would fail 
before the train method was called anyway.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18592: [SPARK-21368][SQL] TPCDSQueryBenchmark can't refe...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18592#discussion_r138217049
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
 ---
@@ -99,6 +95,20 @@ object TPCDSQueryBenchmark {
   }
 
   def main(args: Array[String]): Unit = {
+if (args.length < 1) {
--- End diff --

@sarutak @maropu Could we do something like 
https://github.com/apache/spark/blob/master/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala?
 

We could also add another option for outputting the plans of the TPC-DS 
queries, instead of running all the queries.
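
For concreteness, a rough sketch of such an arguments class, in the spirit of 
`ApplicationMasterArguments` (the option names `--data-location` and `--query-filter` 
are placeholders, not a settled interface):

```scala
class TPCDSQueryBenchmarkArguments(args: Array[String]) {
  var dataLocation: String = null
  var queryFilter: Set[String] = Set.empty

  parseArgs(args.toList)

  private def parseArgs(inputArgs: List[String]): Unit = {
    var remaining = inputArgs
    while (remaining.nonEmpty) {
      remaining match {
        case "--data-location" :: value :: tail =>
          dataLocation = value
          remaining = tail
        case "--query-filter" :: value :: tail =>
          // Comma-separated list of query names to run, e.g. "q3,q34,q58".
          queryFilter = value.toLowerCase.split(",").map(_.trim).toSet
          remaining = tail
        case _ =>
          System.err.println(s"Unknown/unsupported param $remaining")
          System.exit(1)
      }
    }
    if (dataLocation == null) {
      System.err.println("Must specify a data location (--data-location)")
      System.exit(-1)
    }
  }
}
```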


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19147: [WIP][SPARK-21190][SQL][PYTHON] Vectorized UDFs i...

2017-09-11 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19147#discussion_r138215300
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/VectorizedPythonRunner.scala
 ---
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, 
DataInputStream, DataOutputStream}
+import java.net.Socket
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector.VectorSchemaRoot
+import org.apache.arrow.vector.stream.{ArrowStreamReader, 
ArrowStreamWriter}
+
+import org.apache.spark.{SparkEnv, SparkFiles, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, 
PythonEvalType, PythonException, PythonRDD, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.arrow.{ArrowUtils, ArrowWriter}
+import org.apache.spark.sql.execution.vectorized.{ArrowColumnVector, 
ColumnarBatch, ColumnVector}
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+/**
+ * Similar to `PythonRunner`, but exchange data with Python worker via 
columnar format.
+ */
+class VectorizedPythonRunner(
+funcs: Seq[ChainedPythonFunctions],
+batchSize: Int,
+bufferSize: Int,
+reuse_worker: Boolean,
+argOffsets: Array[Array[Int]]) extends Logging {
+
+  require(funcs.length == argOffsets.length, "argOffsets should have the 
same length as funcs")
+
+  // All the Python functions should have the same exec, version and 
envvars.
+  private val envVars = funcs.head.funcs.head.envVars
+  private val pythonExec = funcs.head.funcs.head.pythonExec
+  private val pythonVer = funcs.head.funcs.head.pythonVer
+
+  // TODO: support accumulator in multiple UDF
+  private val accumulator = funcs.head.funcs.head.accumulator
+
+  // todo: return column batch?
+  def compute(
--- End diff --

I was referring to the protocol between Scala and Python that is changed 
here and could act differently under some circumstances.  Here is the behavior 
of the `PythonRunner` protocol and `VectorizedPythonRunner` protocol that you 
introduce here:

**PythonRunner**
Data blocks are framed by a special length integer.  Scala reads each data 
block one at a time and checks the length code.  If the code is a 
`PythonException`, the error is read from Python and a `SparkException` is 
thrown with that being the cause.

**VectorizedPythonRunner**
A data stream is opened in Scala with `ArrowStreamReader` and batches are 
transferred until `ArrowStreamReader` returns False indicating there is no more 
data.  Only at this point are the special length codes checked to handle an 
error from Python.

This behavior change would probably only cause problems when things are not 
working normally.  For example, what would happen if `pyarrow` were not 
installed on an executor?  With `PythonRunner`, the ImportError would cause a 
`PythonException` to be transferred and thrown in Scala.  In 
`VectorizedPythonRunner`, I believe the `ArrowStreamReader` would try to read 
the special length code and then fail somewhere internal to Arrow, without 
surfacing the ImportError.

My point was that this type of behavior change should probably be 
implemented in a separate JIRA where we could make sure to handle all of these 
cases.
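
To make the framing difference concrete, here is a small self-contained sketch of 
the per-block length check that the `PythonRunner`-style protocol implies. The 
marker values and error handling are simplified placeholders, not Spark's actual 
`SpecialLengths` constants:

```scala
import java.io.DataInputStream

object FramedReaderSketch {
  // Placeholder marker values; the real SpecialLengths constants differ.
  val PYTHON_EXCEPTION_THROWN = -2
  val END_OF_DATA_SECTION = -1

  /** Reads length-prefixed blocks and surfaces a Python-side error as soon as its
   *  marker appears, instead of only after the whole stream has been consumed. */
  def readBlocks(in: DataInputStream): Seq[Array[Byte]] = {
    val blocks = Seq.newBuilder[Array[Byte]]
    var done = false
    while (!done) {
      in.readInt() match {
        case PYTHON_EXCEPTION_THROWN =>
          // The error message follows the marker as a length-prefixed payload.
          val msg = new Array[Byte](in.readInt())
          in.readFully(msg)
          throw new RuntimeException(new String(msg, "UTF-8"))
        case END_OF_DATA_SECTION =>
          done = true
        case len if len >= 0 =>
          val block = new Array[Byte](len)
          in.readFully(block)
          blocks += block
        case other =>
          throw new IllegalStateException(s"Unexpected length code: $other")
      }
    }
    blocks.result()
  }
}
```

With an Arrow stream reader in front of the socket, by contrast, these markers are 
only inspected after the reader reports end-of-stream, which is why an early failure 
(such as a missing `pyarrow`) may surface as an Arrow-internal error instead.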


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19107: [SPARK-21799][ML] Fix `KMeans` performance regression ca...

2017-09-11 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/19107
  
@WeichenXu123 I just commented on 
https://issues.apache.org/jira/browse/SPARK-18608 to clarify our efforts here.  
Can you please either retarget this for SPARK-18608 and update it, or ask 
@zhengruifeng to submit his original PR as the fix?  Please coordinate, thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r138191841
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/config.scala 
---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.concurrent.TimeUnit
+
+import scala.annotation.meta.getter
+
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.util.kvstore.KVIndex
+
+private[spark] object config {
+
+  /** Use this to annotate constructor params to be used as KVStore 
indices. */
+  type KVIndexParam = KVIndex @getter
+
+  val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+  val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory")
+.stringConf
+.createWithDefault(DEFAULT_LOG_DIR)
+
+  val MAX_LOG_AGE_S = ConfigBuilder("spark.history.fs.cleaner.maxAge")
+.timeConf(TimeUnit.SECONDS)
+.createWithDefaultString("7d")
+
+  val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path")
--- End diff --

It'd be better to document that the default is an in-memory store.
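
One way is to put that in the entry's own `.doc(...)`, e.g. in `config.scala`. A 
sketch (the wording is illustrative, and `createOptional` is an assumption, since 
the builder chain for this entry is cut off in the quoted diff):

```scala
val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path")
  .doc("Local directory where to cache application history data. If not set, " +
    "the application listing is kept in memory only, which is the default.")
  .stringConf
  .createOptional
```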


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r137942598
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -229,10 +254,22 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
   }
 
-  override def getListing(): Iterator[FsApplicationHistoryInfo] = 
applications.values.iterator
+  override def getListing(): Iterator[ApplicationHistoryInfo] = {
--- End diff --

The returned order is `descending`, right? This is not obvious from 
the code. Please add a comment.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r137942907
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -316,25 +353,21 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 try {
   val newLastScanTime = getNewLastScanTime()
   logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
-  val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
-.getOrElse(Seq.empty[FileStatus])
+  val statusList = Option(fs.listStatus(new 
Path(logDir))).map(_.toSeq).getOrElse(Nil)
   // scan for modified applications, replay and merge them
-  val logInfos: Seq[FileStatus] = statusList
+  val logInfos = statusList
 .filter { entry =>
-  val fileInfo = fileToAppInfo.get(entry.getPath())
-  val prevFileSize = if (fileInfo != null) fileInfo.fileSize else 
0L
   !entry.isDirectory() &&
 // FsHistoryProvider generates a hidden file which can't be 
read.  Accidentally
 // reading a garbage file is safe, but we would log an error 
which can be scary to
 // the end-user.
 !entry.getPath().getName().startsWith(".") &&
-prevFileSize < entry.getLen() &&
-SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+SparkHadoopUtil.get.checkAccessPermission(entry, 
FsAction.READ) &&
+recordedFileSize(entry.getPath()) < entry.getLen()
--- End diff --

Can we add a comment to explain what `recordedFileSize(entry.getPath())` 
returns? In the original code, the variable name was self-descriptive; the new 
change no longer has that.
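
Agreed that a short explanation helps. For instance, a tiny self-contained sketch 
backed by a plain map, just to pin down the intended semantics (the real provider 
reads this from its listing store):

```scala
import scala.collection.mutable

class ListingSketch {
  // logPath -> file size observed at the last successful scan.
  private val scannedSizes = mutable.Map[String, Long]()

  /** Size of the given event log the last time it was scanned, or 0L if it has
   *  never been scanned. Callers compare this with the current file length to
   *  decide whether the log grew and needs to be replayed again. */
  def recordedFileSize(logPath: String): Long = scannedSizes.getOrElse(logPath, 0L)
}
```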


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r137940658
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/config.scala 
---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.concurrent.TimeUnit
+
+import scala.annotation.meta.getter
+
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.util.kvstore.KVIndex
+
+private[spark] object config {
+
+  /** Use this to annotate constructor params to be used as KVStore 
indices. */
+  type KVIndexParam = KVIndex @getter
+
+  val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+  val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory")
+.stringConf
+.createWithDefault(DEFAULT_LOG_DIR)
+
+  val MAX_LOG_AGE_S = ConfigBuilder("spark.history.fs.cleaner.maxAge")
+.timeConf(TimeUnit.SECONDS)
+.createWithDefaultString("7d")
+
+  val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path")
--- End diff --

Just want to confirm: except for this, there is no change to the other parameters, 
right?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r137942505
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -742,53 +698,146 @@ private[history] object FsHistoryProvider {
   private val APPL_END_EVENT_PREFIX = 
"{\"Event\":\"SparkListenerApplicationEnd\""
 
   private val LOG_START_EVENT_PREFIX = 
"{\"Event\":\"SparkListenerLogStart\""
+
+  /** Current version of the data written to the listing database. */
+  private val CURRENT_LISTING_VERSION = 1L
 }
 
 /**
- * Application attempt information.
- *
- * @param logPath path to the log file, or, for a legacy log, its directory
- * @param name application name
- * @param appId application ID
- * @param attemptId optional attempt ID
- * @param startTime start time (from playback)
- * @param endTime end time (from playback). -1 if the application is 
incomplete.
- * @param lastUpdated the modification time of the log file when this 
entry was built by replaying
- *the history.
- * @param sparkUser user running the application
- * @param completed flag to indicate whether or not the application has 
completed.
- * @param fileSize the size of the log file the last time the file was 
scanned for changes
+ * A KVStoreSerializer that provides Scala types serialization too, and 
uses the same options as
+ * the API serializer.
  */
-private class FsApplicationAttemptInfo(
+private class KVStoreScalaSerializer extends KVStoreSerializer {
+
+  mapper.registerModule(DefaultScalaModule)
+  mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
+  mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat)
+
+}
+
+private[history] case class KVStoreMetadata(
+  val version: Long,
+  val logDir: String)
+
+private[history] case class LogInfo(
+  @KVIndexParam val logPath: String,
+  val fileSize: Long)
+
+private[history] class AttemptInfoWrapper(
+val info: v1.ApplicationAttemptInfo,
 val logPath: String,
-val name: String,
-val appId: String,
-attemptId: Option[String],
-startTime: Long,
-endTime: Long,
-lastUpdated: Long,
-sparkUser: String,
-completed: Boolean,
-val fileSize: Long,
-appSparkVersion: String)
-  extends ApplicationAttemptInfo(
-  attemptId, startTime, endTime, lastUpdated, sparkUser, completed, 
appSparkVersion) {
-
-  /** extend the superclass string value with the extra attributes of this 
class */
-  override def toString: String = {
-s"FsApplicationAttemptInfo($name, $appId," +
-  s" ${super.toString}, source=$logPath, size=$fileSize"
+val fileSize: Long) {
+
+  def toAppAttemptInfo(): ApplicationAttemptInfo = {
+ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(),
+  info.endTime.getTime(), info.lastUpdated.getTime(), info.sparkUser,
+  info.completed, info.appSparkVersion)
   }
+
 }
 
-/**
- * Application history information
- * @param id application ID
- * @param name application name
- * @param attempts list of attempts, most recent first.
- */
-private class FsApplicationHistoryInfo(
-id: String,
-override val name: String,
-override val attempts: List[FsApplicationAttemptInfo])
-  extends ApplicationHistoryInfo(id, name, attempts)
+private[history] class ApplicationInfoWrapper(
+val info: v1.ApplicationInfo,
+val attempts: List[AttemptInfoWrapper]) {
+
+  @JsonIgnore @KVIndexParam
+  def id: String = info.id
+
+  @JsonIgnore @KVIndexParam("endTime")
+  def endTime(): Long = attempts.head.info.endTime.getTime()
+
+  @JsonIgnore @KVIndexParam("oldestAttempt")
+  def oldestAttempt(): Long = 
attempts.map(_.info.lastUpdated.getTime()).min
+
+  def toAppHistoryInfo(): ApplicationHistoryInfo = {
+ApplicationHistoryInfo(info.id, info.name, 
attempts.map(_.toAppAttemptInfo()))
+  }
+
+  def toApiInfo(): v1.ApplicationInfo = {
+new v1.ApplicationInfo(info.id, info.name, info.coresGranted, 
info.maxCores,
+  info.coresPerExecutor, info.memoryPerExecutorMB, 
attempts.map(_.info))
+  }
+
+}
+
+private[history] class AppListingListener(log: FileStatus, clock: Clock) 
extends SparkListener {
+
+  private val app = new MutableApplicationInfo()
+  private val attempt = new MutableAttemptInfo(log.getPath().getName(), 
log.getLen())
+
+  override def onApplicationStart(event: SparkListenerApplicationStart): 
Unit = {
+app.id = event.appId.orNull
+app.name = event.appName
+
+attempt.attemptId = event.appAttemptId

[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r137941077
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -117,17 +122,37 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   // used for logging msgs (logs are re-scanned based on file size, rather 
than modtime)
   private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1)
 
-  // Mapping of application IDs to their metadata, in descending end time 
order. Apps are inserted
-  // into the map in order, so the LinkedHashMap maintains the correct 
ordering.
-  @volatile private var applications: mutable.LinkedHashMap[String, 
FsApplicationHistoryInfo]
-= new mutable.LinkedHashMap()
+  private val pendingReplayTasksCount = new 
java.util.concurrent.atomic.AtomicInteger(0)
 
-  val fileToAppInfo = new ConcurrentHashMap[Path, 
FsApplicationAttemptInfo]()
+  private val storePath = conf.get(LOCAL_STORE_DIR)
 
-  // List of application logs to be deleted by event log cleaner.
-  private var attemptsToClean = new 
mutable.ListBuffer[FsApplicationAttemptInfo]
+  private val listing: KVStore = storePath.map { path =>
+val dbPath = new File(path, "listing.ldb")
 
-  private val pendingReplayTasksCount = new 
java.util.concurrent.atomic.AtomicInteger(0)
+def openDB(): LevelDB = new LevelDB(dbPath, new 
KVStoreScalaSerializer())
+
+try {
+  val db = openDB()
+  val meta = db.getMetadata(classOf[KVStoreMetadata])
+
+  if (meta == null) {
+db.setMetadata(new KVStoreMetadata(CURRENT_LISTING_VERSION, 
logDir))
+db
+  } else if (meta.version != CURRENT_LISTING_VERSION || 
!logDir.equals(meta.logDir)) {
+logInfo("Detected mismatched config in existing DB, deleting...")
+db.close()
+Utils.deleteRecursively(dbPath)
--- End diff --

If the version does not match, do we delete the files?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r137940633
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -117,17 +122,37 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   // used for logging msgs (logs are re-scanned based on file size, rather 
than modtime)
   private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1)
 
-  // Mapping of application IDs to their metadata, in descending end time 
order. Apps are inserted
-  // into the map in order, so the LinkedHashMap maintains the correct 
ordering.
-  @volatile private var applications: mutable.LinkedHashMap[String, 
FsApplicationHistoryInfo]
-= new mutable.LinkedHashMap()
+  private val pendingReplayTasksCount = new 
java.util.concurrent.atomic.AtomicInteger(0)
 
-  val fileToAppInfo = new ConcurrentHashMap[Path, 
FsApplicationAttemptInfo]()
+  private val storePath = conf.get(LOCAL_STORE_DIR)
--- End diff --

We need a description on `storePath` or `LOCAL_STORE_DIR`, even though we 
already have one in `monitoring.md`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r137941159
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -742,53 +698,146 @@ private[history] object FsHistoryProvider {
   private val APPL_END_EVENT_PREFIX = 
"{\"Event\":\"SparkListenerApplicationEnd\""
 
   private val LOG_START_EVENT_PREFIX = 
"{\"Event\":\"SparkListenerLogStart\""
+
+  /** Current version of the data written to the listing database. */
+  private val CURRENT_LISTING_VERSION = 1L
--- End diff --

I tried to find the definition and usage of `CURRENT_LISTING_VERSION`, but 
it doesn't seem to be discussed anywhere. What does this really mean? When will we 
change this value? Do we have a complete story for this flag?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r137942178
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -742,53 +698,146 @@ private[history] object FsHistoryProvider {
   private val APPL_END_EVENT_PREFIX = 
"{\"Event\":\"SparkListenerApplicationEnd\""
 
   private val LOG_START_EVENT_PREFIX = 
"{\"Event\":\"SparkListenerLogStart\""
+
+  /** Current version of the data written to the listing database. */
+  private val CURRENT_LISTING_VERSION = 1L
 }
 
 /**
- * Application attempt information.
- *
- * @param logPath path to the log file, or, for a legacy log, its directory
- * @param name application name
- * @param appId application ID
- * @param attemptId optional attempt ID
- * @param startTime start time (from playback)
- * @param endTime end time (from playback). -1 if the application is 
incomplete.
- * @param lastUpdated the modification time of the log file when this 
entry was built by replaying
- *the history.
- * @param sparkUser user running the application
- * @param completed flag to indicate whether or not the application has 
completed.
- * @param fileSize the size of the log file the last time the file was 
scanned for changes
+ * A KVStoreSerializer that provides Scala types serialization too, and 
uses the same options as
+ * the API serializer.
  */
-private class FsApplicationAttemptInfo(
+private class KVStoreScalaSerializer extends KVStoreSerializer {
+
+  mapper.registerModule(DefaultScalaModule)
+  mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
+  mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat)
+
+}
+
+private[history] case class KVStoreMetadata(
+  val version: Long,
+  val logDir: String)
--- End diff --

The two `val` keywords above are redundant.
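
A minimal sketch of the same definition without them (case class constructor parameters are already public `val`s):

```scala
private[history] case class KVStoreMetadata(
    version: Long,
    logDir: String)
```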


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r137942487
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -422,208 +455,101 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   }
 }
 
-applications.get(appId) match {
-  case Some(appInfo) =>
-try {
-  // If no attempt is specified, or there is no attemptId for 
attempts, return all attempts
-  appInfo.attempts.filter { attempt =>
-attempt.attemptId.isEmpty || attemptId.isEmpty || 
attempt.attemptId.get == attemptId.get
-  }.foreach { attempt =>
-val logPath = new Path(logDir, attempt.logPath)
-zipFileToStream(logPath, attempt.logPath, zipStream)
-  }
-} finally {
-  zipStream.close()
+val app = try {
+  load(appId)
+} catch {
+  case _: NoSuchElementException =>
+throw new SparkException(s"Logs for $appId not found.")
+}
+
+try {
+  // If no attempt is specified, or there is no attemptId for 
attempts, return all attempts
+  attemptId
+.map { id => app.attempts.filter(_.info.attemptId == Some(id)) }
+.getOrElse(app.attempts)
+.map(_.logPath)
+.foreach { log =>
+  zipFileToStream(new Path(logDir, log), log, zipStream)
 }
-  case None => throw new SparkException(s"Logs for $appId not found.")
+} finally {
+  zipStream.close()
 }
   }
 
   /**
-   * Replay the log files in the list and merge the list of old 
applications with new ones
+   * Replay the given log file, saving the application in the listing db.
*/
   protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
-val newAttempts = try {
-  val eventsFilter: ReplayEventsFilter = { eventString =>
-eventString.startsWith(APPL_START_EVENT_PREFIX) ||
-  eventString.startsWith(APPL_END_EVENT_PREFIX) ||
-  eventString.startsWith(LOG_START_EVENT_PREFIX)
-  }
-
-  val logPath = fileStatus.getPath()
-  val appCompleted = isApplicationCompleted(fileStatus)
-
-  // Use loading time as lastUpdated since some filesystems don't 
update modifiedTime
-  // each time file is updated. However use modifiedTime for completed 
jobs so lastUpdated
-  // won't change whenever HistoryServer restarts and reloads the file.
-  val lastUpdated = if (appCompleted) fileStatus.getModificationTime 
else clock.getTimeMillis()
--- End diff --

Is this logic still the same after this PR? Do we have any test case to 
ensure it?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r137942186
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -742,53 +698,146 @@ private[history] object FsHistoryProvider {
   private val APPL_END_EVENT_PREFIX = 
"{\"Event\":\"SparkListenerApplicationEnd\""
 
   private val LOG_START_EVENT_PREFIX = 
"{\"Event\":\"SparkListenerLogStart\""
+
+  /** Current version of the data written to the listing database. */
+  private val CURRENT_LISTING_VERSION = 1L
 }
 
 /**
- * Application attempt information.
- *
- * @param logPath path to the log file, or, for a legacy log, its directory
- * @param name application name
- * @param appId application ID
- * @param attemptId optional attempt ID
- * @param startTime start time (from playback)
- * @param endTime end time (from playback). -1 if the application is 
incomplete.
- * @param lastUpdated the modification time of the log file when this 
entry was built by replaying
- *the history.
- * @param sparkUser user running the application
- * @param completed flag to indicate whether or not the application has 
completed.
- * @param fileSize the size of the log file the last time the file was 
scanned for changes
+ * A KVStoreSerializer that provides Scala types serialization too, and 
uses the same options as
+ * the API serializer.
  */
-private class FsApplicationAttemptInfo(
+private class KVStoreScalaSerializer extends KVStoreSerializer {
+
+  mapper.registerModule(DefaultScalaModule)
+  mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
+  mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat)
+
+}
+
+private[history] case class KVStoreMetadata(
+  val version: Long,
+  val logDir: String)
+
+private[history] case class LogInfo(
+  @KVIndexParam val logPath: String,
+  val fileSize: Long)
--- End diff --

These two `val` keywords are redundant too.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r137942548
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -229,10 +254,22 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
   }
 
-  override def getListing(): Iterator[FsApplicationHistoryInfo] = 
applications.values.iterator
+  override def getListing(): Iterator[ApplicationHistoryInfo] = {
+listing.view(classOf[ApplicationInfoWrapper])
+  .index("endTime")
+  .reverse()
+  .iterator()
+  .asScala
+  .map(_.toAppHistoryInfo)
--- End diff --

Nit: `toAppHistoryInfo()`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r137942697
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -301,9 +334,13 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   }
 
   override def stop(): Unit = {
-if (initThread != null && initThread.isAlive()) {
-  initThread.interrupt()
-  initThread.join()
+try {
+  if (initThread != null && initThread.isAlive()) {
+initThread.interrupt()
+initThread.join()
+  }
+} finally {
+  listing.close()
--- End diff --

What might happen if `LevelDB` is not properly closed?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19118: [SPARK-21882][CORE] OutputMetrics doesn't count written ...

2017-09-11 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/19118
  
You don't have to use `saveAsHadoopFile`; can you just call `saveAsHadoopDataset` 
directly?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15326: [SPARK-17759] [CORE] Avoid adding duplicate schedulables

2017-09-11 Thread erenavsarogullari
Github user erenavsarogullari commented on the issue:

https://github.com/apache/spark/pull/15326
  
Hi @kayousterhout,
Many thanks again for your review.
The patch is ready for re-review.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19182: [SPARK-21970][Core] Fix Redundant Throws Declarations in...

2017-09-11 Thread original-brownbear
Github user original-brownbear commented on the issue:

https://github.com/apache/spark/pull/19182
  
@srowen looks like we're all green :)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18982: [SPARK-21685][PYTHON][ML] PySpark Params isSet state sho...

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18982
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81647/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18982: [SPARK-21685][PYTHON][ML] PySpark Params isSet state sho...

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18982
  
**[Test build #81647 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81647/testReport)**
 for PR 18982 at commit 
[`088ee52`](https://github.com/apache/spark/commit/088ee523ca5d24327d90383aef7b500862f7b06d).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18982: [SPARK-21685][PYTHON][ML] PySpark Params isSet state sho...

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18982
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19110: [SPARK-21027][ML][PYTHON] Added tunable parallelism to o...

2017-09-11 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/19110
  
Other than that 1 item, this looks ready


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19110: [SPARK-21027][ML][PYTHON] Added tunable paralleli...

2017-09-11 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19110#discussion_r138180599
  
--- Diff: python/pyspark/ml/param/shared.py ---
@@ -608,6 +608,30 @@ def getAggregationDepth(self):
 return self.getOrDefault(self.aggregationDepth)
 
 
+class HasParallelism(Params):
+"""
+Mixin for param parallelism: number of threads to use when fitting 
models in parallel.
+"""
+
+parallelism = Param(Params._dummy(), "parallelism", "the number of 
threads to use when running parallel algorithms.", 
typeConverter=TypeConverters.toInt)
--- End diff --

Nit: is this out of date? It's missing the "(>= 1)" from the code-gen file.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19194
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81645/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19194
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19194
  
**[Test build #81645 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81645/testReport)**
 for PR 19194 at commit 
[`4281151`](https://github.com/apache/spark/commit/4281151df9010b4e9fe91e588c07e872b8e0dd69).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19193
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19193
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81644/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19193
  
**[Test build #81644 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81644/testReport)**
 for PR 19193 at commit 
[`c14aa2f`](https://github.com/apache/spark/commit/c14aa2ff6161de7d45869d91e53b0b25b18ad2dd).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19130: [SPARK-21917][CORE][YARN] Supporting adding http(...

2017-09-11 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/19130#discussion_r138154503
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -367,6 +368,52 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }.orNull
 }
 
+// Using a dummy http URI to check if HTTP(s) FileSystem is available, it returns true in
+// Hadoop 2.9+, otherwise it returns false.
+val isHttpFsAvailable = Try { FileSystem.get(Utils.resolveURI("http://foo/bar"), hadoopConf) }
+  .map(_ => true)
+  .getOrElse(false)
+// When running in YARN cluster manager, we check the configuration
+// "spark.yarn.dist.forceDownloadResources", if true we always download remote HTTP(s)
+// resources to local and then re-upload them to Hadoop FS, if false we need to check the
+// availability of HTTP(s) FileSystem to decide wether to use HTTP(s) FS to handle resources
+// or not.
+if (clusterManager == YARN && (sparkConf.get(FORCE_DOWNLOAD_RESOURCES) || !isHttpFsAvailable)) {
--- End diff --

Do we somehow want to make this configurable per scheme? Right now it's 
basically http/https; in the future, would we want to handle other 
filesystems that Hadoop doesn't support? Making this a settable config would 
make that easier.
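
For illustration only, such a setting could look roughly like this (the name `spark.yarn.dist.forceDownloadSchemes` and its exact semantics are hypothetical here, not something this PR defines):

```scala
// Hypothetical: URI schemes whose resources are always downloaded to the local disk
// and re-uploaded to the Hadoop FS, instead of being resolved via a Hadoop FileSystem.
private[spark] val FORCE_DOWNLOAD_SCHEMES =
  ConfigBuilder("spark.yarn.dist.forceDownloadSchemes")
    .stringConf
    .toSequence
    .createWithDefault(Nil)
```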


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18875
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81643/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18875
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18875
  
**[Test build #81643 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81643/testReport)**
 for PR 18875 at commit 
[`0f2dd18`](https://github.com/apache/spark/commit/0f2dd186b48b2b46ed1367ebae811b5767ef8598).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19130: [SPARK-21917][CORE][YARN] Supporting adding http(...

2017-09-11 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/19130#discussion_r138151499
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -367,6 +368,52 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }.orNull
 }
 
+// Using a dummy http URI to check if HTTP(s) FileSystem is available, it returns true in
+// Hadoop 2.9+, otherwise it returns false.
+val isHttpFsAvailable = Try { FileSystem.get(Utils.resolveURI("http://foo/bar"), hadoopConf) }
--- End diff --


There is a `FileSystem.getFileSystemClass` function we could use here instead 
of calling a dummy URI.
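
A minimal sketch of that alternative, using only Hadoop's `FileSystem.getFileSystemClass(scheme, conf)` lookup (the surrounding names are illustrative):

```scala
import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// True if a FileSystem implementation is registered for the scheme
// (e.g. "http"/"https" on Hadoop 2.9+), false otherwise.
def isFsAvailable(scheme: String, hadoopConf: Configuration): Boolean =
  Try(FileSystem.getFileSystemClass(scheme, hadoopConf)).isSuccess
```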


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19122: [SPARK-21911][ML][PySpark] Parallel Model Evaluat...

2017-09-11 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19122#discussion_r138151063
  
--- Diff: python/pyspark/ml/tuning.py ---
@@ -193,7 +194,8 @@ class CrossValidator(Estimator, ValidatorParams, 
MLReadable, MLWritable):
 >>> lr = LogisticRegression()
 >>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
 >>> evaluator = BinaryClassificationEvaluator()
->>> cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, 
evaluator=evaluator)
+>>> cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, 
evaluator=evaluator,
+... parallelism=2)
--- End diff --

Are you planning on adding a unit test to verify that parallel fitting gives the same 
results as serial?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18875
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81641/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18875
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18875
  
**[Test build #81641 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81641/testReport)**
 for PR 18875 at commit 
[`069708c`](https://github.com/apache/spark/commit/069708c7977fe5e9239ce5c42db8337307f8a3fc).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19122: [SPARK-21911][ML][PySpark] Parallel Model Evaluat...

2017-09-11 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19122#discussion_r138144992
  
--- Diff: python/pyspark/ml/param/_shared_params_code_gen.py ---
@@ -152,6 +152,8 @@ def get$Name(self):
 ("varianceCol", "column name for the biased sample variance of 
prediction.",
  None, "TypeConverters.toString"),
 ("aggregationDepth", "suggested depth for treeAggregate (>= 2).", 
"2",
+ "TypeConverters.toInt"),
+("parallelism", "number of threads to use when fitting models in 
parallel (>= 1).", "1",
--- End diff --

I think the description should be more general.  Is the plan to put the 
shared param in here or in OneVsRest first?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19122: [SPARK-21911][ML][PySpark] Parallel Model Evaluat...

2017-09-11 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19122#discussion_r138144361
  
--- Diff: python/pyspark/ml/tuning.py ---
@@ -255,18 +257,27 @@ def _fit(self, dataset):
 randCol = self.uid + "_rand"
 df = dataset.select("*", rand(seed).alias(randCol))
 metrics = [0.0] * numModels
+
+pool = ThreadPool(processes=min(self.getParallelism(), numModels))
+
 for i in range(nFolds):
 validateLB = i * h
 validateUB = (i + 1) * h
 condition = (df[randCol] >= validateLB) & (df[randCol] < 
validateUB)
-validation = df.filter(condition)
-train = df.filter(~condition)
-models = est.fit(train, epm)
-for j in range(numModels):
-model = models[j]
+validation = df.filter(condition).cache()
+train = df.filter(~condition).cache()
+
+def singleTrain(index):
+model = est.fit(train, epm[index])
 # TODO: duplicate evaluator to take extra params from input
-metric = eva.evaluate(model.transform(validation, epm[j]))
-metrics[j] += metric/nFolds
+metric = eva.evaluate(model.transform(validation, 
epm[index]))
+return metric
+
+currentFoldMetrics = pool.map(singleTrain, range(numModels))
--- End diff --

Could you just use `epm` as the argument in the function instead of an 
index? e.g. `pool.map(singleTrain, epm)`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19122: [SPARK-21911][ML][PySpark] Parallel Model Evaluat...

2017-09-11 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19122#discussion_r138142834
  
--- Diff: python/pyspark/ml/tuning.py ---
@@ -208,23 +210,23 @@ class CrossValidator(Estimator, ValidatorParams, 
MLReadable, MLWritable):
 
 @keyword_only
 def __init__(self, estimator=None, estimatorParamMaps=None, 
evaluator=None, numFolds=3,
- seed=None):
+ seed=None, parallelism=1):
 """
 __init__(self, estimator=None, estimatorParamMaps=None, 
evaluator=None, numFolds=3,\
- seed=None)
+ seed=None, parallelism=1)
 """
 super(CrossValidator, self).__init__()
-self._setDefault(numFolds=3)
+self._setDefault(numFolds=3, parallelism=1)
--- End diff --

Since the param is not being passed to Java, should we check that it is >=1 
here and in `setParam`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19186: [SPARK-21972][ML] Add param handlePersistence

2017-09-11 Thread smurching
Github user smurching commented on the issue:

https://github.com/apache/spark/pull/19186
  
Note: This PR follows up on the work/discussions in 
[https://github.com/apache/spark/pull/17014](https://github.com/apache/spark/pull/17014)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19186: [SPARK-21972][ML] Add param handlePersistence

2017-09-11 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19186#discussion_r138139729
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala 
---
@@ -300,20 +300,23 @@ class KMeans @Since("1.5.0") (
   @Since("1.5.0")
   def setSeed(value: Long): this.type = set(seed, value)
 
+  /** @group setParam */
+  @Since("2.3.0")
+  def setHandlePersistence(value: Boolean): this.type = 
set(handlePersistence, value)
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): KMeansModel = {
 transformSchema(dataset.schema, logging = true)
 
-val handlePersistence = dataset.rdd.getStorageLevel == 
StorageLevel.NONE
 val instances: RDD[OldVector] = 
dataset.select(col($(featuresCol))).rdd.map {
   case Row(point: Vector) => OldVectors.fromML(point)
 }
 
-if (handlePersistence) {
+if ($(handlePersistence)) {
--- End diff --

See comment above, we should also check that `dataset.storageLevel == 
StorageLevel.NONE`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19186: [SPARK-21972][ML] Add param handlePersistence

2017-09-11 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19186#discussion_r138136774
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
 ---
@@ -483,24 +488,17 @@ class LogisticRegression @Since("1.2.0") (
 this
   }
 
-  override protected[spark] def train(dataset: Dataset[_]): 
LogisticRegressionModel = {
-val handlePersistence = dataset.rdd.getStorageLevel == 
StorageLevel.NONE
-train(dataset, handlePersistence)
-  }
-
-  protected[spark] def train(
-  dataset: Dataset[_],
-  handlePersistence: Boolean): LogisticRegressionModel = {
+  protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel 
= {
 val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) 
else col($(weightCol))
 val instances: RDD[Instance] =
   dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map {
 case Row(label: Double, weight: Double, features: Vector) =>
   Instance(label, weight, features)
   }
 
-if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
+if ($(handlePersistence)) instances.persist(StorageLevel.MEMORY_AND_DISK)
--- End diff --

If `$(handlePersistence)` is `true`, we should still check that `dataset` 
is uncached (i.e. check that `dataset.storageLevel == StorageLevel.NONE`) 
before caching `instances`, or else we'll run into the issues described in 
[SPARK-21799](https://issues.apache.org/jira/browse/SPARK-21799)
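
A minimal sketch of the combined check being suggested here (illustrative; not the PR's code):

```scala
// Cache only when the caller asked us to handle persistence and the input
// dataset is not already cached at some storage level.
if ($(handlePersistence) && dataset.storageLevel == StorageLevel.NONE) {
  instances.persist(StorageLevel.MEMORY_AND_DISK)
}
```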


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19186: [SPARK-21972][ML] Add param handlePersistence

2017-09-11 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19186#discussion_r138137893
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala ---
@@ -163,9 +165,7 @@ final class OneVsRestModel private[ml] (
 val initUDF = udf { () => Map[Int, Double]() }
 val newDataset = dataset.withColumn(accColName, initUDF())
 
-// persist if underlying dataset is not persistent.
-val handlePersistence = dataset.rdd.getStorageLevel == 
StorageLevel.NONE
-if (handlePersistence) {
+if ($(handlePersistence)) {
--- End diff --

See comment above, we should also check that `dataset.storageLevel == 
StorageLevel.NONE` before caching `newDataset`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19186: [SPARK-21972][ML] Add param handlePersistence

2017-09-11 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19186#discussion_r138139091
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala 
---
@@ -82,7 +82,8 @@ private[shared] object SharedParamsCodeGen {
 "all instance weights as 1.0"),
   ParamDesc[String]("solver", "the solver algorithm for optimization", 
finalFields = false),
   ParamDesc[Int]("aggregationDepth", "suggested depth for 
treeAggregate (>= 2)", Some("2"),
-isValid = "ParamValidators.gtEq(2)", isExpertParam = true))
+isValid = "ParamValidators.gtEq(2)", isExpertParam = true),
+  ParamDesc[Boolean]("handlePersistence", "whether to handle data 
persistence", Some("true")))
--- End diff --

This description could be a bit clearer; how about "if true, will cache 
unpersisted input data before fitting estimator on it"?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19186: [SPARK-21972][ML] Add param handlePersistence

2017-09-11 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19186#discussion_r138140113
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala ---
@@ -165,8 +170,7 @@ class IsotonicRegression @Since("1.5.0") 
(@Since("1.5.0") override val uid: Stri
 transformSchema(dataset.schema, logging = true)
 // Extract columns from data.  If dataset is persisted, do not persist 
oldDataset.
 val instances = extractWeightedLabeledPoints(dataset)
-val handlePersistence = dataset.rdd.getStorageLevel == 
StorageLevel.NONE
-if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
+if ($(handlePersistence)) 
instances.persist(StorageLevel.MEMORY_AND_DISK)
--- End diff --

See comment above, we should also check that `dataset.storageLevel == 
StorageLevel.NONE`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19186: [SPARK-21972][ML] Add param handlePersistence

2017-09-11 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19186#discussion_r138139539
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
 ---
@@ -444,13 +444,13 @@ class LogisticRegressionWithLBFGS
 lr.setFitIntercept(addIntercept)
 lr.setMaxIter(optimizer.getNumIterations())
 lr.setTol(optimizer.getConvergenceTol())
+// Determine if we should cache the DF
+lr.setHandlePersistence(input.getStorageLevel == StorageLevel.NONE)
--- End diff --

`handlePersistence` should be specified by the user rather than inferred by 
the algorithm.
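
For illustration, the intended usage would be for the caller to make this choice explicitly (a hypothetical caller, not code from this PR):

```scala
// The caller knows its input data is already cached upstream,
// so it opts out of the estimator's own caching explicitly,
// instead of the estimator inferring it from the input's storage level.
val lr = new LogisticRegression()
  .setHandlePersistence(false)
```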


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19134: [SPARK-21893][BUILD][STREAMING][WIP] Put Kafka 0.8 behin...

2017-09-11 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/19134
  
OK by me, if it's all on purpose. So, am I right that all of the PySpark 
Kafka integration is effectively deprecated as of this change, because it 
depends on the now-deprecated 0.8 support? Just want to make sure I'm reading that 
right, and if so I'll try to mark it as such.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19134: [SPARK-21893][BUILD][STREAMING][WIP] Put Kafka 0.8 behin...

2017-09-11 Thread koeninger
Github user koeninger commented on the issue:

https://github.com/apache/spark/pull/19134
  
There's already a JIRA about why 0.10 doesn't have Python support: 
https://issues-test.apache.org/jira/browse/SPARK-16534


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19106: [SPARK-21770][ML] ProbabilisticClassificationMode...

2017-09-11 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/19106#discussion_r138130375
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
 ---
@@ -245,6 +245,10 @@ private[ml] object ProbabilisticClassificationModel {
 v.values(i) /= sum
 i += 1
   }
+} else {
+  var i = 0
+  val size = v.size
--- End diff --

What's the need for these two vars?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19106: [SPARK-21770][ML] ProbabilisticClassificationMode...

2017-09-11 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/19106#discussion_r138135778
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
 ---
@@ -245,6 +245,10 @@ private[ml] object ProbabilisticClassificationModel {
 v.values(i) /= sum
 i += 1
   }
+} else {
+  var i = 0
+  val size = v.size
+  java.util.Arrays.fill(v.values, 1.0 / size)
--- End diff --

Import this, while you're at it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19195: [DOCS] Fix unreachable links in the document

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19195
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19195: [DOCS] Fix unreachable links in the document

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19195
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81646/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19195: [DOCS] Fix unreachable links in the document

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19195
  
**[Test build #81646 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81646/testReport)**
 for PR 19195 at commit 
[`99a4394`](https://github.com/apache/spark/commit/99a439442a708aabb2d37d1bb5cd064be74ef60e).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18982: [SPARK-21685][PYTHON][ML] PySpark Params isSet state sho...

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18982
  
**[Test build #81647 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81647/testReport)**
 for PR 18982 at commit 
[`088ee52`](https://github.com/apache/spark/commit/088ee523ca5d24327d90383aef7b500862f7b06d).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18982: [SPARK-21685][PYTHON][ML] PySpark Params isSet state sho...

2017-09-11 Thread BryanCutler
Github user BryanCutler commented on the issue:

https://github.com/apache/spark/pull/18982
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16158: [SPARK-18724][ML] Add TuningSummary for TrainValidationS...

2017-09-11 Thread hhbyyh
Github user hhbyyh commented on the issue:

https://github.com/apache/spark/pull/16158
  
Update:

To support pipeline estimators, the tuning summary column name was changed to 
include the full param reference:

![image](https://user-images.githubusercontent.com/7981698/30287417-d67740ae-96d9-11e7-9e56-0300b0b96031.png)



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16158: [SPARK-18724][ML] Add TuningSummary for TrainVali...

2017-09-11 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/16158#discussion_r138133273
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala ---
@@ -85,6 +86,32 @@ private[ml] trait ValidatorParams extends HasSeed with 
Params {
 instrumentation.logNamedValue("evaluator", 
$(evaluator).getClass.getCanonicalName)
 instrumentation.logNamedValue("estimatorParamMapsLength", 
$(estimatorParamMaps).length)
   }
+
+
+  /**
+   * Summary of grid search tuning in the format of DataFrame. Each row 
contains one candidate
+   * paramMap and the corresponding metric of trained model.
+   */
+  protected def getTuningSummaryDF(metrics: Array[Double]): DataFrame = {
+val params = $(estimatorParamMaps)
+require(params.nonEmpty, "estimator param maps should not be empty")
+require(params.length == metrics.length, "estimator param maps number 
should match metrics")
+val metricName = $(evaluator) match {
+  case b: BinaryClassificationEvaluator => b.getMetricName
+  case m: MulticlassClassificationEvaluator => m.getMetricName
+  case r: RegressionEvaluator => r.getMetricName
+  case _ => "metrics"
+}
+val spark = SparkSession.builder().getOrCreate()
+val sc = spark.sparkContext
+val fields = params(0).toSeq.sortBy(_.param.name).map(_.param.name) ++ 
Seq(metricName)
+val schema = new StructType(fields.map(name => StructField(name, 
StringType)).toArray)
+val rows = sc.parallelize(params.zip(metrics)).map { case (param, 
metric) =>
+  val values = param.toSeq.sortBy(_.param.name).map(_.value.toString) 
++ Seq(metric.toString)
+  Row.fromSeq(values)
+}
--- End diff --

OK



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16158: [SPARK-18724][ML] Add TuningSummary for TrainVali...

2017-09-11 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/16158#discussion_r138133238
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala ---
@@ -85,6 +86,32 @@ private[ml] trait ValidatorParams extends HasSeed with 
Params {
 instrumentation.logNamedValue("evaluator", 
$(evaluator).getClass.getCanonicalName)
 instrumentation.logNamedValue("estimatorParamMapsLength", 
$(estimatorParamMaps).length)
   }
+
+
+  /**
+   * Summary of grid search tuning in the format of DataFrame. Each row 
contains one candidate
+   * paramMap and the corresponding metric of trained model.
+   */
+  protected def getTuningSummaryDF(metrics: Array[Double]): DataFrame = {
+val params = $(estimatorParamMaps)
+require(params.nonEmpty, "estimator param maps should not be empty")
+require(params.length == metrics.length, "estimator param maps number 
should match metrics")
+val metricName = $(evaluator) match {
+  case b: BinaryClassificationEvaluator => b.getMetricName
+  case m: MulticlassClassificationEvaluator => m.getMetricName
+  case r: RegressionEvaluator => r.getMetricName
+  case _ => "metrics"
+}
+val spark = SparkSession.builder().getOrCreate()
+val sc = spark.sparkContext
+val fields = params(0).toSeq.sortBy(_.param.name).map(_.param.name) ++ 
Seq(metricName)
+val schema = new StructType(fields.map(name => StructField(name, 
StringType)).toArray)
+val rows = sc.parallelize(params.zip(metrics)).map { case (param, 
metric) =>
+  val values = param.toSeq.sortBy(_.param.name).map(_.value.toString) 
++ Seq(metric.toString)
--- End diff --

Thanks, we should support the case of a custom paramMap.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18887: [SPARK-20642][core] Store FsHistoryProvider listing data...

2017-09-11 Thread vanzin
Github user vanzin commented on the issue:

https://github.com/apache/spark/pull/18887
  
> What is the migration proposal/guides?

Not sure what you mean. There's no change in behavior by default, so 
there's no migration of anything needed.

> What should users do when they hit serious bugs that we are unable to 
find at this stage?

This is always a question when you introduce a change. This particular 
project makes it a bit worse since it's a bigger change. But the answer is the 
same: file a bug, we fix it, next release has the fix; if it's really important 
for you, you can patch your own Spark, or revert to an older release, as many 
people do when they find blockers.

The changes in this project do not influence whether your application will 
fail or not; they're isolated to the UI, so at worst it will make things harder 
to debug until the issues are fixed.

On the other hand, a lot can be mitigated by testing; there's already a lot 
of test coverage for parts of this code, but the UI is kinda the exception 
there. So the longer these changes stay in review instead of being committed, 
the less coverage they'll have in people's day to day testing, actually 
increasing the risk that some bug might be missed.

> What are the external behavior changes between the new and the old ones?

None by default.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19195: [DOCS] Fix unreachable links in the document

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19195
  
**[Test build #81646 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81646/testReport)**
 for PR 19195 at commit 
[`99a4394`](https://github.com/apache/spark/commit/99a439442a708aabb2d37d1bb5cd064be74ef60e).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19107: [SPARK-21799][ML] Fix `KMeans` performance regression ca...

2017-09-11 Thread smurching
Github user smurching commented on the issue:

https://github.com/apache/spark/pull/19107
  
@jkbradley would you be able to give this a look? Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19195: [DOCS] Fix unreachable links in the document

2017-09-11 Thread sarutak
GitHub user sarutak opened a pull request:

https://github.com/apache/spark/pull/19195

[DOCS] Fix unreachable links in the document

## What changes were proposed in this pull request?

Recently, I found two unreachable links in the documentation and fixed them.
Since these are small documentation-only changes, I haven't filed a JIRA issue, 
but please let me know if you think I should.

## How was this patch tested?

Tested manually.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sarutak/spark fix-unreachable-link

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19195.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19195


commit 99a439442a708aabb2d37d1bb5cd064be74ef60e
Author: Kousuke Saruta 
Date:   2017-09-11T17:00:02Z

Fixed unreachable links




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19106: [SPARK-21770][ML] ProbabilisticClassificationModel fix c...

2017-09-11 Thread smurching
Github user smurching commented on the issue:

https://github.com/apache/spark/pull/19106
  
This looks good to me! @srowen would you be able to give it another look?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


