This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e0a779  [FLINK-10469][core] make sure to always write the whole 
buffer to FileChannel
1e0a779 is described below

commit 1e0a77959d27031048c1c4079a504274c82cc173
Author: Nico Kruber <n...@data-artisans.com>
AuthorDate: Tue Oct 2 21:59:18 2018 +0200

    [FLINK-10469][core] make sure to always write the whole buffer to 
FileChannel
---
 .../src/main/java/org/apache/flink/util/FileUtils.java      | 10 ++++++++++
 .../io/disk/iomanager/AsynchronousFileIOChannel.java        |  7 ++++---
 .../SpillingAdaptiveSpanningRecordDeserializer.java         |  5 +++--
 .../apache/flink/streaming/runtime/io/BufferSpiller.java    | 10 +++-------
 .../runtime/io/SpilledBufferOrEventSequenceTest.java        | 13 +++++++------
 5 files changed, 27 insertions(+), 18 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index 23af2e8..8f32262 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -28,6 +28,8 @@ import org.apache.flink.util.function.ThrowingConsumer;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
 import java.nio.file.AccessDeniedException;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
@@ -56,6 +58,14 @@ public final class FileUtils {
 
        // 
------------------------------------------------------------------------
 
+       public static void writeCompletely(WritableByteChannel channel, 
ByteBuffer src) throws IOException {
+               while (src.hasRemaining()) {
+                       channel.write(src);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
        /**
         * Constructs a random filename with the given prefix and
         * a random part generated from hex characters.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
index 0e575d3..ddb0c4e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.disk.iomanager;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.event.NotificationListener;
+import org.apache.flink.util.FileUtils;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -341,7 +342,7 @@ final class SegmentWriteRequest implements WriteRequest {
        @Override
        public void write() throws IOException {
                try {
-                       this.channel.fileChannel.write(this.segment.wrap(0, 
this.segment.size()));
+                       FileUtils.writeCompletely(this.channel.fileChannel, 
this.segment.wrap(0, this.segment.size()));
                }
                catch (NullPointerException npex) {
                        throw new IOException("Memory segment has been 
released.");
@@ -375,8 +376,8 @@ final class BufferWriteRequest implements WriteRequest {
                header.putInt(nioBufferReadable.remaining());
                header.flip();
 
-               channel.fileChannel.write(header);
-               channel.fileChannel.write(nioBufferReadable);
+               FileUtils.writeCompletely(channel.fileChannel, header);
+               FileUtils.writeCompletely(channel.fileChannel, 
nioBufferReadable);
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 8630ace..a78cb4d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.StringUtils;
 
 import java.io.BufferedInputStream;
@@ -481,7 +482,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                                this.spillingChannel = createSpillingChannel();
 
                                ByteBuffer toWrite = 
partial.segment.wrap(partial.position, numBytesChunk);
-                               this.spillingChannel.write(toWrite);
+                               FileUtils.writeCompletely(this.spillingChannel, 
toWrite);
                        }
                        else {
                                // collect in memory
@@ -528,7 +529,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                        if (spillingChannel != null) {
                                // spill to file
                                ByteBuffer toWrite = 
segment.wrap(segmentPosition, toCopy);
-                               this.spillingChannel.write(toWrite);
+                               FileUtils.writeCompletely(this.spillingChannel, 
toWrite);
                        } else {
                                segment.get(segmentPosition, buffer, 
this.accumulatedRecordBytes, toCopy);
                        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 4b690d1..ae95408 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.StringUtils;
 
 import java.io.File;
@@ -75,9 +76,6 @@ public class BufferSpiller implements BufferBlocker {
        /** The buffer that encodes the spilled header. */
        private final ByteBuffer headBuffer;
 
-       /** The reusable array that holds header and contents buffers. */
-       private final ByteBuffer[] sources;
-
        /** The file that we currently spill to. */
        private File currentSpillFile;
 
@@ -109,8 +107,6 @@ public class BufferSpiller implements BufferBlocker {
                this.headBuffer = ByteBuffer.allocateDirect(16);
                this.headBuffer.order(ByteOrder.LITTLE_ENDIAN);
 
-               this.sources = new ByteBuffer[] { this.headBuffer, null };
-
                File[] tempDirs = ioManager.getSpillingDirectories();
                this.tempDir = tempDirs[DIRECTORY_INDEX.getAndIncrement() % 
tempDirs.length];
 
@@ -148,8 +144,8 @@ public class BufferSpiller implements BufferBlocker {
 
                        bytesWritten += (headBuffer.remaining() + 
contents.remaining());
 
-                       sources[1] = contents;
-                       currentChannel.write(sources);
+                       FileUtils.writeCompletely(currentChannel, headBuffer);
+                       FileUtils.writeCompletely(currentChannel, contents);
                }
                finally {
                        if (boe.isBuffer()) {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
index adbe240..c1ff79f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import 
org.apache.flink.streaming.runtime.io.BufferSpiller.SpilledBufferOrEventSequence;
+import org.apache.flink.util.FileUtils;
 
 import org.junit.After;
 import org.junit.Before;
@@ -107,7 +108,7 @@ public class SpilledBufferOrEventSequenceTest {
                        ByteBuffer buf = ByteBuffer.allocate(7);
                        buf.order(ByteOrder.LITTLE_ENDIAN);
 
-                       fileChannel.write(buf);
+                       FileUtils.writeCompletely(fileChannel, buf);
                        fileChannel.position(0);
 
                        SpilledBufferOrEventSequence seq = new 
SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
@@ -175,7 +176,7 @@ public class SpilledBufferOrEventSequenceTest {
                        data.put((byte) 0);
                        data.position(0);
                        data.limit(312);
-                       fileChannel.write(data);
+                       FileUtils.writeCompletely(fileChannel, data);
                        fileChannel.position(0L);
 
                        SpilledBufferOrEventSequence seq = new 
SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
@@ -414,7 +415,7 @@ public class SpilledBufferOrEventSequenceTest {
                        ByteBuffer data = ByteBuffer.allocate(157);
                        data.order(ByteOrder.LITTLE_ENDIAN);
 
-                       fileChannel.write(data);
+                       FileUtils.writeCompletely(fileChannel, data);
                        fileChannel.position(54);
 
                        SpilledBufferOrEventSequence seq = new 
SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
@@ -451,8 +452,8 @@ public class SpilledBufferOrEventSequenceTest {
                header.put((byte) 1);
                header.flip();
 
-               fileChannel.write(header);
-               fileChannel.write(serializedEvent);
+               FileUtils.writeCompletely(fileChannel, header);
+               FileUtils.writeCompletely(fileChannel, serializedEvent);
                return new BufferOrEvent(evt, channelIndex);
        }
 
@@ -467,7 +468,7 @@ public class SpilledBufferOrEventSequenceTest {
                        data.put((byte) i);
                }
                data.flip();
-               fileChannel.write(data);
+               FileUtils.writeCompletely(fileChannel, data);
        }
 
        private static void validateBuffer(BufferOrEvent boe, int expectedSize, 
int expectedChannelIndex) {

Reply via email to