[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/15408





[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-15 Thread sitalkedia
Github user sitalkedia commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r83533112
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ * TODO: support {@link #mark(int)}/{@link #reset()}
+ *
+ */
+@ThreadSafe
+public final class NioBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBufferedFileInputStream(File file, int bufferSizeInBytes) 
throws IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE_BYTES);
+  }
+
+  /**
+   * Checks whether data is left to be read from the input stream.
+   * @return true if data is left, false otherwise
+   * @throws IOException
+   */
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = 0;
+  while (nRead == 0) {
+nRead = fileChannel.read(byteBuffer);
+  }
+  if (nRead < 0) {
+return false;
+  }
+  byteBuffer.flip();
+}
+return true;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+if (!refill()) {
+  return -1;
+}
+return byteBuffer.get() & 0xFF;
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int offset, int len) throws 
IOException {
+if (offset < 0 || len < 0 || (offset + len) < 0 || (b.length - (offset 
+ len)) < 0) {
--- End diff --

Ignore my previous comments; we still need it.





[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-15 Thread sitalkedia
Github user sitalkedia commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r83532993
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ * TODO: support {@link #mark(int)}/{@link #reset()}
+ *
+ */
+@ThreadSafe
+public final class NioBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBufferedFileInputStream(File file, int bufferSizeInBytes) 
throws IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE_BYTES);
+  }
+
+  /**
+   * Checks whether data is left to be read from the input stream.
+   * @return true if data is left, false otherwise
+   * @throws IOException
+   */
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = 0;
+  while (nRead == 0) {
+nRead = fileChannel.read(byteBuffer);
+  }
+  if (nRead < 0) {
+return false;
+  }
+  byteBuffer.flip();
+}
+return true;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+if (!refill()) {
+  return -1;
+}
+return byteBuffer.get() & 0xFF;
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int offset, int len) throws 
IOException {
+if (offset < 0 || len < 0 || (offset + len) < 0 || (b.length - (offset 
+ len)) < 0) {
+  throw new IndexOutOfBoundsException();
+}
+if (!refill()) {
+  return -1;
+}
+len = Math.min(len, byteBuffer.remaining());
+byteBuffer.get(b, offset, len);
+return len;
+  }
+
+  @Override
+  public synchronized int available() throws IOException {
+return byteBuffer.remaining();
+  }
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+if (n <= 0L) {
+  return 0L;
+}
+if (byteBuffer.remaining() >= n) {
+  // The buffered content is enough to skip
+  byteBuffer.position(byteBuffer.position() + (int) n);
+  return n;
+}
+long skippedFromBuffer = byteBuffer.remaining();
+long toSkipFromFileChannel = n - skippedFromBuffer;
+// Discard everything we have read in the buffer.
+byteBuffer.position(0);
+byteBuffer.flip();
+return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel);
+  }
+
+  private long skipFromFileChannel(long n) throws IOException {
+long currentFilePosition = fileChannel.position();
+long size = fileChannel.size();
+if (n > size - currentFilePosition) {
+  fileChannel.position(size);
+  return size - currentFilePosition;
+} else {
+  fileChannel.position(currentFilePosition + n);
+  return n;
+}
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+fileChannel.close();
+

[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-15 Thread sitalkedia
Github user sitalkedia commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r83532978
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ * TODO: support {@link #mark(int)}/{@link #reset()}
+ *
+ */
+@ThreadSafe
+public final class NioBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBufferedFileInputStream(File file, int bufferSizeInBytes) 
throws IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE_BYTES);
+  }
+
+  /**
+   * Checks whether data is left to be read from the input stream.
+   * @return true if data is left, false otherwise
+   * @throws IOException
+   */
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = 0;
+  while (nRead == 0) {
+nRead = fileChannel.read(byteBuffer);
+  }
+  if (nRead < 0) {
+return false;
+  }
+  byteBuffer.flip();
+}
+return true;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+if (!refill()) {
+  return -1;
+}
+return byteBuffer.get() & 0xFF;
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int offset, int len) throws 
IOException {
+if (offset < 0 || len < 0 || (offset + len) < 0 || (b.length - (offset 
+ len)) < 0) {
--- End diff --

We still need to check whether `offset` and `len` are less than 0, right? I removed 
the `offset + len < 0` condition because it is covered by the last condition, 
`(b.length - (offset + len)) < 0`.
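
For what it's worth, a small, self-contained illustration (values made up, not from the PR) of why the dropped condition ends up being needed after all, as noted further up the thread: two non-negative ints can sum past `Integer.MAX_VALUE` and wrap negative, while `b.length - (offset + len)` can still come out non-negative, so the last condition alone lets the bad arguments through.

```java
public class OverflowCheckDemo {
  public static void main(String[] args) {
    byte[] b = new byte[16];
    int offset = Integer.MAX_VALUE;
    int len = 100;                                // both arguments are non-negative
    int sum = offset + len;                       // wraps around to -2147483549
    System.out.println(sum < 0);                  // true: `offset + len < 0` catches it
    System.out.println(b.length - sum < 0);       // false: the last condition misses it
  }
}
```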





[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-15 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r83532939
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import javax.annotation.concurrent.ThreadSafe;
--- End diff --

Nit^2 : no longer needed as an import





[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-15 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r83532952
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ */
+public final class NioBufferedFileInputStream extends InputStream {
+
+  private static final int DEFAULT_BUFFER_SIZE_BYTES = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBufferedFileInputStream(File file, int bufferSizeInBytes) 
throws IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE_BYTES);
+  }
+
+  /**
+   * Checks whether data is left to be read from the input stream.
+   * @return true if data is left, false otherwise
+   * @throws IOException
+   */
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = 0;
+  while (nRead == 0) {
+nRead = fileChannel.read(byteBuffer);
+  }
+  if (nRead < 0) {
+return false;
+  }
+  byteBuffer.flip();
+}
+return true;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+if (!refill()) {
+  return -1;
+}
+return byteBuffer.get() & 0xFF;
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int offset, int len) throws 
IOException {
+if (offset < 0 || len < 0 || (b.length - (offset + len)) < 0) {
--- End diff --

Ah no I think that condition was needed. I mean: `if (offset < 0 || len < 0 
|| offset + len < 0 || offset + len > b.length) {`
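
A minimal, self-contained sketch of that explicit form; the helper name and the demo values are illustrative, not anything from the PR.

```java
final class BoundsCheckSketch {
  // The explicit form of the argument check: all four conditions are kept,
  // with `offset + len < 0` guarding against int overflow of the sum.
  static void checkOffsetAndLength(byte[] b, int offset, int len) {
    if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
      throw new IndexOutOfBoundsException(
          "offset=" + offset + ", len=" + len + ", array length=" + b.length);
    }
  }

  public static void main(String[] args) {
    checkOffsetAndLength(new byte[8], 2, 4);      // valid: 2 + 4 <= 8
    try {
      checkOffsetAndLength(new byte[8], 6, 4);    // invalid: 6 + 4 > 8
    } catch (IndexOutOfBoundsException expected) {
      System.out.println("rejected: " + expected.getMessage());
    }
  }
}
```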





[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-15 Thread sitalkedia
Github user sitalkedia commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r83532904
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ * TODO: support {@link #mark(int)}/{@link #reset()}
+ *
+ */
+@ThreadSafe
+public final class NioBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
--- End diff --

done





[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-15 Thread sitalkedia
Github user sitalkedia commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r83532901
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ * TODO: support {@link #mark(int)}/{@link #reset()}
+ *
+ */
+@ThreadSafe
--- End diff --

Alright, removed it for consistency. 





[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-15 Thread sitalkedia
Github user sitalkedia commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r83532883
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ * TODO: support {@link #mark(int)}/{@link #reset()}
--- End diff --

Okay, I removed the TODO here.





[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-15 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r83529473
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ * TODO: support {@link #mark(int)}/{@link #reset()}
+ *
+ */
+@ThreadSafe
--- End diff --

I only see one location in the codebase where we use this annotation, and I 
think we probably shouldn't use it at all if not used consistently. 





[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-15 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r83529504
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ * TODO: support {@link #mark(int)}/{@link #reset()}
+ *
+ */
+@ThreadSafe
+public final class NioBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBufferedFileInputStream(File file, int bufferSizeInBytes) 
throws IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE_BYTES);
+  }
+
+  /**
+   * Checks whether data is left to be read from the input stream.
+   * @return true if data is left, false otherwise
+   * @throws IOException
+   */
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = 0;
+  while (nRead == 0) {
+nRead = fileChannel.read(byteBuffer);
+  }
+  if (nRead < 0) {
+return false;
+  }
+  byteBuffer.flip();
+}
+return true;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+if (!refill()) {
+  return -1;
+}
+return byteBuffer.get() & 0xFF;
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int offset, int len) throws 
IOException {
+if (offset < 0 || len < 0 || (offset + len) < 0 || (b.length - (offset 
+ len)) < 0) {
--- End diff --

Hardly matters, but now that this condition has been made more explicit, 
the final condition is simpler as `offset + len > b.length`.





[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-15 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r83529508
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ * TODO: support {@link #mark(int)}/{@link #reset()}
+ *
+ */
+@ThreadSafe
+public final class NioBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBufferedFileInputStream(File file, int bufferSizeInBytes) 
throws IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE_BYTES);
+  }
+
+  /**
+   * Checks whether data is left to be read from the input stream.
+   * @return true if data is left, false otherwise
+   * @throws IOException
+   */
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = 0;
+  while (nRead == 0) {
+nRead = fileChannel.read(byteBuffer);
+  }
+  if (nRead < 0) {
+return false;
+  }
+  byteBuffer.flip();
+}
+return true;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+if (!refill()) {
+  return -1;
+}
+return byteBuffer.get() & 0xFF;
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int offset, int len) throws 
IOException {
+if (offset < 0 || len < 0 || (offset + len) < 0 || (b.length - (offset 
+ len)) < 0) {
+  throw new IndexOutOfBoundsException();
+}
+if (!refill()) {
+  return -1;
+}
+len = Math.min(len, byteBuffer.remaining());
+byteBuffer.get(b, offset, len);
+return len;
+  }
+
+  @Override
+  public synchronized int available() throws IOException {
+return byteBuffer.remaining();
+  }
+
+  @Override
+  public synchronized long skip(long n) throws IOException {
+if (n <= 0L) {
+  return 0L;
+}
+if (byteBuffer.remaining() >= n) {
+  // The buffered content is enough to skip
+  byteBuffer.position(byteBuffer.position() + (int) n);
+  return n;
+}
+long skippedFromBuffer = byteBuffer.remaining();
+long toSkipFromFileChannel = n - skippedFromBuffer;
+// Discard everything we have read in the buffer.
+byteBuffer.position(0);
+byteBuffer.flip();
+return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel);
+  }
+
+  private long skipFromFileChannel(long n) throws IOException {
+long currentFilePosition = fileChannel.position();
+long size = fileChannel.size();
+if (n > size - currentFilePosition) {
+  fileChannel.position(size);
+  return size - currentFilePosition;
+} else {
+  fileChannel.position(currentFilePosition + n);
+  return n;
+}
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+fileChannel.close();
+

[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-15 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r83529483
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ * TODO: support {@link #mark(int)}/{@link #reset()}
--- End diff --

I don't really care, but this could be a comment inside the class rather 
than user-facing. In fact, I'm not even sure it's a to-do.





[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-15 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r83529476
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ * TODO: support {@link #mark(int)}/{@link #reset()}
+ *
+ */
+@ThreadSafe
+public final class NioBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
--- End diff --

Oops, forgot to say this should be `final`





[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-14 Thread sitalkedia
Github user sitalkedia commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r83476597
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -72,30 +74,35 @@ private boolean refill() throws IOException {
   }
 
   @Override
-  public int read() throws IOException {
+  public synchronized int read() throws IOException {
 if (!refill()) {
   return -1;
 }
 return byteBuffer.get() & 0xFF;
   }
 
   @Override
-  public int read(byte[] b, int offset, int len) throws IOException {
+  public synchronized int read(byte[] b, int offset, int len) throws 
IOException {
 if (!refill()) {
   return -1;
 }
+if ((offset | len | (offset + len) | (b.length - (offset + len))) < 0) 
{
--- End diff --

@srowen - Agreed, the condition was a little confusing. Changed it to make it 
clearer.





[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-14 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r83388424
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -72,30 +74,35 @@ private boolean refill() throws IOException {
   }
 
   @Override
-  public int read() throws IOException {
+  public synchronized int read() throws IOException {
 if (!refill()) {
   return -1;
 }
 return byteBuffer.get() & 0xFF;
   }
 
   @Override
-  public int read(byte[] b, int offset, int len) throws IOException {
+  public synchronized int read(byte[] b, int offset, int len) throws 
IOException {
 if (!refill()) {
   return -1;
 }
+if ((offset | len | (offset + len) | (b.length - (offset + len))) < 0) 
{
--- End diff --

Oh I see. Well it's reasonable to keep it then, but I'd at least add a 
comment I think. I didn't recognize this at all until I thought about it for a 
good few minutes.





[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-13 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r83319614
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -72,30 +74,35 @@ private boolean refill() throws IOException {
   }
 
   @Override
-  public int read() throws IOException {
+  public synchronized int read() throws IOException {
 if (!refill()) {
   return -1;
 }
 return byteBuffer.get() & 0xFF;
   }
 
   @Override
-  public int read(byte[] b, int offset, int len) throws IOException {
+  public synchronized int read(byte[] b, int offset, int len) throws 
IOException {
 if (!refill()) {
   return -1;
 }
+if ((offset | len | (offset + len) | (b.length - (offset + len))) < 0) 
{
--- End diff --

@srowen this line comes from BufferedInputStream. I'm ok with rewriting it 
for readability.





[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-13 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r83171663
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -72,30 +74,35 @@ private boolean refill() throws IOException {
   }
 
   @Override
-  public int read() throws IOException {
+  public synchronized int read() throws IOException {
 if (!refill()) {
   return -1;
 }
 return byteBuffer.get() & 0xFF;
   }
 
   @Override
-  public int read(byte[] b, int offset, int len) throws IOException {
+  public synchronized int read(byte[] b, int offset, int len) throws 
IOException {
 if (!refill()) {
   return -1;
 }
+if ((offset | len | (offset + len) | (b.length - (offset + len))) < 0) 
{
--- End diff --

@zsxwing oh I get it, you're really just checking whether any top bit is set. 
Hm, that seems fairly obscure compared to just checking the arguments. Sure, 
it's a little more code, but it's readable, and I don't think this is 
performance critical? I hadn't seen this before and had to think for a while to get 
what it does.

Yes, I'm agreeing that offset and len can't be _allowed_ to be negative, 
not that they can't be specified by the caller as something negative.
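
A short, self-contained sketch (class and method names are illustrative) of what the `BufferedInputStream`-style expression does: OR-ing the four ints propagates any sign bit into the result, so a single `< 0` test flags a negative anywhere among them.

```java
public class BitwiseCheckDemo {
  // Equivalent to asking "is any of these four values negative?"
  static boolean anyNegative(byte[] b, int offset, int len) {
    return (offset | len | (offset + len) | (b.length - (offset + len))) < 0;
  }

  public static void main(String[] args) {
    byte[] b = new byte[8];
    System.out.println(anyNegative(b, 2, 4));   // false: all four terms are non-negative
    System.out.println(anyNegative(b, -1, 4));  // true: offset's sign bit is set
    System.out.println(anyNegative(b, 6, 4));   // true: b.length - 10 is negative
  }
}
```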





[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-12 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r83076486
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -72,30 +74,35 @@ private boolean refill() throws IOException {
   }
 
   @Override
-  public int read() throws IOException {
+  public synchronized int read() throws IOException {
 if (!refill()) {
   return -1;
 }
 return byteBuffer.get() & 0xFF;
   }
 
   @Override
-  public int read(byte[] b, int offset, int len) throws IOException {
+  public synchronized int read(byte[] b, int offset, int len) throws 
IOException {
 if (!refill()) {
   return -1;
 }
+if ((offset | len | (offset + len) | (b.length - (offset + len))) < 0) 
{
--- End diff --



@srowen I think this check is correct. It checks if any of these values is 
negative.

> offset and len can't be negative and their sum shouldn't exceed the array 
length.

It's possible. `offset` and `len` are set by the caller.

@sitalkedia Could you move the argument check before `refill`? If the 
argument is invalid, we should always throw an exception instead of returning 
`-1`.
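
A minimal sketch of the ordering being requested, reusing the field and method names from the diff quoted above; the body is illustrative rather than the final code.

```java
@Override
public synchronized int read(byte[] b, int offset, int len) throws IOException {
  // Validate the arguments first: invalid arguments always throw,
  // even when the stream is already at end of file.
  if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
    throw new IndexOutOfBoundsException();
  }
  if (!refill()) {
    return -1;                               // end of stream, only for valid arguments
  }
  len = Math.min(len, byteBuffer.remaining());
  byteBuffer.get(b, offset, len);
  return len;
}
```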





[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-12 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r83040126
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -72,30 +74,35 @@ private boolean refill() throws IOException {
   }
 
   @Override
-  public int read() throws IOException {
+  public synchronized int read() throws IOException {
 if (!refill()) {
   return -1;
 }
 return byteBuffer.get() & 0xFF;
   }
 
   @Override
-  public int read(byte[] b, int offset, int len) throws IOException {
+  public synchronized int read(byte[] b, int offset, int len) throws 
IOException {
 if (!refill()) {
   return -1;
 }
+if ((offset | len | (offset + len) | (b.length - (offset + len))) < 0) 
{
--- End diff --

I think this must be a typo... this is a bitwise OR of a bunch of ints. I 
think maybe the example was given as a sort of pseudocode. offset and len 
can't be negative and their sum shouldn't exceed the array length, but that 
seems to be the intent. The 'else' below isn't really needed, but that's a nit. Actually, 
you want to check len after it's updated on line 94, right? And if no data is 
available, you'd also return 0 without a no-op read?





[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-12 Thread sitalkedia
Github user sitalkedia commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r83038562
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ * TODO: support {@link #mark(int)}/{@link #reset()}
+ *
+ */
+public final class NioBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBufferedFileInputStream(File file, int bufferSizeInBytes) 
throws IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE_BYTES);
+  }
+
+  /**
+   * Checks whether data is left to be read from the input stream.
+   * @return true if data is left, false otherwise
+   * @throws IOException
+   */
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = 0;
+  while (nRead == 0) {
+nRead = fileChannel.read(byteBuffer);
+  }
+  if (nRead < 0) {
+return false;
+  }
+  byteBuffer.flip();
+}
+return true;
+  }
+
+  @Override
+  public int read() throws IOException {
+if (!refill()) {
+  return -1;
+}
+return byteBuffer.get() & 0xFF;
+  }
+
+  @Override
+  public int read(byte[] b, int offset, int len) throws IOException {
+if (!refill()) {
+  return -1;
+}
+len = Math.min(len, byteBuffer.remaining());
+byteBuffer.get(b, offset, len);
+return len;
+  }
+
+  @Override
+  public int available() throws IOException {
+return byteBuffer.remaining();
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+if (n <= 0L) {
+  return 0L;
+}
+if (byteBuffer.remaining() >= n) {
+  // The buffered content is enough to skip
+  byteBuffer.position(byteBuffer.position() + (int) n);
+  return n;
+}
+long skippedFromBuffer = byteBuffer.remaining();
+long toSkipFromFileChannel = n - skippedFromBuffer;
+// Discard everything we have read in the buffer.
+byteBuffer.position(0);
+byteBuffer.flip();
+return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel);
+  }
+
+  private long skipFromFileChannel(long n) throws IOException {
+long currentFilePosition = fileChannel.position();
+long size = fileChannel.size();
+if (currentFilePosition + n > size) {
--- End diff --

good point, changed accordingly.





[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r82897143
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ * TODO: support {@link #mark(int)}/{@link #reset()}
+ *
+ */
+public final class NioBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBufferedFileInputStream(File file, int bufferSizeInBytes) 
throws IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE_BYTES);
+  }
+
+  /**
+   * Checks whether data is left to be read from the input stream.
+   * @return true if data is left, false otherwise
+   * @throws IOException
+   */
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = 0;
+  while (nRead == 0) {
+nRead = fileChannel.read(byteBuffer);
+  }
+  if (nRead < 0) {
+return false;
+  }
+  byteBuffer.flip();
+}
+return true;
+  }
+
+  @Override
+  public int read() throws IOException {
+if (!refill()) {
+  return -1;
+}
+return byteBuffer.get() & 0xFF;
+  }
+
+  @Override
+  public int read(byte[] b, int offset, int len) throws IOException {
+if (!refill()) {
+  return -1;
+}
+len = Math.min(len, byteBuffer.remaining());
+byteBuffer.get(b, offset, len);
+return len;
+  }
+
+  @Override
+  public int available() throws IOException {
+return byteBuffer.remaining();
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+if (n <= 0L) {
+  return 0L;
+}
+if (byteBuffer.remaining() >= n) {
+  // The buffered content is enough to skip
+  byteBuffer.position(byteBuffer.position() + (int) n);
+  return n;
+}
+long skippedFromBuffer = byteBuffer.remaining();
+long toSkipFromFileChannel = n - skippedFromBuffer;
+// Discard everything we have read in the buffer.
+byteBuffer.position(0);
+byteBuffer.flip();
+return skippedFromBuffer + skipFromFileChannel(toSkipFromFileChannel);
+  }
+
+  private long skipFromFileChannel(long n) throws IOException {
+long currentFilePosition = fileChannel.position();
+long size = fileChannel.size();
+if (currentFilePosition + n > size) {
--- End diff --

nit: use `n > size - currentFilePosition` to avoid overflow (e.g., n is 
`Long.MAX_VALUE`).
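
To illustrate the overflow, here is a minimal sketch of how skipFromFileChannel might look with the rewritten comparison (using the same fields as in the PR; not the exact diff):

```Java
private long skipFromFileChannel(long n) throws IOException {
  long currentFilePosition = fileChannel.position();
  long size = fileChannel.size();
  // currentFilePosition + n can wrap to a negative value when n is huge
  // (e.g. Long.MAX_VALUE), so compare against the remaining bytes instead.
  if (n > size - currentFilePosition) {
    fileChannel.position(size);
    return size - currentFilePosition;
  }
  fileChannel.position(currentFilePosition + n);
  return n;
}
```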


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r82903865
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ * TODO: support {@link #mark(int)}/{@link #reset()}
+ *
+ */
+public final class NioBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBufferedFileInputStream(File file, int bufferSizeInBytes) 
throws IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE_BYTES);
+  }
+
+  /**
+   * Checks weather data is left to be read from the input stream.
+   * @return true if data is left, false otherwise
+   * @throws IOException
+   */
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = 0;
+  while (nRead == 0) {
+nRead = fileChannel.read(byteBuffer);
+  }
+  if (nRead < 0) {
+return false;
+  }
+  byteBuffer.flip();
+}
+return true;
+  }
+
+  @Override
+  public int read() throws IOException {
+if (!refill()) {
+  return -1;
+}
+return byteBuffer.get() & 0xFF;
+  }
+
+  @Override
+  public int read(byte[] b, int offset, int len) throws IOException {
+if (!refill()) {
--- End diff --

nit: please add defensive checks like `BufferedInputStream` does:
```Java
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
  throw new IndexOutOfBoundsException();
} else if (len == 0) {
  return 0;
}
```
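
For context, a rough sketch of how that guard might slot into the `read(byte[], int, int)` above (the bit-trick check is lifted from `BufferedInputStream`, with `off` renamed to `offset` to match this class; a sketch only, not the PR code):

```Java
@Override
public int read(byte[] b, int offset, int len) throws IOException {
  // Reject negative offsets/lengths and ranges that fall outside b, as BufferedInputStream does.
  if ((offset | len | (offset + len) | (b.length - (offset + len))) < 0) {
    throw new IndexOutOfBoundsException();
  } else if (len == 0) {
    return 0;
  }
  if (!refill()) {
    return -1;
  }
  len = Math.min(len, byteBuffer.remaining());
  byteBuffer.get(b, offset, len);
  return len;
}
```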


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-11 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r82816160
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ * TODO: support {@link #mark(int)}/{@link #reset()}
+ *
+ */
+public final class NioBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBufferedFileInputStream(File file, int bufferSizeInBytes) 
throws IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE_BYTES);
+  }
+
+  /**
+   * Checks weather data is left to be read from the input stream.
+   * @return true if data is left, false otherwise
+   * @throws IOException
+   */
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = fileChannel.read(byteBuffer);
+  if (nRead <= 0) {
--- End diff --

There are clearly a few concerns here:
* the immediate one was to prevent an underflow exception in get()
* file channels don't return 0 when reading off a local disk unless configured as non-blocking, though this might be an implementation detail we don't want to rely on? (If yes, we have to fix other places in our code as @srowen mentioned.)
* if we do want to handle 0 as a retry, we will need to ensure we don't go into an infinite loop (since this should be an anomaly to begin with). A typical readFully() loops indefinitely because it works off a generic InputStream where this behavior is possible; in this context it would be the exception.

Having said that, I can see a NAS or NFS mount repeatedly returning 0 even though the file is not done.
We could special-case 0 and retry 'n' (3?) times whenever we see a zero and the file is not done?
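
A rough sketch of that bounded-retry variant of refill() (MAX_ZERO_READS is a hypothetical constant, not something in the PR):

```Java
private boolean refill() throws IOException {
  if (!byteBuffer.hasRemaining()) {
    byteBuffer.clear();
    int nRead = fileChannel.read(byteBuffer);
    int zeroReads = 0;
    // Retry a few times if the channel reports 0 bytes but is not at EOF.
    while (nRead == 0 && ++zeroReads < MAX_ZERO_READS) {
      nRead = fileChannel.read(byteBuffer);
    }
    if (nRead == 0) {
      throw new IOException("FileChannel.read() returned 0 bytes repeatedly");
    }
    if (nRead < 0) {
      return false;
    }
    byteBuffer.flip();
  }
  return true;
}
```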




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-11 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r82737475
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ * TODO: support {@link #mark(int)}/{@link #reset()}
+ *
+ */
+public final class NioBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBufferedFileInputStream(File file, int bufferSizeInBytes) 
throws IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE_BYTES);
+  }
+
+  /**
+   * Checks weather data is left to be read from the input stream.
+   * @return true if data is left, false otherwise
+   * @throws IOException
+   */
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = fileChannel.read(byteBuffer);
+  if (nRead <= 0) {
--- End diff --

It is pretty standard to rely only on -1. It is especially common in networking code to get 0 bytes read.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-11 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r82737052
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ * TODO: support {@link #mark(int)}/{@link #reset()}
+ *
+ */
+public final class NioBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBufferedFileInputStream(File file, int bufferSizeInBytes) 
throws IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE_BYTES);
+  }
+
+  /**
+   * Checks weather data is left to be read from the input stream.
+   * @return true if data is left, false otherwise
+   * @throws IOException
+   */
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = fileChannel.read(byteBuffer);
+  if (nRead <= 0) {
--- End diff --

It could be 0 forever though (dunno, FS error?) and then this creates an 
infinite loop. I went hunting through the SDK for some examples of dealing with 
this, because this has always been a tricky part of even `InputStream`.

`java.nio.Files`:

```
private static long copy(InputStream source, OutputStream sink)
    throws IOException
{
    long nread = 0L;
    byte[] buf = new byte[BUFFER_SIZE];
    int n;
    while ((n = source.read(buf)) > 0) {
        sink.write(buf, 0, n);
        nread += n;
    }
    return nread;
}

...

private static byte[] read(InputStream source, int initialSize) throws IOException {
    int capacity = initialSize;
    byte[] buf = new byte[capacity];
    int nread = 0;
    int n;
    for (;;) {
        // read to EOF which may read more or less than initialSize
        // (eg: file is truncated while we are reading)
        while ((n = source.read(buf, nread, capacity - nread)) > 0)
            nread += n;

        // if last call to source.read() returned -1, we are done
        // otherwise, try to read one more byte; if that failed we're done too
        if (n < 0 || (n = source.read()) < 0)
            break;

        // one more byte was read; need to allocate a larger buffer
        if (capacity <= MAX_BUFFER_SIZE - capacity) {
            capacity = Math.max(capacity << 1, BUFFER_SIZE);
        } else {
            if (capacity == MAX_BUFFER_SIZE)
                throw new OutOfMemoryError("Required array size too large");
            capacity = MAX_BUFFER_SIZE;
        }
        buf = Arrays.copyOf(buf, capacity);
        buf[nread++] = (byte)n;
    }
    return (capacity == nread) ? buf : Arrays.copyOf(buf, nread);
}
```

The first instance seems to assume that 0 bytes means it should stop.
In the second instance it forces the `InputStream` to give a definitive answer by trying to read one more byte.

[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-11 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r82733979
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ * TODO: support {@link #mark(int)}/{@link #reset()}
+ *
+ */
+public final class NioBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBufferedFileInputStream(File file, int bufferSizeInBytes) 
throws IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE_BYTES);
+  }
+
+  /**
+   * Checks weather data is left to be read from the input stream.
+   * @return true if data is left, false otherwise
+   * @throws IOException
+   */
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = fileChannel.read(byteBuffer);
+  if (nRead <= 0) {
--- End diff --

Actually the right thing here is to put it in a loop until we see -1.
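
A minimal sketch of that loop; this is essentially what the later revision of refill() in this PR does:

```Java
private boolean refill() throws IOException {
  if (!byteBuffer.hasRemaining()) {
    byteBuffer.clear();
    int nRead = 0;
    // A 0-byte read is not EOF; keep reading until the channel returns data or -1.
    while (nRead == 0) {
      nRead = fileChannel.read(byteBuffer);
    }
    if (nRead < 0) {
      return false;
    }
    byteBuffer.flip();
  }
  return true;
}
```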



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-10 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r82733349
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ * TODO: support {@link #mark(int)}/{@link #reset()}
+ *
+ */
+public final class NioBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBufferedFileInputStream(File file, int bufferSizeInBytes) 
throws IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE_BYTES);
+  }
+
+  /**
+   * Checks weather data is left to be read from the input stream.
+   * @return true if data is left, false otherwise
+   * @throws IOException
+   */
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = fileChannel.read(byteBuffer);
+  if (nRead <= 0) {
--- End diff --

The thing is, that will just result in an error on the next read(), because 
nothing was put in the buffer. Is that worse? I'm not even sure. But, calling 
it in a loop could result in an infinite loop. I'm accustomed to stopping 
reading when the result is 0 in cases like this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-10 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r82732392
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ * TODO: support {@link #mark(int)}/{@link #reset()}
+ *
+ */
+public final class NioBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBufferedFileInputStream(File file, int bufferSizeInBytes) 
throws IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE_BYTES);
+  }
+
+  /**
+   * Checks weather data is left to be read from the input stream.
+   * @return true if data is left, false otherwise
+   * @throws IOException
+   */
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = fileChannel.read(byteBuffer);
+  if (nRead <= 0) {
--- End diff --

Should only return false if nRead is -1, which is the contract specified by 
FileChannel.read().

We should make sure the thing still works if nRead is 0.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-10 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r82732276
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBasedBufferedFileInputStream.java ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ */
+public final class NioBasedBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBasedBufferedFileInputStream(File file, int bufferSizeInBytes) 
throws IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBasedBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE_BYTES);
+  }
+
+  /**
+   * Checks weather data is left to be read from the input stream.
+   * @return true if data is left, false otherwise
+   * @throws IOException
+   */
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = fileChannel.read(byteBuffer);
+  if (nRead == -1) {
--- End diff --

It is possible to read 0 bytes and then be able to read more than 0 bytes on the next call.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-10 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r82726120
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBasedBufferedFileInputStream.java ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ */
+public final class NioBasedBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBasedBufferedFileInputStream(File file, int bufferSizeInBytes) 
throws IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBasedBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE_BYTES);
+  }
+
+  /**
+   * Checks weather data is left to be read from the input stream.
+   * @return true if data is left, false otherwise
+   * @throws IOException
+   */
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = fileChannel.read(byteBuffer);
+  if (nRead == -1) {
--- End diff --

Hm, 
https://docs.oracle.com/javase/7/docs/api/java/nio/channels/FileChannel.html#read(java.nio.ByteBuffer)
 suggests that 0 doesn't mean EOF, just 0 bytes read, but, I'm also not sure 
what to do if the channel won't actually give any bytes at this point. I think 
that can only happen if the buffer is full but that won't happen here. `<= 0` 
seems reasonable AFAIK.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-10 Thread sitalkedia
Github user sitalkedia commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r82706375
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBasedBufferedFileInputStream.java ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ */
+public final class NioBasedBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBasedBufferedFileInputStream(File file, int bufferSizeInBytes) 
throws IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBasedBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE_BYTES);
+  }
+
+  /**
+   * Checks weather data is left to be read from the input stream.
+   * @return true if data is left, false otherwise
+   * @throws IOException
+   */
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = fileChannel.read(byteBuffer);
+  if (nRead == -1) {
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-10 Thread sitalkedia
Github user sitalkedia commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r82706401
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBasedBufferedFileInputStream.java ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ */
+public final class NioBasedBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBasedBufferedFileInputStream(File file, int bufferSizeInBytes) 
throws IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBasedBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE_BYTES);
+  }
+
+  /**
+   * Checks weather data is left to be read from the input stream.
+   * @return true if data is left, false otherwise
+   * @throws IOException
+   */
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = fileChannel.read(byteBuffer);
+  if (nRead == -1) {
+return false;
+  }
+  byteBuffer.flip();
+}
+return true;
+  }
+
+  @Override
+  public int read() throws IOException {
+if (!refill()) {
+  return -1;
+}
+return byteBuffer.get() & 0xFF;
+  }
+
+  @Override
+  public int read(byte[] b, int offset, int len) throws IOException {
+if (!refill()) {
+  return -1;
+}
+len = Math.min(len, byteBuffer.remaining());
+byteBuffer.get(b, offset, len);
+return len;
+  }
+
+  @Override
+  public int available() throws IOException {
+return byteBuffer.remaining();
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+if (n < 0L) {
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-10 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r8279
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBasedBufferedFileInputStream.java ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ */
+public final class NioBasedBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBasedBufferedFileInputStream(File file, int bufferSizeInBytes) 
throws IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBasedBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE_BYTES);
+  }
+
+  /**
+   * Checks weather data is left to be read from the input stream.
+   * @return true if data is left, false otherwise
+   * @throws IOException
+   */
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = fileChannel.read(byteBuffer);
+  if (nRead == -1) {
--- End diff --

<= 0 ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-10 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r82665886
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBasedBufferedFileInputStream.java ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ */
+public final class NioBasedBufferedFileInputStream extends InputStream {
--- End diff --

Might also be a good idea to add the ability to read into a ByteBuffer, though that can come later.
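
For illustration, such a method might look roughly like this (a hypothetical sketch, not part of the PR):

```Java
public int read(ByteBuffer dest) throws IOException {
  if (!refill()) {
    return -1;
  }
  int toCopy = Math.min(dest.remaining(), byteBuffer.remaining());
  // Copy via a duplicate so the limit of the internal buffer is left untouched.
  ByteBuffer slice = byteBuffer.duplicate();
  slice.limit(slice.position() + toCopy);
  dest.put(slice);
  byteBuffer.position(byteBuffer.position() + toCopy);
  return toCopy;
}
```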


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-10 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r82665543
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBasedBufferedFileInputStream.java ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ */
+public final class NioBasedBufferedFileInputStream extends InputStream {
--- End diff --

NioBasedBufferedFileInputStream  -> NioBufferedFileInputStream ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-10 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r82665139
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBasedBufferedFileInputStream.java ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.spark.storage.StorageUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ */
+public final class NioBasedBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE_BYTES = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBasedBufferedFileInputStream(File file, int bufferSizeInBytes) 
throws IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBasedBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE_BYTES);
+  }
+
+  /**
+   * Checks weather data is left to be read from the input stream.
+   * @return true if data is left, false otherwise
+   * @throws IOException
+   */
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = fileChannel.read(byteBuffer);
+  if (nRead == -1) {
+return false;
+  }
+  byteBuffer.flip();
+}
+return true;
+  }
+
+  @Override
+  public int read() throws IOException {
+if (!refill()) {
+  return -1;
+}
+return byteBuffer.get() & 0xFF;
+  }
+
+  @Override
+  public int read(byte[] b, int offset, int len) throws IOException {
+if (!refill()) {
+  return -1;
+}
+len = Math.min(len, byteBuffer.remaining());
+byteBuffer.get(b, offset, len);
+return len;
+  }
+
+  @Override
+  public int available() throws IOException {
+return byteBuffer.remaining();
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+if (n < 0L) {
--- End diff --

<= instead ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-10 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r82664592
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBasedBufferedFileInputStream.java ---
@@ -0,0 +1,120 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ */
+public final class NioBasedBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBasedBufferedFileInputStream(File file, int bufferSize) throws 
IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSize);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBasedBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE);
+  }
+
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = fileChannel.read(byteBuffer);
+  if (nRead == -1) {
+return false;
+  }
+  byteBuffer.flip();
+}
+return true;
+  }
+
+  @Override
+  public int read() throws IOException {
+if (!refill()) {
+  return -1;
+}
+return byteBuffer.get() & 0xFF;
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+if (!refill()) {
+  return -1;
+}
+len = Math.min(len, byteBuffer.remaining());
+byteBuffer.get(b, off, len);
+return len;
+  }
+
+  @Override
+  public int available() throws IOException {
--- End diff --

I agree, InputStream simply has a stub impl (just like read(byte[]) loops through read()). Providing .remaining() can be useful.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-10 Thread sitalkedia
Github user sitalkedia commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r82655690
  
--- Diff: 
core/src/main/java/org/apache/spark/io/NioBasedBufferedFileInputStream.java ---
@@ -0,0 +1,120 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * {@link InputStream} implementation which uses direct buffer
+ * to read a file to avoid extra copy of data between Java and
+ * native memory which happens when using {@link 
java.io.BufferedInputStream}.
+ * Unfortunately, this is not something already available in JDK,
+ * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+ * but does not support buffering.
+ *
+ */
+public final class NioBasedBufferedFileInputStream extends InputStream {
+
+  private static int DEFAULT_BUFFER_SIZE = 8192;
+
+  private final ByteBuffer byteBuffer;
+
+  private final FileChannel fileChannel;
+
+  public NioBasedBufferedFileInputStream(File file, int bufferSize) throws 
IOException {
+byteBuffer = ByteBuffer.allocateDirect(bufferSize);
+fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+byteBuffer.flip();
+  }
+
+  public NioBasedBufferedFileInputStream(File file) throws IOException {
+this(file, DEFAULT_BUFFER_SIZE);
+  }
+
+  private boolean refill() throws IOException {
+if (!byteBuffer.hasRemaining()) {
+  byteBuffer.clear();
+  int nRead = fileChannel.read(byteBuffer);
+  if (nRead == -1) {
+return false;
+  }
+  byteBuffer.flip();
+}
+return true;
+  }
+
+  @Override
+  public int read() throws IOException {
+if (!refill()) {
+  return -1;
+}
+return byteBuffer.get() & 0xFF;
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+if (!refill()) {
+  return -1;
+}
+len = Math.min(len, byteBuffer.remaining());
+byteBuffer.get(b, off, len);
+return len;
+  }
+
+  @Override
+  public int available() throws IOException {
+return byteBuffer.remaining();
+  }
+
+
+  @Override
+  public long skip(long n) throws IOException {
+if(n < 0L) {
+  return 0L;
+}
+if (byteBuffer.remaining() > n) {
+  // The buffered content is enough to skip
+  byteBuffer.position(byteBuffer.position() + (int) n);
+  return n;
+}
+long toSkip = n - byteBuffer.remaining();
+// Discard everything we have read in the buffer.
+byteBuffer.position(0);
+byteBuffer.flip();
+return skipFromFileChannel(toSkip);
+  }
+
+  private long skipFromFileChannel(long n) throws IOException {
+long currentFilePosition = fileChannel.position();
+long size = fileChannel.size();
+if (currentFilePosition + n > size) {
+  fileChannel.position(size);
+  return size - currentFilePosition;
+}
+else {
+  fileChannel.position(currentFilePosition + n);
+  return n;
+}
+  }
+
+  @Override
+  public void close() throws IOException {
+fileChannel.close();
--- End diff --

I see... that makes sense. Added an explicit call to clean up the byte buffer.
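
Presumably the cleanup looks something like this, given the class already imports org.apache.spark.storage.StorageUtils (a sketch, not the exact diff):

```Java
@Override
public void close() throws IOException {
  fileChannel.close();
  // Free the off-heap memory backing the direct buffer eagerly instead of
  // waiting for the buffer object to be garbage collected.
  StorageUtils.dispose(byteBuffer);
}
```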


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15408: [SPARK-17839][CORE] Use Nio's directbuffer instea...

2016-10-10 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15408#discussion_r82652845
  
--- Diff: 
core/src/test/java/org/apache/spark/io/NioBasedBufferedFileInputStreamSuite.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests functionality of {@link NioBasedBufferedFileInputStream}
+ */
+public class NioBasedBufferedFileInputStreamSuite {
+
+  byte[] randomBytes;
+
+  File inputFile;
+
+  @Before
+  public void setUp() throws IOException {
+// Create a byte array of size 2 MB with random bytes
+randomBytes =  RandomUtils.nextBytes(2 * 1024 * 1024);
+inputFile = File.createTempFile("temp-file", ".tmp");
+FileUtils.writeByteArrayToFile(inputFile, randomBytes);
+  }
+
+  @After
+  public void tearDown() {
+inputFile.delete();
+  }
+
+  @Test
+  public void testReadOneByte() throws IOException {
+InputStream inputStream = new 
NioBasedBufferedFileInputStream(inputFile);
+for (int i = 0; i < randomBytes.length; i++) {
+  assertEquals(randomBytes[i], (byte) inputStream.read());
+}
+  }
+
+  @Test
+  public void testReadMultipleBytes() throws IOException {
--- End diff --

This reminds me --- we should add test coverage reporting to Spark so it 
becomes obvious what branches are not tested sufficiently.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org