This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new b861813  IGNITE-11685 Java thin client: Handle multiple async requests 
in parallel - Fixes #6595.
b861813 is described below

commit b861813bad67ec2fd4a545975d47120da535ff23
Author: Aleksey Plekhanov <plehanov.a...@gmail.com>
AuthorDate: Fri Jul 12 16:04:22 2019 +0300

    IGNITE-11685 Java thin client: Handle multiple async requests in parallel - 
Fixes #6595.
---
 .../ignite/client/ClientConnectionException.java   |  18 +-
 .../ignite/internal/client/thin/ClientChannel.java |  17 +-
 .../client/thin/ClientFieldsQueryPager.java        |   7 +-
 .../internal/client/thin/ClientQueryPager.java     |   7 +-
 .../ignite/internal/client/thin/ClientUtils.java   |  11 +-
 .../internal/client/thin/GenericQueryPager.java    |  42 ++--
 .../client/thin/PayloadInputChannel.java}          |  39 +--
 .../internal/client/thin/PayloadOutputChannel.java |  62 +++++
 .../internal/client/thin/ProtocolVersion.java      |   9 +
 .../internal/client/thin/ReliableChannel.java      | 121 ++++-----
 .../internal/client/thin/TcpClientCache.java       |  98 +++++---
 .../internal/client/thin/TcpClientChannel.java     | 276 +++++++++++++++++----
 .../internal/client/thin/TcpIgniteClient.java      |  56 ++---
 .../org/apache/ignite/client/AsyncChannelTest.java | 198 +++++++++++++++
 .../org/apache/ignite/client/ClientTestSuite.java  |   3 +-
 15 files changed, 717 insertions(+), 247 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/client/ClientConnectionException.java
 
