Repository: ignite
Updated Branches:
  refs/heads/master 41b742cd6 -> 54fb41517


IGNITE-3858 IGFS: Support direct PROXY mode invocation in methods: create / 
append. This closes #1070. This closes #1084.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a97483a4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a97483a4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a97483a4

Branch: refs/heads/master
Commit: a97483a4ce2c00bd0cca025c4ef4bfa181897aa9
Parents: 0d5ee78
Author: tledkov-gridgain <tled...@gridgain.com>
Authored: Thu Sep 22 10:51:05 2016 +0300
Committer: vozerov-gridgain <voze...@gridgain.com>
Committed: Thu Sep 22 10:51:05 2016 +0300

----------------------------------------------------------------------
 .../igfs/IgfsAbstractOutputStream.java          | 266 ++++++++++++++++
 .../internal/processors/igfs/IgfsImpl.java      |  27 +-
 .../processors/igfs/IgfsOutputStreamImpl.java   | 319 ++++---------------
 .../igfs/IgfsOutputStreamProxyImpl.java         | 163 ++++++++++
 .../igfs/IgfsAbstractBaseSelfTest.java          |   2 +-
 5 files changed, 518 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a97483a4/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractOutputStream.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractOutputStream.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractOutputStream.java
new file mode 100644
index 0000000..c1e751e
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractOutputStream.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.events.IgfsEvent;
+import org.apache.ignite.igfs.IgfsOutputStream;
+import org.apache.ignite.igfs.IgfsPath;
+import 
org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_WRITE;
+
+/**
+ * Output stream to store data into grid cache with separate blocks.
+ */
+abstract class IgfsAbstractOutputStream extends IgfsOutputStream {
+    /** IGFS context. */
+    protected final IgfsContext igfsCtx;
+
+    /** Path to file. */
+    protected final IgfsPath path;
+
+    /** Buffer size. */
+    protected final int bufSize;
+
+    /** File worker batch. */
+    protected final IgfsFileWorkerBatch batch;
+
+    /** Mutex for synchronization. */
+    protected final Object mux = new Object();
+
+    /** Flag for this stream open/closed state. */
+    protected boolean closed;
+
+    /** Local buffer to store stream data as consistent block. */
+    protected ByteBuffer buf;
+
+    /** Bytes written. */
+    protected long bytes;
+
+    /** Time consumed by write operations. */
+    protected long time;
+
+    /**
+     * Constructs file output stream.
+     *
+     * @param igfsCtx IGFS context.
+     * @param path Path to stored file.
+     * @param bufSize The size of the buffer to be used.
+     * @param batch Optional secondary file system batch.
+     */
+    IgfsAbstractOutputStream(IgfsContext igfsCtx, IgfsPath path, int bufSize, 
@Nullable IgfsFileWorkerBatch batch) {
+        synchronized (mux) {
+            this.path = path;
+            this.bufSize = optimizeBufferSize(bufSize);
+            this.igfsCtx = igfsCtx;
+            this.batch = batch;
+        }
+
+        igfsCtx.metrics().incrementFilesOpenedForWrite();
+    }
+
+    /**
+     * Optimize buffer size.
+     *
+     * @param bufSize Original byffer size.
+     * @return Optimized buffer size.
+     */
+    protected abstract int optimizeBufferSize(int bufSize);
+
+    /** {@inheritDoc} */
+    @Override public void write(int b) throws IOException {
+        synchronized (mux) {
+            checkClosed(null, 0);
+
+            b &= 0xFF;
+
+            long startTime = System.nanoTime();
+
+            if (buf == null)
+                buf = allocateNewBuffer();
+
+            buf.put((byte)b);
+
+            sendBufferIfFull();
+
+            time += System.nanoTime() - startTime;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("NullableProblems")
+    @Override public void write(byte[] b, int off, int len) throws IOException 
{
+        A.notNull(b, "b");
+
+        if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > 
b.length) || ((off + len) < 0)) {
+            throw new IndexOutOfBoundsException("Invalid bounds [data.length=" 
+ b.length + ", offset=" + off +
+                ", length=" + len + ']');
+        }
+
+        synchronized (mux) {
+            checkClosed(null, 0);
+
+            // Check if there is anything to write.
+            if (len == 0)
+                return;
+
+            long startTime = System.nanoTime();
+
+            if (buf == null) {
+                if (len >= bufSize) {
+                    // Send data right away.
+                    ByteBuffer tmpBuf = ByteBuffer.wrap(b, off, len);
+
+                    send(tmpBuf, tmpBuf.remaining());
+                }
+                else {
+                    buf = allocateNewBuffer();
+
+                    buf.put(b, off, len);
+                }
+            }
+            else {
+                // Re-allocate buffer if needed.
+                if (buf.remaining() < len)
+                    buf = ByteBuffer.allocate(buf.position() + 
len).put((ByteBuffer)buf.flip());
+
+                buf.put(b, off, len);
+
+                sendBufferIfFull();
+            }
+
+            time += System.nanoTime() - startTime;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void transferFrom(DataInput in, int len) throws 
IOException {
+        synchronized (mux) {
+            checkClosed(in, len);
+
+            long startTime = System.nanoTime();
+
+            // Clean-up local buffer before streaming.
+            sendBufferIfNotEmpty();
+
+            // Perform transfer.
+            send(in, len);
+
+            time += System.nanoTime() - startTime;
+        }
+    }
+
+    /**
+     * Validate this stream is open.
+     *
+     * @param in Data input.
+     * @param len Data len in bytes.
+     * @throws IOException If this stream is closed.
+     */
+    protected void checkClosed(@Nullable DataInput in, int len) throws 
IOException {
+        assert Thread.holdsLock(mux);
+
+        if (closed) {
+            // Must read data from stream before throwing exception.
+            if (in != null)
+                in.skipBytes(len);
+
+            throw new IOException("Stream has been closed: " + this);
+        }
+    }
+
+    /**
+     * Send local buffer if it full.
+     *
+     * @throws IOException If failed.
+     */
+    private void sendBufferIfFull() throws IOException {
+        if (buf.position() >= bufSize)
+            sendBuffer();
+    }
+
+    /**
+     * Send local buffer if at least something is stored there.
+     *
+     * @throws IOException If failed.
+     */
+    void sendBufferIfNotEmpty() throws IOException {
+        if (buf != null && buf.position() > 0)
+            sendBuffer();
+    }
+
+    /**
+     * Send all local-buffered data to server.
+     *
+     * @throws IOException In case of IO exception.
+     */
+    private void sendBuffer() throws IOException {
+        buf.flip();
+
+        send(buf, buf.remaining());
+
+        buf = null;
+    }
+
+    /**
+     * Store data block.
+     *
+     * @param data Block.
+     * @param writeLen Write length.
+     * @throws IOException If failed.
+     */
+    protected abstract void send(Object data, int writeLen) throws IOException;
+
+    /**
+     * Allocate new buffer.
+     *
+     * @return New buffer.
+     */
+    private ByteBuffer allocateNewBuffer() {
+        return ByteBuffer.allocate(bufSize);
+    }
+
+    /**
+     * Updates IGFS metrics when the stream is closed.
+     */
+    protected void updateMetricsOnClose() {
+        IgfsLocalMetrics metrics = igfsCtx.metrics();
+
+        metrics.addWrittenBytesTime(bytes, time);
+        metrics.decrementFilesOpenedForWrite();
+
+        GridEventStorageManager evts = igfsCtx.kernalContext().event();
+
+        if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE))
+            evts.record(new IgfsEvent(path, igfsCtx.localNode(),
+                EVT_IGFS_FILE_CLOSED_WRITE, bytes));
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsAbstractOutputStream.class, this);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a97483a4/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 87a4699..bee9d9a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -92,7 +92,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -274,7 +274,7 @@ public final class IgfsImpl implements IgfsEx {
         }
 
         dualPool = secondaryFs != null ? new IgniteThreadPoolExecutor(4, 
Integer.MAX_VALUE, 5000L,
-            new LinkedBlockingQueue<Runnable>(), new 
IgfsThreadFactory(cfg.getName()), null) : null;
+            new SynchronousQueue<Runnable>(), new 
IgfsThreadFactory(cfg.getName()), null) : null;
     }
 
     /** {@inheritDoc} */
