[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r138228111

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala ---

```scala
@@ -193,14 +223,35 @@ private[sql] class JacksonGenerator(
    *
    * @param row The row to convert
    */
-  def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, rootFieldWriters))
+  def write(row: InternalRow): Unit = {
+    writeObject(writeFields(
+      fieldWriters = rootFieldWriters,
+      row = row,
+      schema = dataType.asInstanceOf[StructType]))
+  }
+
+  /**
+   * Transforms multiple `InternalRow`s or `MapData`s to a JSON array using Jackson
+   *
+   * @param array The array of rows or maps to convert
+   */
+  def write(array: ArrayData): Unit = writeArray(writeArrayData(
+    fieldWriter = arrElementWriter,
+    array = array
+  ))
```

--- End diff --

Let's change this one back to:

```scala
def write(array: ArrayData): Unit = writeArray(writeArrayData(array, arrElementWriter))
```
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138225592

--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---

```java
@@ -0,0 +1,315 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.io;

import com.google.common.base.Preconditions;
import org.apache.spark.storage.StorageUtils;
import org.apache.spark.util.ThreadUtils;

import javax.annotation.concurrent.GuardedBy;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * {@link InputStream} implementation which asynchronously reads ahead from the underlying input
 * stream when a specified amount of data has been read from the current buffer. It does this by
 * maintaining two buffers: an active buffer and a read-ahead buffer. The active buffer contains
 * the data that should be returned when a read() call is issued. The read-ahead buffer is used to
 * asynchronously read from the underlying input stream; once the active buffer is exhausted, the
 * two buffers are flipped so that reading can continue from the read-ahead buffer without
 * blocking on disk I/O.
 */
public class ReadAheadInputStream extends InputStream {

  private ReentrantLock stateChangeLock = new ReentrantLock();

  @GuardedBy("stateChangeLock")
  private ByteBuffer activeBuffer;

  @GuardedBy("stateChangeLock")
  private ByteBuffer readAheadBuffer;

  @GuardedBy("stateChangeLock")
  private boolean endOfStream;

  @GuardedBy("stateChangeLock")
  // true if an async read is in progress
  private boolean readInProgress;

  @GuardedBy("stateChangeLock")
  // true if the read was aborted due to an exception while reading from the underlying stream
  private boolean readAborted;

  @GuardedBy("stateChangeLock")
  private Exception readException;

  // If the remaining data size in the current buffer is below this threshold,
  // we issue an async read from the underlying input stream.
  private final int readAheadThresholdInBytes;

  private final InputStream underlyingInputStream;

  private final ExecutorService executorService =
      ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");

  private final Condition asyncReadComplete = stateChangeLock.newCondition();

  private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]);

  /**
   * Creates a ReadAheadInputStream with the specified buffer size and read-ahead threshold.
   *
   * @param inputStream The underlying input stream.
   * @param bufferSizeInBytes The buffer size.
   * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead
   *                                  threshold, an async read is triggered.
   */
  public ReadAheadInputStream(
      InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) {
    Preconditions.checkArgument(bufferSizeInBytes > 0,
        "bufferSizeInBytes should be greater than 0");
    Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
        readAheadThresholdInBytes < bufferSizeInBytes,
        "readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes");
    activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
    readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
    this.readAheadThresholdInBytes = readAheadThresholdInBytes;
    this.underlyingInputStream = inputStream;
    activeBuffer.flip();
    readAheadBuffer.flip();
  }

  private boolean isEndOfStream() {
    if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) {
      return true;
    }
    return false;
  }

  private void readAsync(final ByteBuffer byteB
```
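The double-buffer flip described in the class comment can be sketched in a few lines. The following is a simplified, hypothetical illustration of the technique, not Spark's implementation: class and method names are invented, the flip here happens only on exhaustion rather than at a configurable threshold, and the real PR additionally guards shared state with a `ReentrantLock` and tracks end-of-stream/abort flags.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of the double-buffer read-ahead idea: while the caller drains the
// active buffer, a background task fills the read-ahead buffer; when the
// active buffer is exhausted, the two buffers are swapped.
class SimpleReadAhead implements AutoCloseable {
  private final InputStream in;
  private byte[] active;
  private byte[] readAhead;
  private int activeLen = 0;
  private int activePos = 0;
  private Future<Integer> pending;  // in-flight fill of the read-ahead buffer
  private final ExecutorService exec = Executors.newSingleThreadExecutor(r -> {
    Thread t = new Thread(r, "read-ahead");
    t.setDaemon(true);
    return t;
  });

  SimpleReadAhead(InputStream in, int bufferSize) {
    this.in = in;
    this.active = new byte[bufferSize];
    this.readAhead = new byte[bufferSize];
    // Kick off the first asynchronous fill immediately.
    this.pending = exec.submit(() -> in.read(readAhead, 0, readAhead.length));
  }

  public int read() throws IOException {
    if (activePos >= activeLen) {       // active buffer drained: flip
      try {
        activeLen = pending.get();      // wait for the async fill to finish
      } catch (InterruptedException | ExecutionException e) {
        throw new IOException(e);
      }
      if (activeLen < 0) {
        return -1;                      // underlying stream exhausted
      }
      byte[] tmp = active;              // swap active and read-ahead buffers
      active = readAhead;
      readAhead = tmp;
      activePos = 0;
      pending = exec.submit(() -> in.read(readAhead, 0, readAhead.length));
    }
    return active[activePos++] & 0xFF;
  }

  @Override
  public void close() throws IOException {
    exec.shutdown();
    in.close();
  }

  public static void main(String[] args) throws Exception {
    try (SimpleReadAhead r = new SimpleReadAhead(
        new ByteArrayInputStream("hello world".getBytes()), 4)) {
      StringBuilder sb = new StringBuilder();
      for (int b; (b = r.read()) != -1; ) {
        sb.append((char) b);
      }
      System.out.println(sb);           // prints: hello world
    }
  }
}
```

The `Future.get()`/`submit()` pair gives the necessary happens-before edge between the filler thread and the reader, since the two threads never touch the same array at the same time in this sketch.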
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138201871 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- (same ReadAheadInputStream excerpt as quoted above)
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138207519 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- (same ReadAheadInputStream excerpt as quoted above)
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138193555 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- (same ReadAheadInputStream excerpt as quoted above)
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138225438 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- (same ReadAheadInputStream excerpt as quoted above)
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138198506

--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---

```java
  private boolean isEndOfStream() {
    if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) {
      return true;
    }
    return false;
  }
```

--- End diff --

nit: double spaces
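As a side note on the quoted predicate: the `if (...) return true; return false;` shape can be collapsed to a single boolean expression. A minimal, stand-alone illustration follows; the parameter names are hypothetical and stand in for the buffer/EOF conditions in the diff.

```java
// Two equivalent ways to write a three-way "all drained" predicate, as in
// isEndOfStream(); both return true only when every condition holds.
class EndOfStreamCheck {
  // verbose form, mirroring the diff
  static boolean verbose(boolean activeEmpty, boolean aheadEmpty, boolean eof) {
    if (activeEmpty && aheadEmpty && eof) {
      return true;
    }
    return false;
  }

  // idiomatic one-liner with identical behavior
  static boolean concise(boolean activeEmpty, boolean aheadEmpty, boolean eof) {
    return activeEmpty && aheadEmpty && eof;
  }
}
```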
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138207456 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- (same ReadAheadInputStream excerpt as quoted above)
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138195259 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---

```java
    Preconditions.checkArgument(bufferSizeInBytes > 0,
        "bufferSizeInBytes should be greater than 0");
```

--- End diff -- nit: could you add the `bufferSizeInBytes` value to the error message? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138194732 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138204793 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138189593 --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java ---

```
@@ -72,10 +72,15 @@ public UnsafeSorterSpillReader(
       bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
     }

+    final Double readAheadFraction =
+        SparkEnv.get() == null ? 0.5 :
+            SparkEnv.get().conf().getDouble("spark.unsafe.sorter.spill.read.ahead.fraction", 0.5);
+
     final InputStream bs = new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
     try {
-      this.in = serializerManager.wrapStream(blockId, bs);
+      this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs),
```

--- End diff -- Could you add an internal conf to disable it? That would let users turn the new feature off if it causes a regression. ---
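The requested escape hatch is a boolean flag consulted before wrapping the stream, falling back to the plain buffered path when disabled. In the sketch below the config key is hypothetical (the PR would choose the real name), and a `java.util.Properties` object stands in for `SparkConf`:

```java
import java.util.Properties;

public class ReadAheadToggleDemo {
  // Hypothetical config key; shown only to illustrate the fallback pattern.
  static final String KEY = "spark.unsafe.sorter.spill.read.ahead.enabled";

  /** Returns the name of the stream wrapper that would be used. */
  public static String chooseStream(Properties conf) {
    boolean enabled = Boolean.parseBoolean(conf.getProperty(KEY, "true"));
    // When disabled, skip the ReadAheadInputStream wrapper entirely.
    return enabled ? "ReadAheadInputStream" : "plain buffered stream";
  }

  public static void main(String[] args) {
    Properties conf = new Properties();
    System.out.println(chooseStream(conf));   // feature on by default
    conf.setProperty(KEY, "false");
    System.out.println(chooseStream(conf));   // user opted out
  }
}
```

Defaulting the flag to `true` keeps the new behavior as the default while preserving a one-line rollback for users who hit a regression.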
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138189733 --- Diff: core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java ---

```
@@ -50,17 +52,16 @@ public void tearDown() {
     inputFile.delete();
   }

+
```

--- End diff -- nit: extra empty line ---
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138200321 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138194101 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138201264 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138188910 --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java ---

```
+    final Double readAheadFraction =
```

--- End diff -- nit: Double -> double ---
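The nit matters beyond style: `conf.getDouble(...)` returns a primitive `double`, so declaring the local as `Double` forces an autoboxing allocation, and boxed doubles compare by reference. A small demonstration (class and method names are illustrative):

```java
public class BoxingDemo {
  /** Boxes the same primitive value twice and compares the references. */
  public static boolean boxedReferenceEquals(double v) {
    Double a = v;   // each autoboxing of a double allocates a fresh object
    Double b = v;   // (Double has no small-value cache, unlike Integer)
    return a == b;  // reference comparison on the boxes, not value comparison
  }

  /** The primitive declaration avoids both the allocation and the pitfall. */
  public static boolean primitiveEquals(double v) {
    double a = v;
    double b = v;
    return a == b;  // plain value comparison
  }

  public static void main(String[] args) {
    System.out.println("boxed == : " + boxedReferenceEquals(0.5));
    System.out.println("primitive == : " + primitiveEquals(0.5));
  }
}
```

Using `.equals(...)` on the boxed values would compare by value, but the primitive declaration sidesteps the question entirely, which is what the review suggests.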
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138195292 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---

```java
    Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
        readAheadThresholdInBytes < bufferSizeInBytes,
        "readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes");
```

--- End diff -- ditto (include the `readAheadThresholdInBytes` and `bufferSizeInBytes` values in the error message) ---
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138200201 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138194341 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138208239 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138198579 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
+ */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes"); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) { + return true; +} +return false; --- End diff -- why not just `return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19136: [SPARK-15689][SQL] data source v2
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19136#discussion_r138224219 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.Strategy +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.sources.v2.reader.downward.{CatalystFilterPushDownSupport, ColumnPruningSupport, FilterPushDownSupport} + +object DataSourceV2Strategy extends Strategy { + // TODO: write path + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { +case PhysicalOperation(projects, filters, DataSourceV2Relation(output, reader)) => + val attrMap = AttributeMap(output.zip(output)) + + val projectSet = AttributeSet(projects.flatMap(_.references)) + val filterSet = AttributeSet(filters.flatMap(_.references)) + + // Match original case of attributes. + // TODO: nested fields pruning + val requiredColumns = (projectSet ++ filterSet).toSeq.map(attrMap) + reader match { +case r: ColumnPruningSupport => + r.pruneColumns(requiredColumns.toStructType) +case _ => + } + + val stayUpFilters: Seq[Expression] = reader match { +case r: CatalystFilterPushDownSupport => + r.pushCatalystFilters(filters.toArray) + +case r: FilterPushDownSupport => --- End diff -- yeah, we can't prevent users from implementing both, and we will pick `CatalystFilterPushDownSupport` over `FilterPushDownSupport`. Let me document it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
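The precedence rule discussed here — prefer the Catalyst-expression variant when a reader implements both push-down interfaces — is ordered type dispatch. The interfaces and method signatures below are simplified Java stand-ins for illustration, not the actual data source v2 API:

```java
// Hypothetical, simplified stand-ins for the two push-down interfaces in the
// quoted diff; the real traits live under org.apache.spark.sql.sources.v2.reader.
interface FilterPushDownSupport {
    String pushFilters();
}

interface CatalystFilterPushDownSupport {
    String pushCatalystFilters();
}

class PushDownPlanner {
    // Check CatalystFilterPushDownSupport first, so a reader implementing
    // both interfaces takes the Catalyst path -- the precedence the comment
    // says will be documented.
    static String pushDown(Object reader) {
        if (reader instanceof CatalystFilterPushDownSupport) {
            return ((CatalystFilterPushDownSupport) reader).pushCatalystFilters();
        } else if (reader instanceof FilterPushDownSupport) {
            return ((FilterPushDownSupport) reader).pushFilters();
        }
        return "no push-down";
    }
}

// A reader that implements both interfaces still goes down the Catalyst path.
class BothReader implements FilterPushDownSupport, CatalystFilterPushDownSupport {
    public String pushFilters() { return "data source filters"; }
    public String pushCatalystFilters() { return "catalyst expressions"; }
}
```

Because the dispatch is ordered, nothing stops a user from implementing both interfaces — the planner simply never calls the second one, which is why documenting the precedence matters.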
[GitHub] spark pull request #19136: [SPARK-15689][SQL] data source v2
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19136#discussion_r138224082 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader +import org.apache.spark.sql.sources.v2.reader.upward.StatisticsSupport + +case class DataSourceV2Relation( +output: Seq[AttributeReference], +reader: DataSourceV2Reader) extends LeafNode { + + override def computeStats(): Statistics = reader match { +case r: StatisticsSupport => Statistics(sizeInBytes = r.getStatistics.sizeInBytes()) +case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes) + } +} + +object DataSourceV2Relation { + def apply(reader: DataSourceV2Reader): DataSourceV2Relation = { +new DataSourceV2Relation(reader.readSchema().toAttributes, reader) --- End diff -- I think users can write a special `ReadTask` to do it, but we can't save memory by doing this. When an operator (the scan operator) transfers data to another operator, the data must be `UnsafeRow`s. So even if users return a joined row in `ReadTask`, Spark still needs to convert it to `UnsafeRow`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
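A toy illustration of why the wrapper saves no memory: even if a `ReadTask` returns a "joined row" that merely references two underlying rows, handing data to the next operator means materializing one contiguous row — the `UnsafeRow` analogue — which copies every field anyway. Plain int arrays stand in for rows here; this is an analogy, not Spark's row classes:

```java
// Toy analogue of a joined row: it references two underlying "rows" without
// copying them, but materializing (the UnsafeRow-conversion step between
// operators) copies every field into one contiguous buffer regardless.
class JoinedRow {
    final int[] left;
    final int[] right;

    JoinedRow(int[] left, int[] right) {
        this.left = left;
        this.right = right;
    }

    int[] materialize() {
        int[] flat = new int[left.length + right.length];
        System.arraycopy(left, 0, flat, 0, left.length);
        System.arraycopy(right, 0, flat, left.length, right.length);
        return flat;
    }
}
```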
[GitHub] spark pull request #19185: [Spark-21854] Added LogisticRegressionTrainingSum...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/19185#discussion_r138220213 --- Diff: python/pyspark/ml/tests.py --- @@ -1473,11 +1473,59 @@ def test_logistic_regression_summary(self): self.assertTrue(isinstance(s.fMeasureByThreshold, DataFrame)) self.assertTrue(isinstance(s.precisionByThreshold, DataFrame)) self.assertTrue(isinstance(s.recallByThreshold, DataFrame)) + +self.assertAlmostEqual(s.accuracy, 1.0, 2) --- End diff -- also nit, but should probably add tests for all the new attributes, like `falsePositiveRateByLabel` as below. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19185: [Spark-21854] Added LogisticRegressionTrainingSum...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/19185#discussion_r138219915 --- Diff: python/pyspark/ml/tests.py --- @@ -1473,11 +1473,59 @@ def test_logistic_regression_summary(self): self.assertTrue(isinstance(s.fMeasureByThreshold, DataFrame)) self.assertTrue(isinstance(s.precisionByThreshold, DataFrame)) self.assertTrue(isinstance(s.recallByThreshold, DataFrame)) + +self.assertAlmostEqual(s.accuracy, 1.0, 2) +self.assertAlmostEqual(s.weightedTruePositiveRate, 1.0, 2) +self.assertAlmostEqual(s.weightedFalsePositiveRate, 0.0, 2) +self.assertAlmostEqual(s.weightedRecall, 1.0, 2) +self.assertAlmostEqual(s.weightedPrecision, 1.0, 2) +self.assertAlmostEqual(s.weightedFMeasure(), 1.0, 2) # test evaluation (with training dataset) produces a summary with same values # one check is enough to verify a summary is returned, Scala version runs full test sameSummary = model.evaluate(df) self.assertAlmostEqual(sameSummary.areaUnderROC, s.areaUnderROC) +def test_multiclass_logistic_regression_summary(self): +df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)), + (0.0, 2.0, Vectors.sparse(1, [], [])), + (2.0, 2.0, Vectors.dense(2.0)), + (2.0, 2.0, Vectors.dense(1.9))], +["label", "weight", "features"]) +lr = LogisticRegression(maxIter=5, regParam=0.01, weightCol="weight", fitIntercept=False) +model = lr.fit(df) +self.assertTrue(model.hasSummary) +s = model.summary +# test that api is callable and returns expected types +self.assertTrue(isinstance(s.predictions, DataFrame)) +self.assertEqual(s.probabilityCol, "probability") +self.assertEqual(s.labelCol, "label") +self.assertEqual(s.featuresCol, "features") +self.assertEqual(s.predictionCol, "prediction") +objHist = s.objectiveHistory +self.assertTrue(isinstance(objHist, list) and isinstance(objHist[0], float)) +self.assertGreater(s.totalIterations, 0) +self.assertTrue(isinstance(s.labels, list)) +self.assertTrue(isinstance(s.truePositiveRateByLabel, list)) 
+self.assertTrue(isinstance(s.falsePositiveRateByLabel, list)) +self.assertTrue(isinstance(s.precisionByLabel, list)) +self.assertTrue(isinstance(s.recallByLabel, list)) +self.assertTrue(isinstance(s.fMeasureByLabel(), list)) +self.assertAlmostEqual(s.accuracy, 0.75, 2) +self.assertAlmostEqual(s.weightedTruePositiveRate, 0.75, 2) +self.assertAlmostEqual(s.weightedFalsePositiveRate, 0.25, 2) +self.assertAlmostEqual(s.weightedRecall, 0.75, 2) +self.assertAlmostEqual(s.weightedPrecision, 0.583, 2) +self.assertAlmostEqual(s.weightedFMeasure(), 0.65, 2) --- End diff -- maybe add `beta=1.0` to the methods that take beta as a parameter. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
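For context on the `beta` suggestion above: `fMeasureByLabel` and `weightedFMeasure` accept an optional beta, so calling them with an explicit `beta=1.0` exercises that parameter rather than the default. The textbook F-beta formula those methods compute, sketched here independently of Spark's code:

```java
class FMeasure {
    // F_beta = (1 + beta^2) * P * R / (beta^2 * P + R); beta = 1 gives the
    // familiar harmonic mean of precision and recall. Returns 0 when the
    // denominator is 0 (both precision and recall are 0).
    static double fMeasure(double beta, double precision, double recall) {
        double b2 = beta * beta;
        double denom = b2 * precision + recall;
        return denom == 0.0 ? 0.0 : (1 + b2) * precision * recall / denom;
    }
}
```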
[GitHub] spark pull request #19185: [Spark-21854] Added LogisticRegressionTrainingSum...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/19185#discussion_r138220005 --- Diff: python/pyspark/ml/tests.py --- @@ -1473,11 +1473,59 @@ def test_logistic_regression_summary(self): self.assertTrue(isinstance(s.fMeasureByThreshold, DataFrame)) self.assertTrue(isinstance(s.precisionByThreshold, DataFrame)) self.assertTrue(isinstance(s.recallByThreshold, DataFrame)) + +self.assertAlmostEqual(s.accuracy, 1.0, 2) --- End diff -- care to add these to the scala unit test for binary summary as well? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19185: [Spark-21854] Added LogisticRegressionTrainingSum...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/19185#discussion_r138220297 --- Diff: python/pyspark/ml/classification.py --- @@ -528,9 +528,11 @@ def summary(self): trained on the training set. An exception is thrown if `trainingSummary is None`. """ if self.hasSummary: -java_blrt_summary = self._call_java("summary") -# Note: Once multiclass is added, update this to return correct summary -return BinaryLogisticRegressionTrainingSummary(java_blrt_summary) +java_lrt_summary = self._call_java("summary") +if (self.numClasses <= 2): --- End diff -- nit: remove parentheses --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19188: [SPARK-21973][SQL] Add an new option to filter qu...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19188#discussion_r138221684 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala --- @@ -113,12 +114,39 @@ object TPCDSQueryBenchmark { "q81", "q82", "q83", "q84", "q85", "q86", "q87", "q88", "q89", "q90", "q91", "q92", "q93", "q94", "q95", "q96", "q97", "q98", "q99") +val sparkConf = new SparkConf() --- End diff -- ok --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18592: [SPARK-21368][SQL] TPCDSQueryBenchmark can't refe...
Github user sarutak commented on a diff in the pull request: https://github.com/apache/spark/pull/18592#discussion_r138220942 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala --- @@ -99,6 +95,20 @@ object TPCDSQueryBenchmark { } def main(args: Array[String]): Unit = { +if (args.length < 1) { --- End diff -- Good idea. I'll add `TPCDSQueryBenchmarkArguments`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19141: [SPARK-21384] [YARN] Spark + YARN fails with Loca...
Github user devaraj-kavali commented on a diff in the pull request: https://github.com/apache/spark/pull/19141#discussion_r138219530 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -565,7 +565,6 @@ private[spark] class Client( distribute(jarsArchive.toURI.getPath, resType = LocalResourceType.ARCHIVE, destName = Some(LOCALIZED_LIB_DIR)) - jarsArchive.delete() --- End diff -- Thanks @vanzin for the pointer. It was my mistake; I missed the reason for the change while looking at the file's history. I still see that SPARK-20741 fixed the issue only partially: it leaves the \_\_spark_conf\_\_*.zip file to be deleted as part of a shutdown hook. I see these approaches to fix it further: 1. Delete \_\_spark_conf\_\_*.zip and \_\_spark_libs\_\_*.zip files after the application completes, similar to cleanupStagingDir. (Or) 2. Add a configuration controlling whether to delete \_\_spark_conf\_\_*.zip and \_\_spark_libs\_\_*.zip files after copying them to the destination dir, so that users can decide whether to delete them immediately or as part of process exit. In the case of SPARK-20741, this new configuration can be enabled to delete the files immediately. @vanzin & @jerryshao Please let me know your thoughts on this, or if you have another way to do it. Thanks --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18945: Add option to convert nullable int columns to float colu...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/18945 Hey @logannc, have you had some time to work on this? I want to fix this issue ASAP. Otherwise, would anyone here be interested in submitting another PR for the other approach? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19188: [SPARK-21973][SQL] Add an new option to filter qu...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19188#discussion_r138217212 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala --- @@ -113,12 +114,39 @@ object TPCDSQueryBenchmark { "q81", "q82", "q83", "q84", "q85", "q86", "q87", "q88", "q89", "q90", "q91", "q92", "q93", "q94", "q95", "q96", "q97", "q98", "q99") +val sparkConf = new SparkConf() --- End diff -- Could we add an argument, instead of using the SQLConf? See https://github.com/apache/spark/pull/18592#discussion_r138217049 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19106: [SPARK-21770][ML] ProbabilisticClassificationModel fix c...
Github user sethah commented on the issue: https://github.com/apache/spark/pull/19106 I'm confused how this issue was discovered in the first place. Did someone actually train an RF/DT and receive all zero probabilities? If so, shouldn't there be a unit test that recreates that scenario? Anyway, AFAICT the only way this could happen is if the RandomForest was trained on an empty DataFrame, which couldn't happen because it would fail before the train method was called anyway.
[GitHub] spark pull request #18592: [SPARK-21368][SQL] TPCDSQueryBenchmark can't refe...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18592#discussion_r138217049 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala --- @@ -99,6 +95,20 @@ object TPCDSQueryBenchmark { } def main(args: Array[String]): Unit = { +if (args.length < 1) { --- End diff -- @sarutak @maropu Could we do something like https://github.com/apache/spark/blob/master/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala? We could also add another option for outputting the plans of the TPC-DS queries, instead of running all the queries.
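A rough sketch of what an argument class modeled on `ApplicationMasterArguments` could look like for the benchmark. The class name, flag names, and the `--output-plans` option are assumptions for illustration:

```scala
// Illustrative sketch: parse --data-location and an optional --output-plans
// flag from the command line instead of reading values from SQLConf.
class TPCDSQueryBenchmarkArguments(val args: Array[String]) {
  var dataLocation: String = null
  var outputPlans: Boolean = false

  parseArgs(args.toList)

  private def parseArgs(inputArgs: List[String]): Unit = {
    var args = inputArgs
    while (args.nonEmpty) {
      args match {
        case "--data-location" :: value :: tail =>
          dataLocation = value
          args = tail
        case "--output-plans" :: tail =>
          outputPlans = true
          args = tail
        case _ =>
          // scalastyle:off println
          System.err.println(s"Unknown/unsupported param $args")
          // scalastyle:on println
          System.exit(1)
      }
    }
    require(dataLocation != null, "--data-location is required")
  }
}
```

`main` would then construct this from its `args` array and branch on `outputPlans` to either print query plans or run the full benchmark.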
[GitHub] spark pull request #19147: [WIP][SPARK-21190][SQL][PYTHON] Vectorized UDFs i...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19147#discussion_r138215300 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/VectorizedPythonRunner.scala --- @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} +import java.net.Socket +import java.nio.charset.StandardCharsets + +import scala.collection.JavaConverters._ + +import org.apache.arrow.vector.VectorSchemaRoot +import org.apache.arrow.vector.stream.{ArrowStreamReader, ArrowStreamWriter} + +import org.apache.spark.{SparkEnv, SparkFiles, TaskContext} +import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonException, PythonRDD, SpecialLengths} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.arrow.{ArrowUtils, ArrowWriter} +import org.apache.spark.sql.execution.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector} +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + +/** + * Similar to `PythonRunner`, but exchange data with Python worker via columnar format. + */ +class VectorizedPythonRunner( +funcs: Seq[ChainedPythonFunctions], +batchSize: Int, +bufferSize: Int, +reuse_worker: Boolean, +argOffsets: Array[Array[Int]]) extends Logging { + + require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs") + + // All the Python functions should have the same exec, version and envvars. + private val envVars = funcs.head.funcs.head.envVars + private val pythonExec = funcs.head.funcs.head.pythonExec + private val pythonVer = funcs.head.funcs.head.pythonVer + + // TODO: support accumulator in multiple UDF + private val accumulator = funcs.head.funcs.head.accumulator + + // todo: return column batch? + def compute( --- End diff -- I was referring to the protocol between Scala and Python that is changed here and could act differently under some circumstances. 
Here is the behavior of the `PythonRunner` protocol and `VectorizedPythonRunner` protocol that you introduce here: **PythonRunner** Data blocks are framed by a special length integer. Scala reads each data block one at a time and checks the length code. If the code is a `PythonException`, the error is read from Python and a `SparkException` is thrown with that being the cause. **VectorizedPythonRunner** A data stream is opened in Scala with `ArrowStreamReader` and batches are transferred until `ArrowStreamReader` returns False indicating there is no more data. Only at this point are the special length codes checked to handle an error from Python. This behavior change would probably only cause problems if things are not working normally. For example, what would happen if `pyarrow` was not installed on an executor? With `PythonRunner` the ImportError would cause a `PythonException` to be transferred and thrown in Scala. In `VectorizedPythonRunner` I believe the `ArrowStreamReader` would try to read the special length code and then fail somewhere internally to Arrow, not showing the ImportError. My point was that this type of behavior change should probably be implemented in a separate JIRA where we could make sure to handle all of these cases.
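One possible way to keep the old error semantics would be to peek at the special length code before handing the socket stream to Arrow. A rough sketch, assuming the `SpecialLengths.PYTHON_EXCEPTION_THROWN` framing used by `PythonRunner`, and assuming the Python worker writes that marker before any Arrow data — the surrounding setup is heavily simplified:

```scala
// Sketch: peek at the first int; if it is the Python-exception marker,
// surface the traceback as PythonRunner does; otherwise rewind and let
// ArrowStreamReader consume the bytes as an Arrow stream.
val dataIn = new DataInputStream(
  new BufferedInputStream(worker.getInputStream, bufferSize))
dataIn.mark(4)
if (dataIn.readInt() == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
  // Same handling as PythonRunner: read the serialized traceback and
  // rethrow it as the cause of a SparkException.
  val exLength = dataIn.readInt()
  val obj = new Array[Byte](exLength)
  dataIn.readFully(obj)
  throw new SparkException("Python worker failed",
    new PythonException(new String(obj, StandardCharsets.UTF_8), null))
} else {
  dataIn.reset()  // not an error marker; treat the remainder as Arrow data
  val reader = new ArrowStreamReader(dataIn, allocator)
  // ... load batches as before ...
}
```

This only works if the worker-side protocol is adjusted to match, which supports the point that the change deserves its own JIRA.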
[GitHub] spark issue #19107: [SPARK-21799][ML] Fix `KMeans` performance regression ca...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/19107 @WeichenXu123 I just commented on https://issues.apache.org/jira/browse/SPARK-18608 to clarify our efforts here. Can you please either retarget this for SPARK-18608 and update it, or ask @zhengruifeng to submit his original PR as the fix? Please coordinate, thanks!
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r138191841 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/config.scala --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.util.concurrent.TimeUnit + +import scala.annotation.meta.getter + +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.util.kvstore.KVIndex + +private[spark] object config { + + /** Use this to annotate constructor params to be used as KVStore indices. */ + type KVIndexParam = KVIndex @getter + + val DEFAULT_LOG_DIR = "file:/tmp/spark-events" + + val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory") +.stringConf +.createWithDefault(DEFAULT_LOG_DIR) + + val MAX_LOG_AGE_S = ConfigBuilder("spark.history.fs.cleaner.maxAge") +.timeConf(TimeUnit.SECONDS) +.createWithDefaultString("7d") + + val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path") --- End diff -- It'd be better to document that the default is an in-memory store.
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r137942598 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -229,10 +254,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - override def getListing(): Iterator[FsApplicationHistoryInfo] = applications.values.iterator + override def getListing(): Iterator[ApplicationHistoryInfo] = { --- End diff -- The returned order is `descending`, right? This is not straightforward from the code. Please add a comment.
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r137942907 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -316,25 +353,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) try { val newLastScanTime = getNewLastScanTime() logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime") - val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) -.getOrElse(Seq.empty[FileStatus]) + val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil) // scan for modified applications, replay and merge them - val logInfos: Seq[FileStatus] = statusList + val logInfos = statusList .filter { entry => - val fileInfo = fileToAppInfo.get(entry.getPath()) - val prevFileSize = if (fileInfo != null) fileInfo.fileSize else 0L !entry.isDirectory() && // FsHistoryProvider generates a hidden file which can't be read. Accidentally // reading a garbage file is safe, but we would log an error which can be scary to // the end-user. !entry.getPath().getName().startsWith(".") && -prevFileSize < entry.getLen() && -SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) +SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) && +recordedFileSize(entry.getPath()) < entry.getLen() --- End diff -- Can we add a comment to explain what `recordedFileSize(entry.getPath())` returns? In the original code, the variable name was self-descriptive; the new change no longer has one.
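The comment being asked for could look something like this. The method body shown is a guess at the PR's implementation, reading the `LogInfo` record that the diff defines further down:

```scala
/**
 * Returns the size the given event log had the last time it was scanned,
 * or 0 if the log has never been seen before. A current on-disk length
 * greater than this recorded size means the file has new data to replay.
 */
private def recordedFileSize(log: Path): Long = {
  try {
    listing.read(classOf[LogInfo], log.toString()).fileSize
  } catch {
    case _: NoSuchElementException => 0L
  }
}
```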
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r137940658 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/config.scala --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.util.concurrent.TimeUnit + +import scala.annotation.meta.getter + +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.util.kvstore.KVIndex + +private[spark] object config { + + /** Use this to annotate constructor params to be used as KVStore indices. */ + type KVIndexParam = KVIndex @getter + + val DEFAULT_LOG_DIR = "file:/tmp/spark-events" + + val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory") +.stringConf +.createWithDefault(DEFAULT_LOG_DIR) + + val MAX_LOG_AGE_S = ConfigBuilder("spark.history.fs.cleaner.maxAge") +.timeConf(TimeUnit.SECONDS) +.createWithDefaultString("7d") + + val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path") --- End diff -- Just want to confirm it. Except this, no change on the other parameters. Right? 
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r137942505 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -742,53 +698,146 @@ private[history] object FsHistoryProvider { private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\"" private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\"" + + /** Current version of the data written to the listing database. */ + private val CURRENT_LISTING_VERSION = 1L } /** - * Application attempt information. - * - * @param logPath path to the log file, or, for a legacy log, its directory - * @param name application name - * @param appId application ID - * @param attemptId optional attempt ID - * @param startTime start time (from playback) - * @param endTime end time (from playback). -1 if the application is incomplete. - * @param lastUpdated the modification time of the log file when this entry was built by replaying - *the history. - * @param sparkUser user running the application - * @param completed flag to indicate whether or not the application has completed. - * @param fileSize the size of the log file the last time the file was scanned for changes + * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as + * the API serializer. 
*/ -private class FsApplicationAttemptInfo( +private class KVStoreScalaSerializer extends KVStoreSerializer { + + mapper.registerModule(DefaultScalaModule) + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL) + mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat) + +} + +private[history] case class KVStoreMetadata( + val version: Long, + val logDir: String) + +private[history] case class LogInfo( + @KVIndexParam val logPath: String, + val fileSize: Long) + +private[history] class AttemptInfoWrapper( +val info: v1.ApplicationAttemptInfo, val logPath: String, -val name: String, -val appId: String, -attemptId: Option[String], -startTime: Long, -endTime: Long, -lastUpdated: Long, -sparkUser: String, -completed: Boolean, -val fileSize: Long, -appSparkVersion: String) - extends ApplicationAttemptInfo( - attemptId, startTime, endTime, lastUpdated, sparkUser, completed, appSparkVersion) { - - /** extend the superclass string value with the extra attributes of this class */ - override def toString: String = { -s"FsApplicationAttemptInfo($name, $appId," + - s" ${super.toString}, source=$logPath, size=$fileSize" +val fileSize: Long) { + + def toAppAttemptInfo(): ApplicationAttemptInfo = { +ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(), + info.endTime.getTime(), info.lastUpdated.getTime(), info.sparkUser, + info.completed, info.appSparkVersion) } + } -/** - * Application history information - * @param id application ID - * @param name application name - * @param attempts list of attempts, most recent first. 
- */ -private class FsApplicationHistoryInfo( -id: String, -override val name: String, -override val attempts: List[FsApplicationAttemptInfo]) - extends ApplicationHistoryInfo(id, name, attempts) +private[history] class ApplicationInfoWrapper( +val info: v1.ApplicationInfo, +val attempts: List[AttemptInfoWrapper]) { + + @JsonIgnore @KVIndexParam + def id: String = info.id + + @JsonIgnore @KVIndexParam("endTime") + def endTime(): Long = attempts.head.info.endTime.getTime() + + @JsonIgnore @KVIndexParam("oldestAttempt") + def oldestAttempt(): Long = attempts.map(_.info.lastUpdated.getTime()).min + + def toAppHistoryInfo(): ApplicationHistoryInfo = { +ApplicationHistoryInfo(info.id, info.name, attempts.map(_.toAppAttemptInfo())) + } + + def toApiInfo(): v1.ApplicationInfo = { +new v1.ApplicationInfo(info.id, info.name, info.coresGranted, info.maxCores, + info.coresPerExecutor, info.memoryPerExecutorMB, attempts.map(_.info)) + } + +} + +private[history] class AppListingListener(log: FileStatus, clock: Clock) extends SparkListener { + + private val app = new MutableApplicationInfo() + private val attempt = new MutableAttemptInfo(log.getPath().getName(), log.getLen()) + + override def onApplicationStart(event: SparkListenerApplicationStart): Unit = { +app.id = event.appId.orNull +app.name = event.appName + +attempt.attemptId = event.appAttemptId
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r137941077 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -117,17 +122,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // used for logging msgs (logs are re-scanned based on file size, rather than modtime) private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1) - // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted - // into the map in order, so the LinkedHashMap maintains the correct ordering. - @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo] -= new mutable.LinkedHashMap() + private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0) - val fileToAppInfo = new ConcurrentHashMap[Path, FsApplicationAttemptInfo]() + private val storePath = conf.get(LOCAL_STORE_DIR) - // List of application logs to be deleted by event log cleaner. - private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo] + private val listing: KVStore = storePath.map { path => +val dbPath = new File(path, "listing.ldb") - private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0) +def openDB(): LevelDB = new LevelDB(dbPath, new KVStoreScalaSerializer()) + +try { + val db = openDB() + val meta = db.getMetadata(classOf[KVStoreMetadata]) + + if (meta == null) { +db.setMetadata(new KVStoreMetadata(CURRENT_LISTING_VERSION, logDir)) +db + } else if (meta.version != CURRENT_LISTING_VERSION || !logDir.equals(meta.logDir)) { +logInfo("Detected mismatched config in existing DB, deleting...") +db.close() +Utils.deleteRecursively(dbPath) --- End diff -- If the version does not match, we delete the files?
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r137940633 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -117,17 +122,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // used for logging msgs (logs are re-scanned based on file size, rather than modtime) private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1) - // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted - // into the map in order, so the LinkedHashMap maintains the correct ordering. - @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo] -= new mutable.LinkedHashMap() + private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0) - val fileToAppInfo = new ConcurrentHashMap[Path, FsApplicationAttemptInfo]() + private val storePath = conf.get(LOCAL_STORE_DIR) --- End diff -- We need a description of `storePath` or `LOCAL_STORE_DIR`, although we have one in `monitoring.md`.
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r137941159 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -742,53 +698,146 @@ private[history] object FsHistoryProvider { private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\"" private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\"" + + /** Current version of the data written to the listing database. */ + private val CURRENT_LISTING_VERSION = 1L --- End diff -- I tried to find the definition and usage of `CURRENT_LISTING_VERSION`, but it sounds like this is not discussed. What does this really mean? When will we change this value? Do we have a complete story about this flag?
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r137942178 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -742,53 +698,146 @@ private[history] object FsHistoryProvider { private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\"" private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\"" + + /** Current version of the data written to the listing database. */ + private val CURRENT_LISTING_VERSION = 1L } /** - * Application attempt information. - * - * @param logPath path to the log file, or, for a legacy log, its directory - * @param name application name - * @param appId application ID - * @param attemptId optional attempt ID - * @param startTime start time (from playback) - * @param endTime end time (from playback). -1 if the application is incomplete. - * @param lastUpdated the modification time of the log file when this entry was built by replaying - *the history. - * @param sparkUser user running the application - * @param completed flag to indicate whether or not the application has completed. - * @param fileSize the size of the log file the last time the file was scanned for changes + * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as + * the API serializer. */ -private class FsApplicationAttemptInfo( +private class KVStoreScalaSerializer extends KVStoreSerializer { + + mapper.registerModule(DefaultScalaModule) + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL) + mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat) + +} + +private[history] case class KVStoreMetadata( + val version: Long, + val logDir: String) --- End diff -- The two `val`s above are redundant.
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r137942487 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -422,208 +455,101 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } -applications.get(appId) match { - case Some(appInfo) => -try { - // If no attempt is specified, or there is no attemptId for attempts, return all attempts - appInfo.attempts.filter { attempt => -attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get - }.foreach { attempt => -val logPath = new Path(logDir, attempt.logPath) -zipFileToStream(logPath, attempt.logPath, zipStream) - } -} finally { - zipStream.close() +val app = try { + load(appId) +} catch { + case _: NoSuchElementException => +throw new SparkException(s"Logs for $appId not found.") +} + +try { + // If no attempt is specified, or there is no attemptId for attempts, return all attempts + attemptId +.map { id => app.attempts.filter(_.info.attemptId == Some(id)) } +.getOrElse(app.attempts) +.map(_.logPath) +.foreach { log => + zipFileToStream(new Path(logDir, log), log, zipStream) } - case None => throw new SparkException(s"Logs for $appId not found.") +} finally { + zipStream.close() } } /** - * Replay the log files in the list and merge the list of old applications with new ones + * Replay the given log file, saving the application in the listing db. 
*/ protected def mergeApplicationListing(fileStatus: FileStatus): Unit = { -val newAttempts = try { - val eventsFilter: ReplayEventsFilter = { eventString => -eventString.startsWith(APPL_START_EVENT_PREFIX) || - eventString.startsWith(APPL_END_EVENT_PREFIX) || - eventString.startsWith(LOG_START_EVENT_PREFIX) - } - - val logPath = fileStatus.getPath() - val appCompleted = isApplicationCompleted(fileStatus) - - // Use loading time as lastUpdated since some filesystems don't update modifiedTime - // each time file is updated. However use modifiedTime for completed jobs so lastUpdated - // won't change whenever HistoryServer restarts and reloads the file. - val lastUpdated = if (appCompleted) fileStatus.getModificationTime else clock.getTimeMillis() --- End diff -- Is this logic still the same after this PR? Do we have a test case to ensure it?
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r137942186 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -742,53 +698,146 @@ private[history] object FsHistoryProvider { private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\"" private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\"" + + /** Current version of the data written to the listing database. */ + private val CURRENT_LISTING_VERSION = 1L } /** - * Application attempt information. - * - * @param logPath path to the log file, or, for a legacy log, its directory - * @param name application name - * @param appId application ID - * @param attemptId optional attempt ID - * @param startTime start time (from playback) - * @param endTime end time (from playback). -1 if the application is incomplete. - * @param lastUpdated the modification time of the log file when this entry was built by replaying - *the history. - * @param sparkUser user running the application - * @param completed flag to indicate whether or not the application has completed. - * @param fileSize the size of the log file the last time the file was scanned for changes + * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as + * the API serializer. */ -private class FsApplicationAttemptInfo( +private class KVStoreScalaSerializer extends KVStoreSerializer { + + mapper.registerModule(DefaultScalaModule) + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL) + mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat) + +} + +private[history] case class KVStoreMetadata( + val version: Long, + val logDir: String) + +private[history] case class LogInfo( + @KVIndexParam val logPath: String, + val fileSize: Long) --- End diff -- These two `val` are redundant too. 
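For reference on the two `val` nits above: Scala `case class` constructor parameters are public `val`s by default, so the modifier can simply be dropped with no change in behavior:

```scala
// Equivalent declarations without the redundant `val` modifiers;
// case class parameters are already public, immutable fields.
private[history] case class KVStoreMetadata(
    version: Long,
    logDir: String)

private[history] case class LogInfo(
    @KVIndexParam logPath: String,
    fileSize: Long)
```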
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r137942548 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -229,10 +254,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - override def getListing(): Iterator[FsApplicationHistoryInfo] = applications.values.iterator + override def getListing(): Iterator[ApplicationHistoryInfo] = { +listing.view(classOf[ApplicationInfoWrapper]) + .index("endTime") + .reverse() + .iterator() + .asScala + .map(_.toAppHistoryInfo) --- End diff -- Nit: `toAppHistoryInfo()`
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r137942697 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -301,9 +334,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } override def stop(): Unit = { -if (initThread != null && initThread.isAlive()) { - initThread.interrupt() - initThread.join() +try { + if (initThread != null && initThread.isAlive()) { +initThread.interrupt() +initThread.join() + } +} finally { + listing.close() --- End diff -- What might happen if `LevelDB` is not properly closed?
[GitHub] spark issue #19118: [SPARK-21882][CORE] OutputMetrics doesn't count written ...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/19118 You don't have to use `saveAsHadoopFile`, just call `saveAsHadoopDataset` directly?
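For context, `saveAsHadoopDataset` takes a fully configured `JobConf` rather than a path argument, so all output settings live in the conf. A minimal sketch — the output path and key/value types are illustrative, and `sc` is assumed to be an existing `SparkContext`:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}

// Configure the output entirely through the JobConf, then write the pair RDD
// with that configuration directly -- no separate path parameter needed.
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.setOutputKeyClass(classOf[Text])
jobConf.setOutputValueClass(classOf[IntWritable])
jobConf.setOutputFormat(classOf[TextOutputFormat[Text, IntWritable]])
FileOutputFormat.setOutputPath(jobConf, new Path("/tmp/spark-output"))  // illustrative path

sc.parallelize(Seq(("a", 1), ("b", 2)))
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }
  .saveAsHadoopDataset(jobConf)
```

`saveAsHadoopFile` is just a convenience wrapper that builds such a `JobConf` from its path and class arguments, which is why calling `saveAsHadoopDataset` directly is enough here.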
[GitHub] spark issue #15326: [SPARK-17759] [CORE] Avoid adding duplicate schedulables
Github user erenavsarogullari commented on the issue: https://github.com/apache/spark/pull/15326 Hi @kayousterhout, Many thanks again for your review. The patch is ready for re-review.
[GitHub] spark issue #19182: [SPARK-21970][Core] Fix Redundant Throws Declarations in...
Github user original-brownbear commented on the issue: https://github.com/apache/spark/pull/19182 @srowen looks like we're all green :)
[GitHub] spark issue #18982: [SPARK-21685][PYTHON][ML] PySpark Params isSet state sho...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18982 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81647/ Test PASSed.
[GitHub] spark issue #18982: [SPARK-21685][PYTHON][ML] PySpark Params isSet state sho...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18982 **[Test build #81647 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81647/testReport)** for PR 18982 at commit [`088ee52`](https://github.com/apache/spark/commit/088ee523ca5d24327d90383aef7b500862f7b06d). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #18982: [SPARK-21685][PYTHON][ML] PySpark Params isSet state sho...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18982 Merged build finished. Test PASSed.
[GitHub] spark issue #19110: [SPARK-21027][ML][PYTHON] Added tunable parallelism to o...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/19110 Other than that 1 item, this looks ready
[GitHub] spark pull request #19110: [SPARK-21027][ML][PYTHON] Added tunable paralleli...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19110#discussion_r138180599

--- Diff: python/pyspark/ml/param/shared.py ---
@@ -608,6 +608,30 @@ def getAggregationDepth(self):
         return self.getOrDefault(self.aggregationDepth)


+class HasParallelism(Params):
+    """
+    Mixin for param parallelism: number of threads to use when fitting models in parallel.
+    """
+
+    parallelism = Param(Params._dummy(), "parallelism", "the number of threads to use when running parallel algorithms.", typeConverter=TypeConverters.toInt)
--- End diff --

nit: Is this out of date? It's missing the "(>= 1)" from the code gen file.
[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19194 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81645/ Test PASSed.
[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19194 Merged build finished. Test PASSed.
[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19194 **[Test build #81645 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81645/testReport)** for PR 19194 at commit [`4281151`](https://github.com/apache/spark/commit/4281151df9010b4e9fe91e588c07e872b8e0dd69). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19193 Merged build finished. Test FAILed.
[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19193 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81644/ Test FAILed.
[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19193 **[Test build #81644 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81644/testReport)** for PR 19193 at commit [`c14aa2f`](https://github.com/apache/spark/commit/c14aa2ff6161de7d45869d91e53b0b25b18ad2dd). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #19130: [SPARK-21917][CORE][YARN] Supporting adding http(...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r138154503

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -367,6 +368,52 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }.orNull
     }

+    // Using a dummy http URI to check if HTTP(s) FileSystem is available, it returns true in
+    // Hadoop 2.9+, otherwise it returns false.
+    val isHttpFsAvailable = Try { FileSystem.get(Utils.resolveURI("http://foo/bar"), hadoopConf) }
+      .map(_ => true)
+      .getOrElse(false)
+    // When running in YARN cluster manager, we check the configuration
+    // "spark.yarn.dist.forceDownloadResources", if true we always download remote HTTP(s)
+    // resources to local and then re-upload them to Hadoop FS, if false we need to check the
+    // availability of HTTP(s) FileSystem to decide whether to use HTTP(s) FS to handle resources
+    // or not.
+    if (clusterManager == YARN && (sparkConf.get(FORCE_DOWNLOAD_RESOURCES) || !isHttpFsAvailable)) {
--- End diff --

Do we somehow want to make this configurable per scheme? Right now it's basically http/https; in the future we might want to handle other filesystems that Hadoop doesn't support. Making this a settable config would make that easier.
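The branch under review reduces to a small predicate. The sketch below is illustrative only, not Spark's code: `should_download_resources` and its arguments are invented names standing in for `clusterManager`, `spark.yarn.dist.forceDownloadResources`, and the HTTP(s) FileSystem availability probe.

```python
YARN = "yarn"

def should_download_resources(cluster_manager, force_download, http_fs_available):
    # On YARN, download remote http(s) resources locally either when the user
    # forces it or when Hadoop has no http(s) FileSystem implementation.
    return cluster_manager == YARN and (force_download or not http_fs_available)

# Hadoop 2.9+ provides an http(s) FileSystem, so nothing is downloaded...
assert not should_download_resources(YARN, False, True)
# ...but the config can still force the old download-and-reupload behavior:
assert should_download_resources(YARN, True, True)
# Pre-2.9 Hadoop has no http(s) FileSystem: always download.
assert should_download_resources(YARN, False, False)
```

The reviewer's point is that hard-coding this predicate to http/https makes it awkward to extend to other unsupported schemes later; a per-scheme config would factor the scheme out of the boolean.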
[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18875 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81643/ Test PASSed.
[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18875 Merged build finished. Test PASSed.
[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18875 **[Test build #81643 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81643/testReport)** for PR 18875 at commit [`0f2dd18`](https://github.com/apache/spark/commit/0f2dd186b48b2b46ed1367ebae811b5767ef8598). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #19130: [SPARK-21917][CORE][YARN] Supporting adding http(...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r138151499

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -367,6 +368,52 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }.orNull
     }

+    // Using a dummy http URI to check if HTTP(s) FileSystem is available, it returns true in
+    // Hadoop 2.9+, otherwise it returns false.
+    val isHttpFsAvailable = Try { FileSystem.get(Utils.resolveURI("http://foo/bar"), hadoopConf) }
--- End diff --

There is a `FileSystem.getFileSystemClass` function we could use here instead of calling it with a dummy URI.
[GitHub] spark pull request #19122: [SPARK-21911][ML][PySpark] Parallel Model Evaluat...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19122#discussion_r138151063

--- Diff: python/pyspark/ml/tuning.py ---
@@ -193,7 +194,8 @@ class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable):
     >>> lr = LogisticRegression()
     >>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
     >>> evaluator = BinaryClassificationEvaluator()
-    >>> cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
+    >>> cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
+    ...                     parallelism=2)
--- End diff --

Are you planning on adding a unit test to verify that parallel has the same results as serial?
[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18875 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81641/ Test PASSed.
[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18875 Merged build finished. Test PASSed.
[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18875 **[Test build #81641 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81641/testReport)** for PR 18875 at commit [`069708c`](https://github.com/apache/spark/commit/069708c7977fe5e9239ce5c42db8337307f8a3fc). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #19122: [SPARK-21911][ML][PySpark] Parallel Model Evaluat...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19122#discussion_r138144992

--- Diff: python/pyspark/ml/param/_shared_params_code_gen.py ---
@@ -152,6 +152,8 @@ def get$Name(self):
     ("varianceCol", "column name for the biased sample variance of prediction.", None,
      "TypeConverters.toString"),
     ("aggregationDepth", "suggested depth for treeAggregate (>= 2).", "2",
+     "TypeConverters.toInt"),
+    ("parallelism", "number of threads to use when fitting models in parallel (>= 1).", "1",
--- End diff --

I think the description should be more general. Is the plan to put the shared param in here or in OneVsRest first?
[GitHub] spark pull request #19122: [SPARK-21911][ML][PySpark] Parallel Model Evaluat...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19122#discussion_r138144361

--- Diff: python/pyspark/ml/tuning.py ---
@@ -255,18 +257,27 @@ def _fit(self, dataset):
         randCol = self.uid + "_rand"
         df = dataset.select("*", rand(seed).alias(randCol))
         metrics = [0.0] * numModels
+
+        pool = ThreadPool(processes=min(self.getParallelism(), numModels))
+
         for i in range(nFolds):
             validateLB = i * h
             validateUB = (i + 1) * h
             condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB)
-            validation = df.filter(condition)
-            train = df.filter(~condition)
-            models = est.fit(train, epm)
-            for j in range(numModels):
-                model = models[j]
+            validation = df.filter(condition).cache()
+            train = df.filter(~condition).cache()
+
+            def singleTrain(index):
+                model = est.fit(train, epm[index])
                 # TODO: duplicate evaluator to take extra params from input
-                metric = eva.evaluate(model.transform(validation, epm[j]))
-                metrics[j] += metric/nFolds
+                metric = eva.evaluate(model.transform(validation, epm[index]))
+                return metric
+
+            currentFoldMetrics = pool.map(singleTrain, range(numModels))
--- End diff --

Could you just use `epm` as the argument in the function instead of an index? e.g. `pool.map(singleTrain, epm)`
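The suggestion above, mapping the fit-and-evaluate closure over the param maps themselves rather than over their indices, can be demonstrated with the same stdlib `ThreadPool` that `tuning.py` uses. The estimator and evaluator are replaced by a trivial scoring function here, so this is a shape sketch only, not pyspark code:

```python
from multiprocessing.pool import ThreadPool

# Stand-ins for ParamGridBuilder output: plain dicts instead of ParamMaps.
epm = [{"maxIter": 0}, {"maxIter": 1}, {"maxIter": 5}]

def single_train(param_map):
    # hypothetical scoring; a real singleTrain would fit and evaluate a model
    return param_map["maxIter"] / 10.0

pool = ThreadPool(processes=min(2, len(epm)))
metrics = pool.map(single_train, epm)  # map over epm itself: no index bookkeeping
pool.close()
pool.join()

assert metrics == [0.0, 0.1, 0.5]
```

`ThreadPool.map` preserves input order in its result list, which is why the per-fold metrics can still be accumulated positionally after the parallel step.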
[GitHub] spark pull request #19122: [SPARK-21911][ML][PySpark] Parallel Model Evaluat...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19122#discussion_r138142834

--- Diff: python/pyspark/ml/tuning.py ---
@@ -208,23 +210,23 @@ class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable):

     @keyword_only
     def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,
-                 seed=None):
+                 seed=None, parallelism=1):
         """
         __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\
-                 seed=None)
+                 seed=None, parallelism=1)
         """
         super(CrossValidator, self).__init__()
-        self._setDefault(numFolds=3)
+        self._setDefault(numFolds=3, parallelism=1)
--- End diff --

Since the param is not being passed to Java, should we check that it is >= 1 here and in `setParam`?
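A minimal sketch of the validation being asked for, assuming the check lives in the Python setter since the value is never validated on the JVM side. This only loosely imitates pyspark's param/setter style; the class and attribute names are illustrative:

```python
class HasParallelism:
    """Hypothetical, simplified imitation of a pyspark shared-param mixin."""

    def __init__(self):
        self._parallelism = 1  # default, mirroring _setDefault(parallelism=1)

    def setParallelism(self, value):
        # the JVM side never sees this param here, so validate in Python
        if int(value) < 1:
            raise ValueError("parallelism must be >= 1, got %r" % (value,))
        self._parallelism = int(value)
        return self  # return self so setters chain, as in pyspark

cv = HasParallelism().setParallelism(2)
assert cv._parallelism == 2
```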
[GitHub] spark issue #19186: [SPARK-21972][ML] Add param handlePersistence
Github user smurching commented on the issue: https://github.com/apache/spark/pull/19186 Note: This PR follows up on the work/discussions in [https://github.com/apache/spark/pull/17014](https://github.com/apache/spark/pull/17014)
[GitHub] spark pull request #19186: [SPARK-21972][ML] Add param handlePersistence
Github user smurching commented on a diff in the pull request: https://github.com/apache/spark/pull/19186#discussion_r138139729

--- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala ---
@@ -300,20 +300,23 @@ class KMeans @Since("1.5.0") (
   @Since("1.5.0")
   def setSeed(value: Long): this.type = set(seed, value)

+  /** @group setParam */
+  @Since("2.3.0")
+  def setHandlePersistence(value: Boolean): this.type = set(handlePersistence, value)
+
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): KMeansModel = {
     transformSchema(dataset.schema, logging = true)
-    val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
     val instances: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
       case Row(point: Vector) => OldVectors.fromML(point)
     }

-    if (handlePersistence) {
+    if ($(handlePersistence)) {
--- End diff --

See comment above, we should also check that `dataset.storageLevel == StorageLevel.NONE`
[GitHub] spark pull request #19186: [SPARK-21972][ML] Add param handlePersistence
Github user smurching commented on a diff in the pull request: https://github.com/apache/spark/pull/19186#discussion_r138136774

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala ---
@@ -483,24 +488,17 @@ class LogisticRegression @Since("1.2.0") (
     this
   }

-  override protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = {
-    val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
-    train(dataset, handlePersistence)
-  }
-
-  protected[spark] def train(
-      dataset: Dataset[_],
-      handlePersistence: Boolean): LogisticRegressionModel = {
+  protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = {
     val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
     val instances: RDD[Instance] =
       dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map {
         case Row(label: Double, weight: Double, features: Vector) =>
           Instance(label, weight, features)
       }

-    if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
+    if ($(handlePersistence)) instances.persist(StorageLevel.MEMORY_AND_DISK)
--- End diff --

If `$(handlePersistence)` is `true`, we should still check that `dataset` is uncached (i.e. check that `dataset.storageLevel == StorageLevel.NONE`) before caching `instances`, or else we'll run into the issues described in [SPARK-21799](https://issues.apache.org/jira/browse/SPARK-21799)
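The SPARK-21799 hazard behind this review thread reduces to one predicate: persist the derived data only when the param asks for it *and* the input dataset is not already cached, otherwise the same data ends up cached twice. A sketch with storage levels modeled as plain strings (not Spark's `StorageLevel`), and `decide_persist` an invented name:

```python
NONE = "NONE"
MEMORY_AND_DISK = "MEMORY_AND_DISK"

def decide_persist(handle_persistence, dataset_storage_level):
    # cache the derived RDD only when asked to AND the source is uncached
    return handle_persistence and dataset_storage_level == NONE

assert decide_persist(True, NONE)                 # normal case: cache instances
assert not decide_persist(True, MEMORY_AND_DISK)  # input already cached: skip
assert not decide_persist(False, NONE)            # user opted out: skip
```

The second case is exactly what the reviewer is flagging: with a user-facing `handlePersistence` param, the `dataset.storageLevel == StorageLevel.NONE` guard still has to stay, or an already-persisted input gets double-cached.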
[GitHub] spark pull request #19186: [SPARK-21972][ML] Add param handlePersistence
Github user smurching commented on a diff in the pull request: https://github.com/apache/spark/pull/19186#discussion_r138137893

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala ---
@@ -163,9 +165,7 @@ final class OneVsRestModel private[ml] (
     val initUDF = udf { () => Map[Int, Double]() }
     val newDataset = dataset.withColumn(accColName, initUDF())

-    // persist if underlying dataset is not persistent.
-    val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
-    if (handlePersistence) {
+    if ($(handlePersistence)) {
--- End diff --

See comment above, we should also check that `dataset.storageLevel == StorageLevel.NONE` before caching `newDataset`
[GitHub] spark pull request #19186: [SPARK-21972][ML] Add param handlePersistence
Github user smurching commented on a diff in the pull request: https://github.com/apache/spark/pull/19186#discussion_r138139091

--- Diff: mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala ---
@@ -82,7 +82,8 @@ private[shared] object SharedParamsCodeGen {
       "all instance weights as 1.0"),
     ParamDesc[String]("solver", "the solver algorithm for optimization", finalFields = false),
     ParamDesc[Int]("aggregationDepth", "suggested depth for treeAggregate (>= 2)", Some("2"),
-      isValid = "ParamValidators.gtEq(2)", isExpertParam = true))
+      isValid = "ParamValidators.gtEq(2)", isExpertParam = true),
+    ParamDesc[Boolean]("handlePersistence", "whether to handle data persistence", Some("true")))
--- End diff --

This description could be a bit clearer, how about "if true, will cache unpersisted input data before fitting estimator on it"?
[GitHub] spark pull request #19186: [SPARK-21972][ML] Add param handlePersistence
Github user smurching commented on a diff in the pull request: https://github.com/apache/spark/pull/19186#discussion_r138140113

--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala ---
@@ -165,8 +170,7 @@ class IsotonicRegression @Since("1.5.0") (@Since("1.5.0") override val uid: Stri
     transformSchema(dataset.schema, logging = true)
     // Extract columns from data. If dataset is persisted, do not persist oldDataset.
     val instances = extractWeightedLabeledPoints(dataset)
-    val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
-    if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
+    if ($(handlePersistence)) instances.persist(StorageLevel.MEMORY_AND_DISK)
--- End diff --

See comment above, we should also check that `dataset.storageLevel == StorageLevel.NONE`
[GitHub] spark pull request #19186: [SPARK-21972][ML] Add param handlePersistence
Github user smurching commented on a diff in the pull request: https://github.com/apache/spark/pull/19186#discussion_r138139539

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala ---
@@ -444,13 +444,13 @@ class LogisticRegressionWithLBFGS
     lr.setFitIntercept(addIntercept)
     lr.setMaxIter(optimizer.getNumIterations())
     lr.setTol(optimizer.getConvergenceTol())
+    // Determine if we should cache the DF
+    lr.setHandlePersistence(input.getStorageLevel == StorageLevel.NONE)
--- End diff --

`handlePersistence` should be specified by the user rather than inferred by the algorithm.
[GitHub] spark issue #19134: [SPARK-21893][BUILD][STREAMING][WIP] Put Kafka 0.8 behin...
Github user srowen commented on the issue: https://github.com/apache/spark/pull/19134 OK by me, if it's all on-purpose. So, am I right that all of the Pyspark Kafka integration is effectively deprecated as of this change, because it depends on now-deprecated 0.8 support? just want to make sure I'm reading that right, and if so I'll try to mark it as such.
[GitHub] spark issue #19134: [SPARK-21893][BUILD][STREAMING][WIP] Put Kafka 0.8 behin...
Github user koeninger commented on the issue: https://github.com/apache/spark/pull/19134 There's already a jira about why 0.10 doesn't have python support, https://issues-test.apache.org/jira/browse/SPARK-16534
[GitHub] spark pull request #19106: [SPARK-21770][ML] ProbabilisticClassificationMode...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/19106#discussion_r138130375

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala ---
@@ -245,6 +245,10 @@ private[ml] object ProbabilisticClassificationModel {
         v.values(i) /= sum
         i += 1
       }
+    } else {
+      var i = 0
+      val size = v.size
--- End diff --

What's the need for these two vars?
[GitHub] spark pull request #19106: [SPARK-21770][ML] ProbabilisticClassificationMode...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/19106#discussion_r138135778

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala ---
@@ -245,6 +245,10 @@ private[ml] object ProbabilisticClassificationModel {
         v.values(i) /= sum
         i += 1
       }
+    } else {
+      var i = 0
+      val size = v.size
+      java.util.Arrays.fill(v.values, 1.0 / size)
--- End diff --

Import this, while you're at it.
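The fallback in the diff above — emitting a uniform distribution when the raw scores sum to zero, which is what the `java.util.Arrays.fill` call does — can be sketched as a small function. `normalize_to_probabilities` is an invented name; lists stand in for the Scala `Vector`:

```python
def normalize_to_probabilities(values):
    # normalize raw scores so they sum to 1
    total = sum(values)
    if total != 0:
        return [v / total for v in values]
    # all-zero scores carry no information: treat every class as equally likely
    return [1.0 / len(values)] * len(values)

assert normalize_to_probabilities([1.0, 3.0]) == [0.25, 0.75]
assert normalize_to_probabilities([0.0, 0.0, 0.0, 0.0]) == [0.25, 0.25, 0.25, 0.25]
```

This also shows why the reviewer questions the two extra vars in the Scala version: the uniform fill needs only the vector's size, not a loop counter.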
[GitHub] spark issue #19195: [DOCS] Fix unreachable links in the document
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19195 Merged build finished. Test PASSed.
[GitHub] spark issue #19195: [DOCS] Fix unreachable links in the document
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19195 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81646/ Test PASSed.
[GitHub] spark issue #19195: [DOCS] Fix unreachable links in the document
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19195 **[Test build #81646 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81646/testReport)** for PR 19195 at commit [`99a4394`](https://github.com/apache/spark/commit/99a439442a708aabb2d37d1bb5cd064be74ef60e). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #18982: [SPARK-21685][PYTHON][ML] PySpark Params isSet state sho...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18982 **[Test build #81647 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81647/testReport)** for PR 18982 at commit [`088ee52`](https://github.com/apache/spark/commit/088ee523ca5d24327d90383aef7b500862f7b06d).
[GitHub] spark issue #18982: [SPARK-21685][PYTHON][ML] PySpark Params isSet state sho...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/18982 retest this please
[GitHub] spark issue #16158: [SPARK-18724][ML] Add TuningSummary for TrainValidationS...
Github user hhbyyh commented on the issue: https://github.com/apache/spark/pull/16158 Update: To support pipeline estimator, change the tuning summary column name to include full param reference: ![image](https://user-images.githubusercontent.com/7981698/30287417-d67740ae-96d9-11e7-9e56-0300b0b96031.png)
[GitHub] spark pull request #16158: [SPARK-18724][ML] Add TuningSummary for TrainVali...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/16158#discussion_r138133273

--- Diff: mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala ---
@@ -85,6 +86,32 @@ private[ml] trait ValidatorParams extends HasSeed with Params {
     instrumentation.logNamedValue("evaluator", $(evaluator).getClass.getCanonicalName)
     instrumentation.logNamedValue("estimatorParamMapsLength", $(estimatorParamMaps).length)
   }
+
+
+  /**
+   * Summary of grid search tuning in the format of DataFrame. Each row contains one candidate
+   * paramMap and the corresponding metric of trained model.
+   */
+  protected def getTuningSummaryDF(metrics: Array[Double]): DataFrame = {
+    val params = $(estimatorParamMaps)
+    require(params.nonEmpty, "estimator param maps should not be empty")
+    require(params.length == metrics.length, "estimator param maps number should match metrics")
+    val metricName = $(evaluator) match {
+      case b: BinaryClassificationEvaluator => b.getMetricName
+      case m: MulticlassClassificationEvaluator => m.getMetricName
+      case r: RegressionEvaluator => r.getMetricName
+      case _ => "metrics"
+    }
+    val spark = SparkSession.builder().getOrCreate()
+    val sc = spark.sparkContext
+    val fields = params(0).toSeq.sortBy(_.param.name).map(_.param.name) ++ Seq(metricName)
+    val schema = new StructType(fields.map(name => StructField(name, StringType)).toArray)
+    val rows = sc.parallelize(params.zip(metrics)).map { case (param, metric) =>
+      val values = param.toSeq.sortBy(_.param.name).map(_.value.toString) ++ Seq(metric.toString)
+      Row.fromSeq(values)
+    }
--- End diff --

OK
[GitHub] spark pull request #16158: [SPARK-18724][ML] Add TuningSummary for TrainVali...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/16158#discussion_r138133238

--- Diff: mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala ---
@@ -85,6 +86,32 @@ private[ml] trait ValidatorParams extends HasSeed with Params {
     instrumentation.logNamedValue("evaluator", $(evaluator).getClass.getCanonicalName)
     instrumentation.logNamedValue("estimatorParamMapsLength", $(estimatorParamMaps).length)
   }
+
+
+  /**
+   * Summary of grid search tuning in the format of DataFrame. Each row contains one candidate
+   * paramMap and the corresponding metric of trained model.
+   */
+  protected def getTuningSummaryDF(metrics: Array[Double]): DataFrame = {
+    val params = $(estimatorParamMaps)
+    require(params.nonEmpty, "estimator param maps should not be empty")
+    require(params.length == metrics.length, "estimator param maps number should match metrics")
+    val metricName = $(evaluator) match {
+      case b: BinaryClassificationEvaluator => b.getMetricName
+      case m: MulticlassClassificationEvaluator => m.getMetricName
+      case r: RegressionEvaluator => r.getMetricName
+      case _ => "metrics"
+    }
+    val spark = SparkSession.builder().getOrCreate()
+    val sc = spark.sparkContext
+    val fields = params(0).toSeq.sortBy(_.param.name).map(_.param.name) ++ Seq(metricName)
+    val schema = new StructType(fields.map(name => StructField(name, StringType)).toArray)
+    val rows = sc.parallelize(params.zip(metrics)).map { case (param, metric) =>
+      val values = param.toSeq.sortBy(_.param.name).map(_.value.toString) ++ Seq(metric.toString)
--- End diff --

Thanks, we should support the case for custom paramMap.
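The summary construction under review can be sketched in plain Python: one row per candidate param map, columns sorted by param name, the metric appended last. `tuning_summary_rows` is an invented name and dicts stand in for ParamMaps; note the sketch assumes all param maps share the same keys, which is exactly the custom-paramMap case the comment says still needs support:

```python
def tuning_summary_rows(param_maps, metrics, metric_name="metrics"):
    # mirrors the require() checks in the Scala diff
    assert param_maps, "estimator param maps should not be empty"
    assert len(param_maps) == len(metrics), "param maps number should match metrics"
    names = sorted(param_maps[0])          # deterministic column order by param name
    header = names + [metric_name]
    rows = [[str(pm[n]) for n in names] + [str(m)]
            for pm, m in zip(param_maps, metrics)]
    return header, rows

header, rows = tuning_summary_rows(
    [{"regParam": 0.1, "maxIter": 10}, {"regParam": 0.01, "maxIter": 10}],
    [0.82, 0.85],
    metric_name="areaUnderROC")
assert header == ["maxIter", "regParam", "areaUnderROC"]
assert rows == [["10", "0.1", "0.82"], ["10", "0.01", "0.85"]]
```

Everything is stringified before row construction, matching the `StringType`-only schema in the diff; a real implementation would likely want typed columns instead.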
[GitHub] spark issue #18887: [SPARK-20642][core] Store FsHistoryProvider listing data...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/18887

> What is the migration proposal/guides?

Not sure what you mean. There's no change in behavior by default, so there's no migration of anything needed.

> What should users do when they hit serious bugs that we are unable to find at this stage?

This is always a question when you introduce a change. This particular project makes it a bit worse since it's a bigger change. But the answer is the same: file a bug, we fix it, next release has the fix; if it's really important for you, you can patch your own Spark, or revert to an older release, as many people do when they find blockers.

The changes in this project do not influence whether your application will fail or not; they're isolated to the UI, so at worst it will make things harder to debug until the issues are fixed.

On the other hand, a lot can be mitigated by testing; there's already a lot of test coverage for parts of this code, but the UI is kinda the exception there. So the longer these changes stay in review instead of being committed, the less coverage they'll have in people's day to day testing, actually increasing the risk that some bug might be missed.

> What are the external behavior changes between the new and the old ones?

None by default.
[GitHub] spark issue #19195: [DOCS] Fix unreachable links in the document
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19195

**[Test build #81646 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81646/testReport)** for PR 19195 at commit [`99a4394`](https://github.com/apache/spark/commit/99a439442a708aabb2d37d1bb5cd064be74ef60e).
[GitHub] spark issue #19107: [SPARK-21799][ML] Fix `KMeans` performance regression ca...
Github user smurching commented on the issue: https://github.com/apache/spark/pull/19107

@jkbradley would you be able to give this a look? Thanks!
[GitHub] spark pull request #19195: [DOCS] Fix unreachable links in the document
GitHub user sarutak opened a pull request: https://github.com/apache/spark/pull/19195

[DOCS] Fix unreachable links in the document

## What changes were proposed in this pull request?

Recently, I found two unreachable links in the document and fixed them. Since these are small documentation-only changes, I haven't filed a JIRA issue, but please let me know if you think one is needed.

## How was this patch tested?

Tested manually.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sarutak/spark fix-unreachable-link

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19195.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19195

commit 99a439442a708aabb2d37d1bb5cd064be74ef60e
Author: Kousuke Saruta
Date: 2017-09-11T17:00:02Z

    Fixed unreachable links
[GitHub] spark issue #19106: [SPARK-21770][ML] ProbabilisticClassificationModel fix c...
Github user smurching commented on the issue: https://github.com/apache/spark/pull/19106

This looks good to me! @srowen would you be able to give it another look?