b/modules/core/src/main/java/org/apache/ignite/client/ClientConnectionException.java
index 1ec096c..58ca153 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/client/ClientConnectionException.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/client/ClientConnectionException.java
@@ -24,22 +24,22 @@ public class ClientConnectionException extends 
ClientException {
     /** Serial version uid. */
     private static final long serialVersionUID = 0L;
 
-    /** Message. */
-    private static final String MSG = "Ignite cluster is unavailable";
-
     /**
-     * Default constructor.
+     * Constructs a new exception with the specified detail message.
+     *
+     * @param msg the detail message.
      */
-    public ClientConnectionException() {
-        super(MSG);
+    public ClientConnectionException(String msg) {
+        super(msg);
     }
 
     /**
-     * Constructs a new exception with the specified cause.
+     * Constructs a new exception with the specified cause and detail message.
      *
+     * @param msg the detail message.
      * @param cause the cause.
      */
-    public ClientConnectionException(Throwable cause) {
-        super(MSG, cause);
+    public ClientConnectionException(String msg, Throwable cause) {
+        super(msg, cause);
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
index 9e97b34..de7062e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java
@@ -19,30 +19,23 @@ package org.apache.ignite.internal.client.thin;
 
 import java.util.function.Consumer;
 import java.util.function.Function;
-import org.apache.ignite.internal.binary.streams.BinaryInputStream;
-import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
-import org.apache.ignite.client.ClientConnectionException;
 import org.apache.ignite.client.ClientAuthorizationException;
+import org.apache.ignite.client.ClientConnectionException;
 
 /**
  * Processing thin client requests and responses.
  */
 interface ClientChannel extends AutoCloseable {
     /**
+     * Send request and handle response for client operation.
+     *
      * @param op Operation.
      * @param payloadWriter Payload writer to stream or {@code null} if 
request has no payload.
-     * @return Request ID.
-     */
-    public long send(ClientOperation op, Consumer<BinaryOutputStream> 
payloadWriter) throws ClientConnectionException;
-
-    /**
-     * @param op Operation.
-     * @param reqId ID of the request to receive the response for.
      * @param payloadReader Payload reader from stream.
      * @return Received operation payload or {@code null} if response has no 
payload.
      */
-    public <T> T receive(ClientOperation op, long reqId, 
Function<BinaryInputStream, T> payloadReader)
-        throws ClientConnectionException, ClientAuthorizationException;
+    public <T> T service(ClientOperation op, Consumer<PayloadOutputChannel> 
payloadWriter,
+        Function<PayloadInputChannel, T> payloadReader) throws 
ClientConnectionException, ClientAuthorizationException;
 
     /**
      * @return Server version.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFieldsQueryPager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFieldsQueryPager.java
index 82a4ce7..f7c080f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFieldsQueryPager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFieldsQueryPager.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.function.Consumer;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
-import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
 
 /**
  * Fields query pager.
@@ -42,7 +41,7 @@ class ClientFieldsQueryPager extends 
GenericQueryPager<List<?>> implements Field
         ReliableChannel ch,
         ClientOperation qryOp,
         ClientOperation pageQryOp,
-        Consumer<BinaryOutputStream> qryWriter,
+        Consumer<PayloadOutputChannel> qryWriter,
         boolean keepBinary,
         ClientBinaryMarshaller marsh
     ) {
@@ -54,7 +53,9 @@ class ClientFieldsQueryPager extends 
GenericQueryPager<List<?>> implements Field
     }
 
     /** {@inheritDoc} */
-    @Override Collection<List<?>> readEntries(BinaryInputStream in) {
+    @Override Collection<List<?>> readEntries(PayloadInputChannel payloadCh) {
+        BinaryInputStream in = payloadCh.in();
+
         if (!hasFirstPage())
             fieldNames = new ArrayList<>(ClientUtils.collection(in, ignored -> 
(String)serDes.readObject(in, keepBinary)));
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientQueryPager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientQueryPager.java
index 1a373b4..995bdaf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientQueryPager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientQueryPager.java
@@ -21,7 +21,6 @@ import java.util.Collection;
 import java.util.function.Consumer;
 import javax.cache.Cache;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
-import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
 
 /**
  * Client query pager.
@@ -38,7 +37,7 @@ class ClientQueryPager<K, V> extends 
GenericQueryPager<Cache.Entry<K, V>> {
         ReliableChannel ch,
         ClientOperation qryOp,
         ClientOperation pageQryOp,
-        Consumer<BinaryOutputStream> qryWriter,
+        Consumer<PayloadOutputChannel> qryWriter,
         boolean keepBinary,
         ClientBinaryMarshaller marsh
     ) {
@@ -50,7 +49,9 @@ class ClientQueryPager<K, V> extends 
GenericQueryPager<Cache.Entry<K, V>> {
     }
 
     /** {@inheritDoc} */
-    @Override Collection<Cache.Entry<K, V>> readEntries(BinaryInputStream in) {
+    @Override Collection<Cache.Entry<K, V>> readEntries(PayloadInputChannel 
paloadCh) {
+        BinaryInputStream in = paloadCh.in();
+
         return ClientUtils.collection(
             in,
             ignored -> new ClientCacheEntry<>(serDes.readObject(in, 
keepBinary), serDes.readObject(in, keepBinary))
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientUtils.java
index 8e2d61a..3175cfc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientUtils.java
@@ -55,9 +55,8 @@ import org.apache.ignite.internal.binary.BinarySchema;
 import org.apache.ignite.internal.binary.BinaryWriterExImpl;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
-import 
org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
 
-import static 
org.apache.ignite.internal.processors.platform.client.ClientConnectionContext.VER_1_2_0;
+import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_2_0;
 
 /**
  * Shared serialization/deserialization utils.
@@ -234,7 +233,7 @@ final class ClientUtils {
     }
 
     /** Serialize configuration to stream. */
-    void cacheConfiguration(ClientCacheConfiguration cfg, BinaryOutputStream 
out, ClientListenerProtocolVersion ver) {
+    void cacheConfiguration(ClientCacheConfiguration cfg, BinaryOutputStream 
out, ProtocolVersion ver) {
         try (BinaryRawWriterEx writer = new 
BinaryWriterExImpl(marsh.context(), out, null, null)) {
             int origPos = out.position();
 
@@ -313,7 +312,7 @@ final class ClientUtils {
                                 w.writeBoolean(qf.isNotNull());
                                 w.writeObject(qf.getDefaultValue());
 
-                                if (ver.compareTo(VER_1_2_0) >= 0) {
+                                if (ver.compareTo(V1_2_0) >= 0) {
                                     w.writeInt(qf.getPrecision());
                                     w.writeInt(qf.getScale());
                                 }
@@ -349,7 +348,7 @@ final class ClientUtils {
     }
 
     /** Deserialize configuration from stream. */
-    ClientCacheConfiguration cacheConfiguration(BinaryInputStream in, 
ClientListenerProtocolVersion ver)
+    ClientCacheConfiguration cacheConfiguration(BinaryInputStream in, 
ProtocolVersion ver)
         throws IOException {
         try (BinaryReaderExImpl reader = new 
BinaryReaderExImpl(marsh.context(), in, null, true)) {
             reader.readInt(); // Do not need length to read data. The protocol 
defines fixed configuration layout.
@@ -394,7 +393,7 @@ final class ClientUtils {
                             .setKeyFieldName(reader.readString())
                             .setValueFieldName(reader.readString());
 
-                        boolean isCliVer1_2 = ver.compareTo(VER_1_2_0) >= 0;
+                        boolean isCliVer1_2 = ver.compareTo(V1_2_0) >= 0;
 
                         Collection<QueryField> qryFields = 
ClientUtils.collection(
                             in,
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/GenericQueryPager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/GenericQueryPager.java
index 30c73cf..90ab568 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/GenericQueryPager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/GenericQueryPager.java
@@ -21,12 +21,9 @@ import java.util.Collection;
 import java.util.function.Consumer;
 import org.apache.ignite.client.ClientException;
 import org.apache.ignite.client.ClientReconnectedException;
-import org.apache.ignite.internal.binary.streams.BinaryInputStream;
-import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
-import org.apache.ignite.internal.processors.platform.client.ClientStatus;
 
 /**
- * Generic query pager. Override {@link this#readResult(BinaryInputStream)} to 
make it specific.
+ * Generic query pager. Override {@link this#readResult(PayloadInputChannel)} 
to make it specific.
  */
 abstract class GenericQueryPager<T> implements QueryPager<T> {
     /** Query op. */
@@ -36,7 +33,7 @@ abstract class GenericQueryPager<T> implements QueryPager<T> {
     private final ClientOperation pageQryOp;
 
     /** Query writer. */
-    private final Consumer<BinaryOutputStream> qryWriter;
+    private final Consumer<PayloadOutputChannel> qryWriter;
 
     /** Channel. */
     private final ReliableChannel ch;
@@ -50,12 +47,15 @@ abstract class GenericQueryPager<T> implements 
QueryPager<T> {
     /** Cursor id. */
     private Long cursorId = null;
 
+    /** Client channel on first query page. */
+    private ClientChannel clientCh;
+
     /** Constructor. */
     GenericQueryPager(
         ReliableChannel ch,
         ClientOperation qryOp,
         ClientOperation pageQryOp,
-        Consumer<BinaryOutputStream> qryWriter
+        Consumer<PayloadOutputChannel> qryWriter
     ) {
         this.ch = ch;
         this.qryOp = qryOp;
@@ -75,7 +75,7 @@ abstract class GenericQueryPager<T> implements QueryPager<T> {
     @Override public void close() throws Exception {
         // Close cursor only if the server has more pages: the server closes 
cursor automatically on last page
         if (cursorId != null && hasNext)
-            ch.request(ClientOperation.RESOURCE_CLOSE, req -> 
req.writeLong(cursorId));
+            ch.request(ClientOperation.RESOURCE_CLOSE, req -> 
req.out().writeLong(cursorId));
     }
 
     /** {@inheritDoc} */
@@ -95,6 +95,8 @@ abstract class GenericQueryPager<T> implements QueryPager<T> {
         hasNext = true;
 
         cursorId = null;
+
+        clientCh = null;
     }
 
     /**
@@ -102,12 +104,12 @@ abstract class GenericQueryPager<T> implements 
QueryPager<T> {
      * cursor ID and trailing "has next page" flag.
      * Use {@link this#hasFirstPage} flag to differentiate between the initial 
query and page query responses.
      */
-    abstract Collection<T> readEntries(BinaryInputStream in);
+    abstract Collection<T> readEntries(PayloadInputChannel in);
 
     /** */
-    private Collection<T> readResult(BinaryInputStream in) {
+    private Collection<T> readResult(PayloadInputChannel payloadCh) {
         if (!hasFirstPage) {
-            long resCursorId = in.readLong();
+            long resCursorId = payloadCh.in().readLong();
 
             if (cursorId != null) {
                 if (cursorId != resCursorId)
@@ -115,13 +117,16 @@ abstract class GenericQueryPager<T> implements 
QueryPager<T> {
                         String.format("Expected cursor [%s] but received 
cursor [%s]", cursorId, resCursorId)
                     );
             }
-            else
+            else {
                 cursorId = resCursorId;
+
+                clientCh = payloadCh.clientChannel();
+            }
         }
 
-        Collection<T> res = readEntries(in);
+        Collection<T> res = readEntries(payloadCh);
 
-        hasNext = in.readBoolean();
+        hasNext = payloadCh.in().readBoolean();
 
         hasFirstPage = true;
 
@@ -130,16 +135,13 @@ abstract class GenericQueryPager<T> implements 
QueryPager<T> {
 
     /** Get page. */
     private Collection<T> queryPage() throws ClientException {
-        try {
-            return ch.service(pageQryOp, req -> req.writeLong(cursorId), 
this::readResult);
-        }
-        catch (ClientServerError ex) {
-            if (ex.getCode() == ClientStatus.RESOURCE_DOES_NOT_EXIST) {
+        return ch.service(pageQryOp, req -> {
+            if (clientCh != req.clientChannel()) {
                 throw new ClientReconnectedException("Client was reconnected 
in the middle of results fetch, " +
                     "query results can be inconsistent, please retry the 
query.");
             }
 
-            throw ex;
-        }
+            req.out().writeLong(cursorId);
+        }, this::readResult);
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/client/ClientConnectionException.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java
similarity index 54%
copy from 
modules/core/src/main/java/org/apache/ignite/client/ClientConnectionException.java
copy to 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java
index 1ec096c..76af7f2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/client/ClientConnectionException.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java
@@ -15,31 +15,40 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.client;
+package org.apache.ignite.internal.client.thin;
+
+import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
 
 /**
- * Indicates all the Ignite servers specified in the client configuration are 
no longer available.
+ * Thin client payload input channel.
  */
-public class ClientConnectionException extends ClientException {
-    /** Serial version uid. */
-    private static final long serialVersionUID = 0L;
+class PayloadInputChannel {
+    /** Client channel. */
+    private final ClientChannel ch;
+
+    /** Input stream. */
+    private final BinaryInputStream in;
 
-    /** Message. */
-    private static final String MSG = "Ignite cluster is unavailable";
+    /**
+     * Constructor.
+     */
+    PayloadInputChannel(ClientChannel ch, byte[] payload) {
+        in = new BinaryHeapInputStream(payload);
+        this.ch = ch;
+    }
 
     /**
-     * Default constructor.
+     * Gets client channel.
      */
-    public ClientConnectionException() {
-        super(MSG);
+    public ClientChannel clientChannel() {
+        return ch;
     }
 
     /**
-     * Constructs a new exception with the specified cause.
-     *
-     * @param cause the cause.
+     * Gets input stream.
      */
-    public ClientConnectionException(Throwable cause) {
-        super(MSG, cause);
+    public BinaryInputStream in() {
+        return in;
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadOutputChannel.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadOutputChannel.java
new file mode 100644
index 0000000..cd2b29c
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadOutputChannel.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
+import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+
+/**
+ * Thin client payload output channel.
+ */
+class PayloadOutputChannel implements AutoCloseable {
+    /** Initial output stream buffer capacity. */
+    private static final int INITIAL_BUFFER_CAPACITY = 1024;
+
+    /** Client channel. */
+    private final ClientChannel ch;
+
+    /** Output stream. */
+    private final BinaryOutputStream out;
+
+    /**
+     * Constructor.
+     */
+    PayloadOutputChannel(ClientChannel ch) {
+        out = new BinaryHeapOutputStream(INITIAL_BUFFER_CAPACITY);
+        this.ch = ch;
+    }
+
+    /**
+     * Gets client channel.
+     */
+    public ClientChannel clientChannel() {
+        return ch;
+    }
+
+    /**
+     * Gets output stream.
+     */
+    public BinaryOutputStream out() {
+        return out;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        out.close();
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolVersion.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolVersion.java
index 2e84e36..aaf7eed 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolVersion.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolVersion.java
@@ -19,6 +19,15 @@ package org.apache.ignite.internal.client.thin;
 
 /** Thin client protocol version. */
 public final class ProtocolVersion implements Comparable<ProtocolVersion> {
+    /** Protocol version: 1.2.0. */
+    public static final ProtocolVersion V1_2_0 = new ProtocolVersion((short)1, 
(short)2, (short)0);
+
+    /** Protocol version: 1.1.0. */
+    public static final ProtocolVersion V1_1_0 = new ProtocolVersion((short)1, 
(short)1, (short)0);
+
+    /** Protocol version 1.0.0. */
+    public static final ProtocolVersion V1_0_0 = new ProtocolVersion((short)1, 
(short)0, (short)0);
+
     /** Major. */
     private final short major;
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index 8edcc16..2e8deef 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -24,8 +24,6 @@ import java.util.Deque;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -35,10 +33,9 @@ import org.apache.ignite.client.ClientConnectionException;
 import org.apache.ignite.client.ClientException;
 import org.apache.ignite.configuration.ClientConfiguration;
 import org.apache.ignite.configuration.ClientConnectorConfiguration;
-import org.apache.ignite.internal.binary.streams.BinaryInputStream;
-import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
 import org.apache.ignite.internal.util.HostAndPortRange;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  * Adds failover abd thread-safety to {@link ClientChannel}.
@@ -47,8 +44,8 @@ final class ReliableChannel implements AutoCloseable {
     /** Raw channel. */
     private final Function<ClientChannelConfiguration, Result<ClientChannel>> 
chFactory;
 
-    /** Service lock. */
-    private final Lock svcLock = new ReentrantLock();
+    /** Servers count. */
+    private final int srvCnt;
 
     /** Primary server. */
     private InetSocketAddress primary;
@@ -57,11 +54,14 @@ final class ReliableChannel implements AutoCloseable {
     private final Deque<InetSocketAddress> backups = new LinkedList<>();
 
     /** Channel. */
-    private ClientChannel ch = null;
+    private ClientChannel ch;
 
     /** Ignite config. */
     private final ClientConfiguration clientCfg;
 
+    /** Channel is closed. */
+    private boolean closed;
+
     /**
      * Constructor.
      */
@@ -80,11 +80,13 @@ final class ReliableChannel implements AutoCloseable {
 
         List<InetSocketAddress> addrs = 
parseAddresses(clientCfg.getAddresses());
 
+        srvCnt = addrs.size();
+
         primary = addrs.get(new Random().nextInt(addrs.size())); // we already 
verified there is at least one address
 
         for (InetSocketAddress a : addrs) {
             if (a != primary)
-                this.backups.add(a);
+                backups.add(a);
         }
 
         ClientConnectionException lastEx = null;
@@ -97,7 +99,7 @@ final class ReliableChannel implements AutoCloseable {
             } catch (ClientConnectionException e) {
                 lastEx = e;
 
-                changeServer();
+                rollAddress();
             }
         }
 
@@ -105,7 +107,9 @@ final class ReliableChannel implements AutoCloseable {
     }
 
     /** {@inheritDoc} */
-    @Override public void close() throws Exception {
+    @Override public synchronized void close() throws Exception {
+        closed = true;
+
         if (ch != null) {
             ch.close();
 
@@ -114,58 +118,40 @@ final class ReliableChannel implements AutoCloseable {
     }
 
     /**
-     * Send request and handle response. The method is synchronous and 
single-threaded.
+     * Send request and handle response.
      */
     public <T> T service(
         ClientOperation op,
-        Consumer<BinaryOutputStream> payloadWriter,
-        Function<BinaryInputStream, T> payloadReader
+        Consumer<PayloadOutputChannel> payloadWriter,
+        Function<PayloadInputChannel, T> payloadReader
     ) throws ClientException {
         ClientConnectionException failure = null;
 
-        T res = null;
-
-        int totalSrvs = 1 + backups.size();
-
-        svcLock.lock();
-        try {
-            for (int i = 0; i < totalSrvs; i++) {
-                try {
-                    if (ch == null)
-                        ch = chFactory.apply(new 
ClientChannelConfiguration(clientCfg).setAddress(primary)).get();
+        for (int i = 0; i < srvCnt; i++) {
+            ClientChannel ch = null;
 
-                    long id = ch.send(op, payloadWriter);
-
-                    res = ch.receive(op, id, payloadReader);
-
-                    failure = null;
+            try {
+                ch = channel();
 
-                    break;
-                }
-                catch (ClientConnectionException e) {
-                    if (failure == null)
-                        failure = e;
-                    else
-                        failure.addSuppressed(e);
+                return ch.service(op, payloadWriter, payloadReader);
+            }
+            catch (ClientConnectionException e) {
+                if (failure == null)
+                    failure = e;
+                else
+                    failure.addSuppressed(e);
 
-                    changeServer();
-                }
+                changeServer(ch);
             }
         }
-        finally {
-            svcLock.unlock();
-        }
-
-        if (failure != null)
-            throw failure;
 
-        return res;
+        throw failure;
     }
 
     /**
      * Send request without payload and handle response.
      */
-    public <T> T service(ClientOperation op, Function<BinaryInputStream, T> 
payloadReader)
+    public <T> T service(ClientOperation op, Function<PayloadInputChannel, T> 
payloadReader)
         throws ClientException {
         return service(op, null, payloadReader);
     }
@@ -173,18 +159,11 @@ final class ReliableChannel implements AutoCloseable {
     /**
      * Send request and handle response without payload.
      */
-    public void request(ClientOperation op, Consumer<BinaryOutputStream> 
payloadWriter) throws ClientException {
+    public void request(ClientOperation op, Consumer<PayloadOutputChannel> 
payloadWriter) throws ClientException {
         service(op, payloadWriter, null);
     }
 
     /**
-     * @return Server version.
-     */
-    public ProtocolVersion serverVersion() {
-        return ch.serverVersion();
-    }
-
-    /**
      * @return host:port_range address lines parsed as {@link 
InetSocketAddress}.
      */
     private static List<InetSocketAddress> parseAddresses(String[] addrs) 
throws ClientException {
@@ -216,19 +195,41 @@ final class ReliableChannel implements AutoCloseable {
     }
 
     /** */
-    private void changeServer() {
+    private synchronized ClientChannel channel() {
+        if (closed)
+            throw new ClientException("Channel is closed");
+
+        if (ch == null) {
+            try {
+                ch = chFactory.apply(new 
ClientChannelConfiguration(clientCfg).setAddress(primary)).get();
+            }
+            catch (ClientConnectionException e) {
+                rollAddress();
+
+                throw e;
+            }
+        }
+
+        return ch;
+    }
+
+    /** */
+    private void rollAddress() {
         if (!backups.isEmpty()) {
             backups.addLast(primary);
 
             primary = backups.removeFirst();
         }
+    }
 
-        try {
-            ch.close();
-        }
-        catch (Exception ignored) {
-        }
+    /** */
+    private synchronized void changeServer(ClientChannel oldCh) {
+        if (oldCh == ch && ch != null) {
+            rollAddress();
+
+            U.closeQuiet(ch);
 
-        ch = null;
+            ch = null;
+        }
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index 3e3204c..5771b72 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -40,7 +40,6 @@ import 
org.apache.ignite.internal.binary.streams.BinaryInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
 
 import static java.util.AbstractMap.SimpleEntry;
-import static 
org.apache.ignite.internal.processors.platform.client.ClientConnectionContext.DEFAULT_VER;
 
 /**
  * Implementation of {@link ClientCache} over TCP protocol.
@@ -83,7 +82,7 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
             ClientOperation.CACHE_GET,
             req -> {
                 writeCacheInfo(req);
-                serDes.writeObject(req, key);
+                writeObject(req, key);
             },
             this::readObject
         );
@@ -101,8 +100,8 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
             ClientOperation.CACHE_PUT,
             req -> {
                 writeCacheInfo(req);
-                serDes.writeObject(req, key);
-                serDes.writeObject(req, val);
+                writeObject(req, key);
+                writeObject(req, val);
             }
         );
     }
@@ -116,9 +115,9 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
             ClientOperation.CACHE_CONTAINS_KEY,
             req -> {
                 writeCacheInfo(req);
-                serDes.writeObject(req, key);
+                writeObject(req, key);
             },
-            BinaryInputStream::readBoolean
+            res -> res.in().readBoolean()
         );
     }
 
@@ -134,7 +133,7 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
             this::writeCacheInfo,
             res -> {
                 try {
-                    return serDes.cacheConfiguration(res, DEFAULT_VER);
+                    return serDes.cacheConfiguration(res.in(), 
res.clientChannel().serverVersion());
                 }
                 catch (IOException e) {
                     return null;
@@ -149,9 +148,9 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
             ClientOperation.CACHE_GET_SIZE,
             req -> {
                 writeCacheInfo(req);
-                ClientUtils.collection(peekModes, req, (out, m) -> 
out.writeByte((byte)m.ordinal()));
+                ClientUtils.collection(peekModes, req.out(), (out, m) -> 
out.writeByte((byte)m.ordinal()));
             },
-            res -> (int)res.readLong()
+            res -> (int)res.in().readLong()
         );
     }
 
@@ -167,10 +166,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
             ClientOperation.CACHE_GET_ALL,
             req -> {
                 writeCacheInfo(req);
-                ClientUtils.collection(keys, req, serDes::writeObject);
+                ClientUtils.collection(keys, req.out(), serDes::writeObject);
             },
             res -> ClientUtils.collection(
-                res,
+                res.in(),
                 in -> new SimpleEntry<K, V>(readObject(in), readObject(in))
             )
         ).stream().collect(Collectors.toMap(SimpleEntry::getKey, 
SimpleEntry::getValue));
@@ -190,7 +189,7 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
                 writeCacheInfo(req);
                 ClientUtils.collection(
                     map.entrySet(),
-                    req,
+                    req.out(),
                     (out, e) -> {
                         serDes.writeObject(out, e.getKey());
                         serDes.writeObject(out, e.getValue());
@@ -214,11 +213,11 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
             ClientOperation.CACHE_REPLACE_IF_EQUALS,
             req -> {
                 writeCacheInfo(req);
-                serDes.writeObject(req, key);
-                serDes.writeObject(req, oldVal);
-                serDes.writeObject(req, newVal);
+                writeObject(req, key);
+                writeObject(req, oldVal);
+                writeObject(req, newVal);
             },
-            BinaryInputStream::readBoolean
+            res -> res.in().readBoolean()
         );
     }
 
@@ -234,10 +233,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
             ClientOperation.CACHE_REPLACE,
             req -> {
                 writeCacheInfo(req);
-                serDes.writeObject(req, key);
-                serDes.writeObject(req, val);
+                writeObject(req, key);
+                writeObject(req, val);
             },
-            BinaryInputStream::readBoolean
+            res -> res.in().readBoolean()
         );
     }
 
@@ -250,9 +249,9 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
             ClientOperation.CACHE_REMOVE_KEY,
             req -> {
                 writeCacheInfo(req);
-                serDes.writeObject(req, key);
+                writeObject(req, key);
             },
-            BinaryInputStream::readBoolean
+            res -> res.in().readBoolean()
         );
     }
 
@@ -268,10 +267,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
             ClientOperation.CACHE_REMOVE_IF_EQUALS,
             req -> {
                 writeCacheInfo(req);
-                serDes.writeObject(req, key);
-                serDes.writeObject(req, oldVal);
+                writeObject(req, key);
+                writeObject(req, oldVal);
             },
-            BinaryInputStream::readBoolean
+            res -> res.in().readBoolean()
         );
     }
 
@@ -287,7 +286,7 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
             ClientOperation.CACHE_REMOVE_KEYS,
             req -> {
                 writeCacheInfo(req);
-                ClientUtils.collection(keys, req, serDes::writeObject);
+                ClientUtils.collection(keys, req.out(), serDes::writeObject);
             }
         );
     }
@@ -309,8 +308,8 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
             ClientOperation.CACHE_GET_AND_PUT,
             req -> {
                 writeCacheInfo(req);
-                serDes.writeObject(req, key);
-                serDes.writeObject(req, val);
+                writeObject(req, key);
+                writeObject(req, val);
             },
             this::readObject
         );
@@ -325,7 +324,7 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
             ClientOperation.CACHE_GET_AND_REMOVE,
             req -> {
                 writeCacheInfo(req);
-                serDes.writeObject(req, key);
+                writeObject(req, key);
             },
             this::readObject
         );
@@ -343,8 +342,8 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
             ClientOperation.CACHE_GET_AND_REPLACE,
             req -> {
                 writeCacheInfo(req);
-                serDes.writeObject(req, key);
-                serDes.writeObject(req, val);
+                writeObject(req, key);
+                writeObject(req, val);
             },
             this::readObject
         );
@@ -362,10 +361,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
             ClientOperation.CACHE_PUT_IF_ABSENT,
             req -> {
                 writeCacheInfo(req);
-                serDes.writeObject(req, key);
-                serDes.writeObject(req, val);
+                writeObject(req, key);
+                writeObject(req, val);
             },
-            BinaryInputStream::readBoolean
+            res -> res.in().readBoolean()
         );
     }
 
@@ -425,9 +424,9 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
         if (qry == null)
             throw new NullPointerException("qry");
 
-        Consumer<BinaryOutputStream> qryWriter = out -> {
-            writeCacheInfo(out);
-            serDes.write(qry, out);
+        Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
+            writeCacheInfo(payloadCh);
+            serDes.write(qry, payloadCh.out());
         };
 
         return new ClientFieldsQueryCursor<>(new ClientFieldsQueryPager(
@@ -442,8 +441,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
 
     /** Handle scan query. */
     private QueryCursor<Cache.Entry<K, V>> scanQuery(ScanQuery<K, V> qry) {
-        Consumer<BinaryOutputStream> qryWriter = out -> {
-            writeCacheInfo(out);
+        Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
+            writeCacheInfo(payloadCh);
+
+            BinaryOutputStream out = payloadCh.out();
 
             if (qry.getFilter() == null)
                 out.writeByte(GridBinaryMarshaller.NULL);
@@ -469,8 +470,11 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
 
     /** Handle SQL query. */
     private QueryCursor<Cache.Entry<K, V>> sqlQuery(SqlQuery qry) {
-        Consumer<BinaryOutputStream> qryWriter = out -> {
-            writeCacheInfo(out);
+        Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
+            writeCacheInfo(payloadCh);
+
+            BinaryOutputStream out = payloadCh.out();
+
             serDes.writeObject(out, qry.getType());
             serDes.writeObject(out, qry.getSql());
             ClientUtils.collection(qry.getArgs(), out, serDes::writeObject);
@@ -492,7 +496,9 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
     }
 
     /** Write cache ID and flags. */
-    private void writeCacheInfo(BinaryOutputStream out) {
+    private void writeCacheInfo(PayloadOutputChannel payloadCh) {
+        BinaryOutputStream out = payloadCh.out();
+
         out.writeInt(cacheId);
         out.writeByte((byte)(keepBinary ? 1 : 0));
     }
@@ -501,4 +507,14 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
     private <T> T readObject(BinaryInputStream in) {
         return serDes.readObject(in, keepBinary);
     }
+
+    /** */
+    private <T> T readObject(PayloadInputChannel payloadCh) {
+        return readObject(payloadCh.in());
+    }
+
+    /** */
+    private void writeObject(PayloadOutputChannel payloadCh, Object obj) {
+        serDes.writeObject(payloadCh.out(), obj);
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index d6097f2..66930e0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.client.thin;
 
+import java.io.DataInput;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -34,7 +35,11 @@ import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -49,42 +54,45 @@ import javax.net.ssl.SSLSocketFactory;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
 import javax.net.ssl.X509TrustManager;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.client.ClientAuthenticationException;
 import org.apache.ignite.client.ClientAuthorizationException;
 import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.client.ClientException;
 import org.apache.ignite.client.SslMode;
 import org.apache.ignite.client.SslProtocol;
 import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
 import org.apache.ignite.internal.binary.BinaryReaderExImpl;
 import org.apache.ignite.internal.binary.BinaryWriterExImpl;
 import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
-import org.apache.ignite.internal.binary.streams.BinaryOffheapOutputStream;
 import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
 import org.apache.ignite.internal.processors.platform.client.ClientStatus;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.io.GridUnsafeDataInput;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_0_0;
+import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_1_0;
+import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_2_0;
 
 /**
  * Implements {@link ClientChannel} over TCP.
  */
 class TcpClientChannel implements ClientChannel {
-    /** Protocol version: 1.2.0. */
-    private static final ProtocolVersion V1_2_0 = new 
ProtocolVersion((short)1, (short)2, (short)0);
-    
-    /** Protocol version: 1.1.0. */
-    private static final ProtocolVersion V1_1_0 = new 
ProtocolVersion((short)1, (short)1, (short)0);
-
-    /** Protocol version 1 0 0. */
-    private static final ProtocolVersion V1_0_0 = new 
ProtocolVersion((short)1, (short)0, (short)0);
-
     /** Supported protocol versions. */
     private static final Collection<ProtocolVersion> supportedVers = 
Arrays.asList(
-        V1_2_0, 
+        V1_2_0,
         V1_1_0, 
         V1_0_0
     );
 
+    /** Timeout before next attempt to lock channel and process next response 
by current thread. */
+    private static final long PAYLOAD_WAIT_TIMEOUT = 10L;
+
     /** Protocol version agreed with the server. */
     private ProtocolVersion ver = V1_2_0;
 
@@ -97,9 +105,24 @@ class TcpClientChannel implements ClientChannel {
     /** Input stream. */
     private final InputStream in;
 
+    /** Data input. */
+    private final DataInput dataInput;
+
+    /** Total bytes read by channel. */
+    private long totalBytesRead;
+
     /** Request id. */
     private final AtomicLong reqId = new AtomicLong(1);
 
+    /** Send lock. */
+    private final Lock sndLock = new ReentrantLock();
+
+    /** Receive lock. */
+    private final Lock rcvLock = new ReentrantLock();
+
+    /** Pending requests. */
+    private final Map<Long, ClientRequestFuture> pendingReqs = new 
ConcurrentHashMap<>();
+
     /** Constructor. */
     TcpClientChannel(ClientChannelConfiguration cfg) throws 
ClientConnectionException, ClientAuthenticationException {
         validateConfiguration(cfg);
@@ -109,9 +132,13 @@ class TcpClientChannel implements ClientChannel {
 
             out = sock.getOutputStream();
             in = sock.getInputStream();
+
+            GridUnsafeDataInput dis = new GridUnsafeDataInput();
+            dis.inputStream(in);
+            dataInput = dis;
         }
         catch (IOException e) {
-            throw new ClientConnectionException(e);
+            throw handleIOError("addr=" + cfg.getAddress(), e);
         }
 
         handshake(cfg.getUserName(), cfg.getUserPassword());
@@ -122,71 +149,157 @@ class TcpClientChannel implements ClientChannel {
         in.close();
         out.close();
         sock.close();
+
+        for (ClientRequestFuture pendingReq : pendingReqs.values())
+            pendingReq.onDone(new ClientConnectionException("Channel is 
closed"));
     }
 
     /** {@inheritDoc} */
-    @Override public long send(ClientOperation op, 
Consumer<BinaryOutputStream> payloadWriter)
+    @Override public <T> T service(ClientOperation op, 
Consumer<PayloadOutputChannel> payloadWriter,
+        Function<PayloadInputChannel, T> payloadReader) throws 
ClientConnectionException, ClientAuthorizationException {
+        long id = send(op, payloadWriter);
+
+        return receive(id, payloadReader);
+    }
+
+    /**
+     * @param op Operation.
+     * @param payloadWriter Payload writer to stream or {@code null} if 
request has no payload.
+     * @return Request ID.
+     */
+    private long send(ClientOperation op, Consumer<PayloadOutputChannel> 
payloadWriter)
         throws ClientConnectionException {
         long id = reqId.getAndIncrement();
 
-        try (BinaryOutputStream req = new BinaryHeapOutputStream(1024)) {
-            req.writeInt(0); // reserve an integer for the request size
+        // Only one thread at a time can have access to write to the channel.
+        sndLock.lock();
+
+        try (PayloadOutputChannel payloadCh = new PayloadOutputChannel(this)) {
+            pendingReqs.put(id, new ClientRequestFuture());
+
+            BinaryOutputStream req = payloadCh.out();
+
+            req.writeInt(0); // Reserve an integer for the request size.
             req.writeShort(op.code());
             req.writeLong(id);
 
             if (payloadWriter != null)
-                payloadWriter.accept(req);
+                payloadWriter.accept(payloadCh);
 
-            req.writeInt(0, req.position() - 4); // actual size
+            req.writeInt(0, req.position() - 4); // Actual size.
 
             write(req.array(), req.position());
         }
+        catch (Throwable t) {
+            pendingReqs.remove(id);
+
+            throw t;
+        }
+        finally {
+            sndLock.unlock();
+        }
 
         return id;
     }
 
-    /** {@inheritDoc} */
-    @Override public <T> T receive(ClientOperation op, long reqId, 
Function<BinaryInputStream, T> payloadReader)
+    /**
+     * @param reqId ID of the request to receive the response for.
+     * @param payloadReader Payload reader from stream.
+     * @return Received operation payload or {@code null} if response has no 
payload.
+     */
+    private <T> T receive(long reqId, Function<PayloadInputChannel, T> 
payloadReader)
         throws ClientConnectionException, ClientAuthorizationException {
+        ClientRequestFuture pendingReq = pendingReqs.get(reqId);
+
+        assert pendingReq != null : "Pending request future not found for 
request " + reqId;
+
+        // Each thread creates a future on request sent and returns a response 
when this future is completed.
+        // Only one thread at a time can have access to read from the channel. 
This thread reads the next available
+        // response and complete corresponding future. All other concurrent 
threads wait for their own futures with
+        // a timeout and periodically try to lock the channel to process the 
next response.
+        try {
+            while (true) {
+                if (rcvLock.tryLock()) {
+                    try {
+                        if (!pendingReq.isDone())
+                            processNextResponse();
+                    }
+                    finally {
+                        rcvLock.unlock();
+                    }
+                }
+
+                try {
+                    byte[] payload = pendingReq.get(PAYLOAD_WAIT_TIMEOUT);
+
+                    if (payload == null || payloadReader == null)
+                        return null;
 
-        final int MIN_RES_SIZE = 8 + 4; // minimal response size: long (8 
bytes) ID + int (4 bytes) status
+                    return payloadReader.apply(new PayloadInputChannel(this, 
payload));
+                }
+                catch (IgniteFutureTimeoutCheckedException ignore) {
+                    // Next cycle if timed out.
+                }
+            }
+        }
+        catch (IgniteCheckedException e) {
+            if (e.getCause() instanceof ClientError)
+                throw (ClientError)e.getCause();
+
+            if (e.getCause() instanceof ClientException)
+                throw (ClientException)e.getCause();
+
+            throw new ClientException(e.getMessage(), e);
+        }
+        finally {
+            pendingReqs.remove(reqId);
+        }
+    }
 
-        int resSize = new BinaryHeapInputStream(read(4)).readInt();
+    /**
+     * Process next response from the input stream and complete corresponding 
future.
+     */
+    private void processNextResponse() throws ClientProtocolError, 
ClientConnectionException {
+        int resSize = readInt();
 
-        if (resSize < 0)
+        if (resSize <= 0)
             throw new ClientProtocolError(String.format("Invalid response 
size: %s", resSize));
 
-        if (resSize == 0)
-            return null;
+        long bytesReadOnStartReq = totalBytesRead;
+
+        long resId = readLong();
+
+        ClientRequestFuture pendingReq = pendingReqs.get(resId);
 
-        BinaryInputStream resIn = new 
BinaryHeapInputStream(read(MIN_RES_SIZE));
+        if (pendingReq == null)
+            throw new ClientProtocolError(String.format("Unexpected response 
ID [%s]", resId));
 
-        long resId = resIn.readLong();
+        int status;
 
-        if (resId != reqId)
-            throw new ClientProtocolError(String.format("Unexpected response 
ID [%s], [%s] was expected", resId, reqId));
+        BinaryInputStream resIn;
 
-        int status = resIn.readInt();
+        status = readInt();
 
-        if (status != 0) {
-            resIn = new BinaryHeapInputStream(read(resSize - MIN_RES_SIZE));
+        int hdrSize = (int)(totalBytesRead - bytesReadOnStartReq);
+
+        if (status == 0) {
+            if (resSize <= hdrSize)
+                pendingReq.onDone();
+            else
+                pendingReq.onDone(read(resSize - hdrSize));
+        }
+        else {
+            resIn = new BinaryHeapInputStream(read(resSize - hdrSize));
 
             String err = new BinaryReaderExImpl(null, resIn, null, 
true).readString();
 
             switch (status) {
                 case ClientStatus.SECURITY_VIOLATION:
-                    throw new ClientAuthorizationException();
+                    pendingReq.onDone(new ClientAuthorizationException());
                 default:
-                    throw new ClientServerError(err, status, reqId);
+                    pendingReq.onDone(new ClientServerError(err, status, 
resId));
             }
         }
-
-        if (resSize <= MIN_RES_SIZE || payloadReader == null)
-            return null;
-
-        BinaryInputStream payload = new BinaryHeapInputStream(read(resSize - 
MIN_RES_SIZE));
-
-        return payloadReader.apply(payload);
     }
 
     /** {@inheritDoc} */
@@ -249,7 +362,7 @@ class TcpClientChannel implements ClientChannel {
 
     /** Send handshake request. */
     private void handshakeReq(String user, String pwd) throws 
ClientConnectionException {
-        try (BinaryOutputStream req = new BinaryOffheapOutputStream(32)) {
+        try (BinaryOutputStream req = new BinaryHeapOutputStream(32)) {
             req.writeInt(0); // reserve an integer for the request size
             req.writeByte((byte)1); // handshake code, always 1
             req.writeShort(ver.major());
@@ -271,7 +384,7 @@ class TcpClientChannel implements ClientChannel {
     /** Receive and handle handshake response. */
     private void handshakeRes(String user, String pwd)
         throws ClientConnectionException, ClientAuthenticationException {
-        int resSize = new BinaryHeapInputStream(read(4)).readInt();
+        int resSize = readInt();
 
         if (resSize <= 0)
             throw new ClientProtocolError(String.format("Invalid handshake 
response size: %s", resSize));
@@ -310,7 +423,7 @@ class TcpClientChannel implements ClientChannel {
                 }
             }
             catch (IOException e) {
-                throw new ClientConnectionException(e);
+                throw handleIOError(e);
             }
         }
     }
@@ -326,18 +439,68 @@ class TcpClientChannel implements ClientChannel {
                 bytesNum = in.read(bytes, readBytesNum, len - readBytesNum);
             }
             catch (IOException e) {
-                throw new ClientConnectionException(e);
+                throw handleIOError(e);
             }
 
             if (bytesNum < 0)
-                throw new ClientConnectionException();
+                throw handleIOError(null);
 
             readBytesNum += bytesNum;
         }
 
+        totalBytesRead += readBytesNum;
+
         return bytes;
     }
 
+    /**
+     * Read long value from input stream.
+     */
+    private long readLong() {
+        try {
+            long val = dataInput.readLong();
+
+            totalBytesRead += Long.BYTES;
+
+            return val;
+        }
+        catch (IOException e) {
+            throw handleIOError(e);
+        }
+    }
+
+    /**
+     * Read int value from input stream.
+     */
+    private int readInt() {
+        try {
+            int val = dataInput.readInt();
+
+            totalBytesRead += Integer.BYTES;
+
+            return val;
+        }
+        catch (IOException e) {
+            throw handleIOError(e);
+        }
+    }
+
+    /**
+     * Read short value from input stream.
+     */
+    private short readShort() {
+        try {
+            short val = dataInput.readShort();
+
+            totalBytesRead += Short.BYTES;
+
+            return val;
+        }
+        catch (IOException e) {
+            throw handleIOError(e);
+        }
+    }
+
     /** Write bytes to the output stream. */
     private void write(byte[] bytes, int len) throws ClientConnectionException 
{
         try {
@@ -345,10 +508,31 @@ class TcpClientChannel implements ClientChannel {
             out.flush();
         }
         catch (IOException e) {
-            throw new ClientConnectionException(e);
+            throw handleIOError(e);
         }
     }
 
+    /**
+     * @param ex IO exception (cause).
+     */
+    private ClientException handleIOError(@Nullable IOException ex) {
+        return handleIOError("sock=" + sock, ex);
+    }
+
+    /**
+     * @param chInfo Additional channel info
+     * @param ex IO exception (cause).
+     */
+    private ClientException handleIOError(String chInfo, @Nullable IOException 
ex) {
+        return new ClientConnectionException("Ignite cluster is unavailable [" 
+ chInfo + ']', ex);
+    }
+
+    /**
+     *
+     */
+    private static class ClientRequestFuture extends GridFutureAdapter<byte[]> 
{
+    }
+
     /** SSL Socket Factory. */
     private static class ClientSslSocketFactory {
         /** Trust manager ignoring all certificate checks. */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
index 5c1275b..4dbcb93 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
@@ -47,7 +47,6 @@ import org.apache.ignite.internal.binary.BinaryUtils;
 import org.apache.ignite.internal.binary.BinaryWriterExImpl;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
-import 
org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.marshaller.MarshallerContext;
@@ -103,7 +102,7 @@ public class TcpIgniteClient implements IgniteClient {
     @Override public <K, V> ClientCache<K, V> getOrCreateCache(String name) 
throws ClientException {
         ensureCacheName(name);
 
-        ch.request(ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME, req -> 
writeString(name, req));
+        ch.request(ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME, req -> 
writeString(name, req.out()));
 
         return new TcpClientCache<>(name, ch, marsh);
     }
@@ -114,7 +113,7 @@ public class TcpIgniteClient implements IgniteClient {
         ensureCacheConfiguration(cfg);
 
         ch.request(ClientOperation.CACHE_GET_OR_CREATE_WITH_CONFIGURATION, 
-            req -> serDes.cacheConfiguration(cfg, req, 
toClientVersion(ch.serverVersion())));
+            req -> serDes.cacheConfiguration(cfg, req.out(), 
req.clientChannel().serverVersion()));
 
         return new TcpClientCache<>(cfg.getName(), ch, marsh);
     }
@@ -128,21 +127,21 @@ public class TcpIgniteClient implements IgniteClient {
 
     /** {@inheritDoc} */
     @Override public Collection<String> cacheNames() throws ClientException {
-        return ch.service(ClientOperation.CACHE_GET_NAMES, res -> 
Arrays.asList(BinaryUtils.doReadStringArray(res)));
+        return ch.service(ClientOperation.CACHE_GET_NAMES, res -> 
Arrays.asList(BinaryUtils.doReadStringArray(res.in())));
     }
 
     /** {@inheritDoc} */
     @Override public void destroyCache(String name) throws ClientException {
         ensureCacheName(name);
 
-        ch.request(ClientOperation.CACHE_DESTROY, req -> 
req.writeInt(ClientUtils.cacheId(name)));
+        ch.request(ClientOperation.CACHE_DESTROY, req -> 
req.out().writeInt(ClientUtils.cacheId(name)));
     }
 
     /** {@inheritDoc} */
     @Override public <K, V> ClientCache<K, V> createCache(String name) throws 
ClientException {
         ensureCacheName(name);
 
-        ch.request(ClientOperation.CACHE_CREATE_WITH_NAME, req -> 
writeString(name, req));
+        ch.request(ClientOperation.CACHE_CREATE_WITH_NAME, req -> 
writeString(name, req.out()));
 
         return new TcpClientCache<>(name, ch, marsh);
     }
@@ -152,21 +151,11 @@ public class TcpIgniteClient implements IgniteClient {
         ensureCacheConfiguration(cfg);
 
         ch.request(ClientOperation.CACHE_CREATE_WITH_CONFIGURATION, 
-            req -> serDes.cacheConfiguration(cfg, req, 
toClientVersion(ch.serverVersion())));
+            req -> serDes.cacheConfiguration(cfg, req.out(), 
req.clientChannel().serverVersion()));
 
         return new TcpClientCache<>(cfg.getName(), ch, marsh);
     }
 
-    /**
-     * Converts {@link ProtocolVersion} to {@link 
ClientListenerProtocolVersion}.
-     *
-     * @param srvVer Server protocol version.
-     * @return Client protocol version.
-     */
-    private ClientListenerProtocolVersion toClientVersion(ProtocolVersion 
srvVer) {
-        return ClientListenerProtocolVersion.create(srvVer.major(), 
srvVer.minor(), srvVer.patch());
-    }
-
     /** {@inheritDoc} */
     @Override public IgniteBinary binary() {
         return binary;
@@ -177,7 +166,9 @@ public class TcpIgniteClient implements IgniteClient {
         if (qry == null)
             throw new NullPointerException("qry");
 
-        Consumer<BinaryOutputStream> qryWriter = out -> {
+        Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
+            BinaryOutputStream out = payloadCh.out();
+
             out.writeInt(0); // no cache ID
             out.writeByte((byte)1); // keep binary
             serDes.write(qry, out);
@@ -249,7 +240,7 @@ public class TcpIgniteClient implements IgniteClient {
                 try {
                     ch.request(
                         ClientOperation.PUT_BINARY_TYPE,
-                        req -> 
serDes.binaryMetadata(((BinaryTypeImpl)meta).metadata(), req)
+                        req -> 
serDes.binaryMetadata(((BinaryTypeImpl)meta).metadata(), req.out())
                     );
                 }
                 catch (ClientException e) {
@@ -285,10 +276,10 @@ public class TcpIgniteClient implements IgniteClient {
                 try {
                     meta = ch.service(
                         ClientOperation.GET_BINARY_TYPE,
-                        req -> req.writeInt(typeId),
+                        req -> req.out().writeInt(typeId),
                         res -> {
                             try {
-                                return res.readBoolean() ? 
serDes.binaryMetadata(res) : null;
+                                return res.in().readBoolean() ? 
serDes.binaryMetadata(res.in()) : null;
                             }
                             catch (IOException e) {
                                 throw new BinaryObjectException(e);
@@ -341,12 +332,14 @@ public class TcpIgniteClient implements IgniteClient {
                 try {
                     res = ch.service(
                         ClientOperation.REGISTER_BINARY_TYPE_NAME,
-                        req -> {
-                            req.writeByte(platformId);
-                            req.writeInt(typeId);
-                            writeString(clsName, req);
+                        payloadCh -> {
+                            BinaryOutputStream out = payloadCh.out();
+
+                            out.writeByte(platformId);
+                            out.writeInt(typeId);
+                            writeString(clsName, out);
                         },
-                        BinaryInputStream::readBoolean
+                        payloadCh -> payloadCh.in().readBoolean()
                     );
                 }
                 catch (ClientException e) {
@@ -361,9 +354,8 @@ public class TcpIgniteClient implements IgniteClient {
         }
 
         /** {@inheritDoc} */
-        @Override
         @Deprecated
-        public boolean registerClassName(byte platformId, int typeId, String 
clsName) throws IgniteCheckedException {
+        @Override public boolean registerClassName(byte platformId, int 
typeId, String clsName) throws IgniteCheckedException {
             return registerClassName(platformId, typeId, clsName, false);
         }
 
@@ -398,10 +390,12 @@ public class TcpIgniteClient implements IgniteClient {
                     clsName = ch.service(
                         ClientOperation.GET_BINARY_TYPE_NAME,
                         req -> {
-                            req.writeByte(platformId);
-                            req.writeInt(typeId);
+                            BinaryOutputStream out = req.out();
+
+                            out.writeByte(platformId);
+                            out.writeInt(typeId);
                         },
-                        TcpIgniteClient.this::readString
+                        res -> readString(res.in())
                     );
                 }
                 catch (ClientException e) {
diff --git 
a/modules/core/src/test/java/org/apache/ignite/client/AsyncChannelTest.java 
b/modules/core/src/test/java/org/apache/ignite/client/AsyncChannelTest.java
new file mode 100644
index 0000000..543321c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/client/AsyncChannelTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.query.Query;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Async channel tests.
+ */
+public class AsyncChannelTest extends GridCommonAbstractTest {
+    /** Nodes count. */
+    private static final int NODES_CNT = 3;
+
+    /** Threads count. */
+    private static final int THREADS_CNT = 25;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "tx_cache";
+
+    /** Client connector address. */
+    private static final String CLIENT_CONN_ADDR = "127.0.0.1:" + 
ClientConnectorConfiguration.DFLT_PORT;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        return 
super.getConfiguration(igniteInstanceName).setCacheConfiguration(
+            new 
CacheConfiguration(CACHE_NAME).setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(NODES_CNT);
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
+     * Test that client channel works in async mode.
+     */
+    @Test
+    public void testAsyncRequests() throws Exception {
+        try (IgniteClient client = Ignition.startClient(new 
ClientConfiguration().setAddresses(CLIENT_CONN_ADDR))) {
+            Ignite ignite = grid(0);
+
+            IgniteCache<Integer, Integer> igniteCache = 
ignite.cache(CACHE_NAME);
+            ClientCache<Integer, Integer> clientCache = 
client.cache(CACHE_NAME);
+
+            clientCache.clear();
+
+            Lock keyLock = igniteCache.lock(0);
+
+            IgniteInternalFuture fut;
+
+            keyLock.lock();
+
+            try {
+                CountDownLatch latch = new CountDownLatch(1);
+
+                fut = GridTestUtils.runAsync(() -> {
+                    latch.countDown();
+
+                    // This request is blocked until we explicitly unlock key 
in another thread.
+                    clientCache.put(0, 0);
+
+                    clientCache.put(1, 1);
+
+                    assertEquals(10, clientCache.size(CachePeekMode.PRIMARY));
+                });
+
+                latch.await();
+
+                for (int i = 2; i < 10; i++) {
+                    clientCache.put(i, i);
+
+                    assertEquals((Integer)i, igniteCache.get(i));
+                    assertEquals((Integer)i, clientCache.get(i));
+                }
+
+                // Parallel thread must be blocked on key 0.
+                assertFalse(clientCache.containsKey(1));
+            }
+            finally {
+                keyLock.unlock();
+            }
+
+            fut.get();
+
+            assertTrue(clientCache.containsKey(1));
+        }
+    }
+
+    /**
+     * Test multiple concurrent async requests.
+     */
+    @Test
+    public void testConcurrentRequests() throws Exception {
+        try (IgniteClient client = Ignition.startClient(new 
ClientConfiguration().setAddresses(CLIENT_CONN_ADDR))) {
+            ClientCache<Integer, Integer> clientCache = 
client.cache(CACHE_NAME);
+
+            clientCache.clear();
+
+            AtomicInteger keyCnt = new AtomicInteger();
+
+            CyclicBarrier barrier = new CyclicBarrier(THREADS_CNT);
+
+            GridTestUtils.runMultiThreaded(() -> {
+                try {
+                    barrier.await();
+                }
+                catch (Exception e) {
+                    fail();
+                }
+
+                for (int i = 0; i < 100; i++) {
+                    int key = keyCnt.incrementAndGet();
+
+                    clientCache.put(key, key);
+
+                    assertEquals(key, (long)clientCache.get(key));
+                }
+
+            }, THREADS_CNT, "thin-client-thread");
+        }
+    }
+
+    /**
+     * Test multiple concurrent async queries.
+     */
+    @Test
+    public void testConcurrentQueries() throws Exception {
+        try (IgniteClient client = Ignition.startClient(new 
ClientConfiguration().setAddresses(CLIENT_CONN_ADDR))) {
+            ClientCache<Integer, Integer> clientCache = 
client.cache(CACHE_NAME);
+
+            clientCache.clear();
+
+            for (int i = 0; i < 10; i++)
+                clientCache.put(i, i);
+
+            CyclicBarrier barrier = new CyclicBarrier(THREADS_CNT);
+
+            GridTestUtils.runMultiThreaded(() -> {
+                try {
+                    barrier.await();
+                }
+                catch (Exception e) {
+                    fail();
+                }
+
+                for (int i = 0; i < 10; i++) {
+                    Query<Cache.Entry<Integer, String>> qry = new 
ScanQuery<Integer, String>().setPageSize(1);
+
+                    try (QueryCursor<Cache.Entry<Integer, String>> cur = 
clientCache.query(qry)) {
+                        int cacheSize = 
clientCache.size(CachePeekMode.PRIMARY);
+                        int curSize = cur.getAll().size();
+
+                        assertEquals(cacheSize, curSize);
+                    }
+                }
+            }, THREADS_CNT, "thin-client-thread");
+        }
+    }
+}
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java 
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index 0558b0a..80a77ff 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -36,7 +36,8 @@ import org.junit.runners.Suite;
     IgniteBinaryQueryTest.class,
     SslParametersTest.class,
     ConnectionTest.class,
-    ConnectToStartingNodeTest.class
+    ConnectToStartingNodeTest.class,
+    AsyncChannelTest.class
 })
 public class ClientTestSuite {
     // No-op.

Reply via email to