@@ -1088,6 +1088,17 @@ public final class IgfsImpl implements IgfsEx {
                 else
                     dirProps = fileProps = new HashMap<>(props);
 
+                if (mode == PROXY) {
+                    assert secondaryFs != null;
+
+                    OutputStream secondaryStream = secondaryFs.create(path, 
bufSize, overwrite, replication,
+                        groupBlockSize(), props);
+
+                    IgfsFileWorkerBatch batch = newBatch(path, 
secondaryStream);
+
+                    return new IgfsOutputStreamProxyImpl(igfsCtx, path, 
info(path), bufferSize(bufSize), batch);
+                }
+
                 // Prepare context for DUAL mode.
                 IgfsSecondaryFileSystemCreateContext secondaryCtx = null;
 
@@ -1142,7 +1153,15 @@ public final class IgfsImpl implements IgfsEx {
 
                 final IgfsMode mode = resolveMode(path);
 
-                IgfsFileWorkerBatch batch;
+                if (mode == PROXY) {
+                    assert secondaryFs != null;
+
+                    OutputStream secondaryStream = secondaryFs.append(path, 
bufSize, create, props);
+
+                    IgfsFileWorkerBatch batch = newBatch(path, 
secondaryStream);
+
+                    return new IgfsOutputStreamProxyImpl(igfsCtx, path, 
info(path), bufferSize(bufSize), batch);
+                }
 
                 if (mode != PRIMARY) {
                     assert IgfsUtils.isDualMode(mode);
@@ -1151,7 +1170,7 @@ public final class IgfsImpl implements IgfsEx {
 
                     IgfsCreateResult desc = meta.appendDual(secondaryFs, path, 
bufSize, create);
 
-                    batch = newBatch(path, desc.secondaryOutputStream());
+                    IgfsFileWorkerBatch batch = newBatch(path, 
desc.secondaryOutputStream());
 
                     return new IgfsOutputStreamImpl(igfsCtx, path, 
desc.info(), bufferSize(bufSize), mode, batch);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a97483a4/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 6dec0c1..f976242 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -18,14 +18,10 @@
 package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.events.IgfsEvent;
 import org.apache.ignite.igfs.IgfsException;
 import org.apache.ignite.igfs.IgfsMode;
-import org.apache.ignite.igfs.IgfsOutputStream;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import 
org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
-import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
@@ -35,7 +31,6 @@ import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_WRITE;
 import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
 import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
 import static org.apache.ignite.igfs.IgfsMode.PROXY;
@@ -43,57 +38,30 @@ import static org.apache.ignite.igfs.IgfsMode.PROXY;
 /**
  * Output stream to store data into grid cache with separate blocks.
  */
-class IgfsOutputStreamImpl extends IgfsOutputStream {
+class IgfsOutputStreamImpl extends IgfsAbstractOutputStream {
     /** Maximum number of blocks in buffer. */
     private static final int MAX_BLOCKS_CNT = 16;
 
-    /** IGFS context. */
-    private final IgfsContext igfsCtx;
-
-    /** Path to file. */
-    private final IgfsPath path;
-
-    /** Buffer size. */
-    private final int bufSize;
-
     /** IGFS mode. */
     private final IgfsMode mode;
 
-    /** File worker batch. */
-    private final IgfsFileWorkerBatch batch;
-
-    /** Mutex for synchronization. */
-    private final Object mux = new Object();
-
     /** Write completion future. */
     private final IgniteInternalFuture<Boolean> writeFut;
 
-    /** Flag for this stream open/closed state. */
-    private boolean closed;
-
-    /** Local buffer to store stream data as consistent block. */
-    private ByteBuffer buf;
-
-    /** Bytes written. */
-    private long bytes;
-
-    /** Time consumed by write operations. */
-    private long time;
-
     /** File descriptor. */
     private IgfsEntryInfo fileInfo;
 
-    /** Space in file to write data. */
-    private long space;
+    /** Affinity written by this output stream. */
+    private IgfsFileAffinityRange streamRange;
+
+    /** Data length in remainder. */
+    protected int remainderDataLen;
 
     /** Intermediate remainder to keep data. */
     private byte[] remainder;
 
-    /** Data length in remainder. */
-    private int remainderDataLen;
-
-    /** Affinity written by this output stream. */
-    private IgfsFileAffinityRange streamRange;
+    /** Space in file to write data. */
+    protected long space;
 
     /**
      * Constructs file output stream.
@@ -107,6 +75,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
      */
     IgfsOutputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo 
fileInfo, int bufSize, IgfsMode mode,
         @Nullable IgfsFileWorkerBatch batch) {
+        super(igfsCtx, path, bufSize, batch);
+
         assert fileInfo != null && fileInfo.isFile() : "Unexpected file info: 
" + fileInfo;
         assert mode != null && mode != PROXY && (mode == PRIMARY && batch == 
null || batch != null);
 
@@ -115,108 +85,55 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
             throw new IgfsException("Failed to acquire file lock (concurrently 
modified?): " + path);
 
         synchronized (mux) {
-            this.path = path;
-            this.bufSize = optimizeBufferSize(bufSize, fileInfo);
-            this.igfsCtx = igfsCtx;
             this.fileInfo = fileInfo;
             this.mode = mode;
-            this.batch = batch;
 
             streamRange = initialStreamRange(fileInfo);
 
             writeFut = igfsCtx.data().writeStart(fileInfo.id());
         }
-
-        igfsCtx.metrics().incrementFilesOpenedForWrite();
     }
 
-    /** {@inheritDoc} */
-    @Override public void write(int b) throws IOException {
-        synchronized (mux) {
-            checkClosed(null, 0);
-
-            b &= 0xFF;
-
-            long startTime = System.nanoTime();
-
-            if (buf == null)
-                buf = allocateNewBuffer();
-
-            buf.put((byte)b);
-
-            sendBufferIfFull();
-
-            time += System.nanoTime() - startTime;
-        }
+    /**
+     * @return Length of file.
+     */
+    private long length() {
+        return fileInfo.length();
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("NullableProblems")
-    @Override public void write(byte[] b, int off, int len) throws IOException 
{
-        A.notNull(b, "b");
-
-        if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > 
b.length) || ((off + len) < 0)) {
-            throw new IndexOutOfBoundsException("Invalid bounds [data.length=" 
+ b.length + ", offset=" + off +
-                ", length=" + len + ']');
-        }
-
-        synchronized (mux) {
-            checkClosed(null, 0);
-
-            // Check if there is anything to write.
-            if (len == 0)
-                return;
-
-            long startTime = System.nanoTime();
-
-            if (buf == null) {
-                if (len >= bufSize) {
-                    // Send data right away.
-                    ByteBuffer tmpBuf = ByteBuffer.wrap(b, off, len);
-
-                    send(tmpBuf, tmpBuf.remaining());
-                }
-                else {
-                    buf = allocateNewBuffer();
-
-                    buf.put(b, off, len);
-                }
-            }
-            else {
-                // Re-allocate buffer if needed.
-                if (buf.remaining() < len)
-                    buf = ByteBuffer.allocate(buf.position() + 
len).put((ByteBuffer)buf.flip());
+    @Override protected int optimizeBufferSize(int bufSize) {
+        assert bufSize > 0;
 
-                buf.put(b, off, len);
+        if (fileInfo == null)
+            return bufSize;
 
-                sendBufferIfFull();
-            }
+        int blockSize = fileInfo.blockSize();
 
-            time += System.nanoTime() - startTime;
-        }
-    }
+        if (blockSize <= 0)
+            return bufSize;
 
-    /** {@inheritDoc} */
-    @Override public void transferFrom(DataInput in, int len) throws 
IOException {
-        synchronized (mux) {
-            checkClosed(in, len);
+        if (bufSize <= blockSize)
+            // Optimize minimum buffer size to be equal file's block size.
+            return blockSize;
 
-            long startTime = System.nanoTime();
+        int maxBufSize = blockSize * MAX_BLOCKS_CNT;
 
-            // Clean-up local buffer before streaming.
-            sendBufferIfNotEmpty();
+        if (bufSize > maxBufSize)
+            // There is no profit or optimization from larger buffers.
+            return maxBufSize;
 
-            // Perform transfer.
-            send(in, len);
+        if (fileInfo.length() == 0)
+            // Make buffer size multiple of block size (optimized for new 
files).
+            return bufSize / blockSize * blockSize;
 
-            time += System.nanoTime() - startTime;
-        }
+        return bufSize;
     }
 
     /**
      * Flushes this output stream and forces any buffered output bytes to be 
written out.
      *
-     * @exception IOException  if an I/O error occurs.
+     * @throws IOException if an I/O error occurs.
      */
     @Override public void flush() throws IOException {
         synchronized (mux) {
@@ -250,40 +167,6 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
         }
     }
 
-    /**
-     * Await acknowledgments.
-     *
-     * @throws IOException If failed.
-     */
-    private void awaitAcks() throws IOException {
-        try {
-            igfsCtx.data().awaitAllAcksReceived(fileInfo.id());
-        }
-        catch (IgniteCheckedException e) {
-            throw new IOException("Failed to wait for flush acknowledge: " + 
fileInfo.id, e);
-        }
-    }
-
-    /**
-     * Flush remainder.
-     *
-     * @throws IOException If failed.
-     */
-    private void flushRemainder() throws IOException {
-        try {
-            if (remainder != null) {
-                igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + 
space, null, 0,
-                    ByteBuffer.wrap(remainder, 0, remainderDataLen), true, 
streamRange, batch);
-
-                remainder = null;
-                remainderDataLen = 0;
-            }
-        }
-        catch (IgniteCheckedException e) {
-            throw new IOException("Failed to flush data (remainder) [path=" + 
path + ", space=" + space + ']', e);
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public final void close() throws IOException {
         synchronized (mux) {
@@ -355,75 +238,33 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
             if (err != null)
                 throw err;
 
-            igfsCtx.metrics().addWrittenBytesTime(bytes, time);
-            igfsCtx.metrics().decrementFilesOpenedForWrite();
-
-            GridEventStorageManager evts = igfsCtx.kernalContext().event();
-
-            if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE))
-                evts.record(new IgfsEvent(path, 
igfsCtx.kernalContext().discovery().localNode(),
-                    EVT_IGFS_FILE_CLOSED_WRITE, bytes));
-        }
-    }
-
-    /**
-     * Validate this stream is open.
-     *
-     * @throws IOException If this stream is closed.
-     */
-    private void checkClosed(@Nullable DataInput in, int len) throws 
IOException {
-        assert Thread.holdsLock(mux);
-
-        if (closed) {
-            // Must read data from stream before throwing exception.
-            if (in != null)
-                in.skipBytes(len);
-
-            throw new IOException("Stream has been closed: " + this);
+            updateMetricsOnClose();
         }
     }
 
     /**
-     * Send local buffer if it full.
-     *
-     * @throws IOException If failed.
-     */
-    private void sendBufferIfFull() throws IOException {
-        if (buf.position() >= bufSize)
-            sendBuffer();
-    }
-
-    /**
-     * Send local buffer if at least something is stored there.
+     * Flush remainder.
      *
      * @throws IOException If failed.
      */
-    private void sendBufferIfNotEmpty() throws IOException {
-        if (buf != null && buf.position() > 0)
-            sendBuffer();
-    }
-
-    /**
-     * Send all local-buffered data to server.
-     *
-     * @throws IOException In case of IO exception.
-     */
-    private void sendBuffer() throws IOException {
-        buf.flip();
+    private void flushRemainder() throws IOException {
+        try {
+            if (remainder != null) {
 
-        send(buf, buf.remaining());
+                remainder = igfsCtx.data().storeDataBlocks(fileInfo, length() 
+ space, null,
+                    0, ByteBuffer.wrap(remainder, 0, remainderDataLen), true, 
streamRange, batch);
 
-        buf = null;
+                remainder = null;
+                remainderDataLen = 0;
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException("Failed to flush data (remainder) [path=" + 
path + ", space=" + space + ']', e);
+        }
     }
 
-    /**
-     * Store data block.
-     *
-     * @param data Block.
-     * @param writeLen Write length.
-     * @throws IOException If failed.
-     */
-    private void send(Object data, int writeLen) throws IOException {
+    /** {@inheritDoc} */
+    @Override protected void send(Object data, int writeLen) throws 
IOException {
         assert Thread.holdsLock(mux);
         assert data instanceof ByteBuffer || data instanceof DataInput;
 
@@ -449,20 +290,20 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
                 }
 
                 if (data instanceof ByteBuffer)
-                    ((ByteBuffer) data).get(remainder, remainderDataLen, 
writeLen);
+                    ((ByteBuffer)data).get(remainder, remainderDataLen, 
writeLen);
                 else
-                    ((DataInput) data).readFully(remainder, remainderDataLen, 
writeLen);
+                    ((DataInput)data).readFully(remainder, remainderDataLen, 
writeLen);
 
                 remainderDataLen += writeLen;
             }
             else {
                 if (data instanceof ByteBuffer) {
-                    remainder = igfsCtx.data().storeDataBlocks(fileInfo, 
fileInfo.length() + space, remainder,
-                        remainderDataLen, (ByteBuffer) data, false, 
streamRange, batch);
+                    remainder = igfsCtx.data().storeDataBlocks(fileInfo, 
length() + space, remainder,
+                        remainderDataLen, (ByteBuffer)data, false, 
streamRange, batch);
                 }
                 else {
-                    remainder = igfsCtx.data().storeDataBlocks(fileInfo, 
fileInfo.length() + space, remainder,
-                        remainderDataLen, (DataInput) data, writeLen, false, 
streamRange, batch);
+                    remainder = igfsCtx.data().storeDataBlocks(fileInfo, 
length() + space, remainder,
+                        remainderDataLen, (DataInput)data, writeLen, false, 
streamRange, batch);
                 }
 
                 remainderDataLen = remainder == null ? 0 : remainder.length;
@@ -474,12 +315,17 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
     }
 
     /**
-     * Allocate new buffer.
+     * Await acknowledgments.
      *
-     * @return New buffer.
+     * @throws IOException If failed.
      */
-    private ByteBuffer allocateNewBuffer() {
-        return ByteBuffer.allocate(bufSize);
+    private void awaitAcks() throws IOException {
+        try {
+            igfsCtx.data().awaitAllAcksReceived(fileInfo.id());
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException("Failed to wait for flush acknowledge: " + 
fileInfo.id, e);
+        }
     }
 
     /**
@@ -516,41 +362,6 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
         return affKey == null ? null : new IgfsFileAffinityRange(off, off, 
affKey);
     }
 
-    /**
-     * Optimize buffer size.
-     *
-     * @param bufSize Requested buffer size.
-     * @param fileInfo File info.
-     * @return Optimized buffer size.
-     */
-    private static int optimizeBufferSize(int bufSize, IgfsEntryInfo fileInfo) 
{
-        assert bufSize > 0;
-
-        if (fileInfo == null)
-            return bufSize;
-
-        int blockSize = fileInfo.blockSize();
-
-        if (blockSize <= 0)
-            return bufSize;
-
-        if (bufSize <= blockSize)
-            // Optimize minimum buffer size to be equal file's block size.
-            return blockSize;
-
-        int maxBufSize = blockSize * MAX_BLOCKS_CNT;
-
-        if (bufSize > maxBufSize)
-            // There is no profit or optimization from larger buffers.
-            return maxBufSize;
-
-        if (fileInfo.length() == 0)
-            // Make buffer size multiple of block size (optimized for new 
files).
-            return bufSize / blockSize * blockSize;
-
-        return bufSize;
-    }
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgfsOutputStreamImpl.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a97483a4/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamProxyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamProxyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamProxyImpl.java
new file mode 100644
index 0000000..7b74a1f
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamProxyImpl.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Output stream to store data into grid cache with separate blocks.
+ */
+class IgfsOutputStreamProxyImpl extends IgfsAbstractOutputStream {
+    /** File info. */
+    private IgfsFile info;
+
+    /**
+     * Constructs file output stream.
+     *
+     * @param igfsCtx IGFS context.
+     * @param path Path to stored file.
+     * @param info File info.
+     * @param bufSize The size of the buffer to be used.
+     * @param batch Optional secondary file system batch.
+     */
+    IgfsOutputStreamProxyImpl(IgfsContext igfsCtx, IgfsPath path, IgfsFile 
info, int bufSize,
+        @Nullable IgfsFileWorkerBatch batch) {
+        super(igfsCtx, path, bufSize, batch);
+
+        assert batch != null;
+
+        this.info = info;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int optimizeBufferSize(int bufSize) {
+        assert bufSize > 0;
+
+        return bufSize;
+    }
+
+    /**
+     * Flushes this output stream and forces any buffered output bytes to be 
written out.
+     *
+     * @throws IOException if an I/O error occurs.
+     */
+    @Override public void flush() throws IOException {
+        synchronized (mux) {
+            checkClosed(null, 0);
+
+            sendBufferIfNotEmpty();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public final void close() throws IOException {
+        synchronized (mux) {
+            // Do nothing if stream is already closed.
+            if (closed)
+                return;
+
+            // Set closed flag immediately.
+            closed = true;
+
+            // Flush data.
+            IOException err = null;
+
+            try {
+                sendBufferIfNotEmpty();
+            }
+            catch (Exception e) {
+                err = new IOException("Failed to flush data during stream 
close [path=" + path +
+                    ", fileInfo=" + info + ']', e);
+            }
+
+            // Finish batch before file unlocking to support the assertion 
that unlocked file batch,
+            // if any, must be in finishing state (e.g. append see more 
IgfsImpl.newBatch)
+            batch.finish();
+
+            // Finally, await secondary file system flush.
+            try {
+                batch.await();
+            }
+            catch (IgniteCheckedException e) {
+                if (err == null)
+                    err = new IOException("Failed to close secondary file 
system stream [path=" + path +
+                        ", fileInfo=" + info + ']', e);
+                else
+                    err.addSuppressed(e);
+            }
+
+            // Throw error, if any.
+            if (err != null)
+                throw err;
+
+            updateMetricsOnClose();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void send(Object data, int writeLen) throws 
IOException {
+        assert Thread.holdsLock(mux);
+        assert data instanceof ByteBuffer || data instanceof DataInput;
+
+        try {
+            // Increment metrics.
+            bytes += writeLen;
+
+            byte [] dataBuf = new byte[writeLen];
+
+            if (data instanceof ByteBuffer) {
+                ByteBuffer byteBuf = (ByteBuffer)data;
+
+                byteBuf.get(dataBuf);
+            }
+            else {
+                DataInput dataIn = (DataInput)data;
+
+                try {
+                    dataIn.readFully(dataBuf);
+                }
+                catch (IOException e) {
+                    throw new IgniteCheckedException(e);
+                }
+            }
+
+            if (!batch.write(dataBuf))
+                throw new IgniteCheckedException("Cannot write more data to 
the secondary file system output " +
+                    "stream because it was marked as closed: " + batch.path());
+            else
+                igfsCtx.metrics().addWriteBlocks(1, 1);
+
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException("Failed to store data into file: " + path, 
e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsOutputStreamProxyImpl.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a97483a4/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractBaseSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractBaseSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractBaseSelfTest.java
index 3f62cf5..14a653b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractBaseSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractBaseSelfTest.java
@@ -904,7 +904,7 @@ public abstract class IgfsAbstractBaseSelfTest extends 
IgfsCommonAbstractTest {
     protected void clear(IgniteFileSystem igfs, 
IgfsSecondaryFileSystemTestAdapter igfsSecondary) throws Exception {
         clear(igfs);
 
-        if (dual)
+        if (mode != PRIMARY)
             clear(igfsSecondary);
     }
 

Reply via email to