http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadStreamerWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadStreamerWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadStreamerWriter.java
new file mode 100644
index 0000000..3e5efd9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadStreamerWriter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload;
+
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * A bulk load cache writer object that adds entries using {@link IgniteDataStreamer}.
+ */
+public class BulkLoadStreamerWriter extends BulkLoadCacheWriter {
+    /** Serialization version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** The streamer. */
+    private final IgniteDataStreamer<Object, Object> streamer;
+
+    /**
+     * The number of {@link IgniteDataStreamer#addData(Object, Object)} calls made so far,
+     * tracked here because the streamer does not report any per-entry results back.
+     */
+    private long updateCnt;
+
+    /**
+     * Creates a cache writer.
+     *
+     * @param streamer The streamer to use.
+     */
+    public BulkLoadStreamerWriter(IgniteDataStreamer<Object, Object> streamer) {
+        this.streamer = streamer;
+        updateCnt = 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void apply(IgniteBiTuple<?, ?> entry) {
+        streamer.addData(entry.getKey(), entry.getValue());
+
+        updateCnt++;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        streamer.close();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long updateCnt() {
+        return updateCnt;
+    }
+}
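
For orientation, a minimal sketch of how this writer is driven (not part of the patch; the cache name is made up, and the Ignite instance is assumed to be started):

    import org.apache.ignite.Ignite;
    import org.apache.ignite.IgniteDataStreamer;
    import org.apache.ignite.internal.processors.bulkload.BulkLoadStreamerWriter;
    import org.apache.ignite.lang.IgniteBiTuple;

    public class StreamerWriterSketch {
        public static void demo(Ignite ignite) {
            // "cities" is a hypothetical cache; any (Object, Object) streamer works here.
            try (IgniteDataStreamer<Object, Object> streamer = ignite.dataStreamer("cities")) {
                BulkLoadStreamerWriter writer = new BulkLoadStreamerWriter(streamer);

                // Each parsed record arrives as a key-value tuple.
                writer.apply(new IgniteBiTuple<>(1, "Moscow"));
                writer.apply(new IgniteBiTuple<>(2, "Tokyo"));

                // The counter only reflects addData() calls; the streamer returns no per-entry results.
                assert writer.updateCnt() == 2;
            }
        }
    }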

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CharsetDecoderBlock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CharsetDecoderBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CharsetDecoderBlock.java
new file mode 100644
index 0000000..5b18def
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CharsetDecoderBlock.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload.pipeline;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteIllegalStateException;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.CodingErrorAction;
+import java.util.Arrays;
+
+/**
+ * A {@link PipelineBlock} that converts a stream of bytes, supplied as byte[] arrays, into char[] arrays using
+ * the specified encoding. Decoding errors (malformed input and unmappable characters) are handled by dropping
+ * the erroneous input, appending the decoder's replacement value to the output buffer, and resuming the operation.
+ */
+public class CharsetDecoderBlock extends PipelineBlock<byte[], char[]> {
+    /** Charset decoder. */
+    private final CharsetDecoder charsetDecoder;
+
+    /** Leftover bytes (partial characters) from the last batch,
+     * or null if everything was processed. */
+    private byte[] leftover;
+
+    /** True once we've reached the end of input. */
+    private boolean isEndOfInput;
+
+    /**
+     * Creates charset decoder block.
+     *
+     * @param charset The charset encoding to decode bytes from.
+     */
+    public CharsetDecoderBlock(Charset charset) {
+        charsetDecoder = charset.newDecoder()
+            .onMalformedInput(CodingErrorAction.REPLACE)
+            .onUnmappableCharacter(CodingErrorAction.REPLACE);
+
+        isEndOfInput = false;
+        leftover = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void accept(byte[] data, boolean isLastAppend) throws IgniteCheckedException {
+        assert nextBlock != null;
+
+        assert !isEndOfInput : "accept() called after the end of input";
+
+        isEndOfInput = isLastAppend;
+
+        if (leftover == null && data.length == 0) {
+            nextBlock.accept(new char[0], isLastAppend);
+            return;
+        }
+
+        ByteBuffer dataBuf;
+
+        if (leftover == null)
+            dataBuf = ByteBuffer.wrap(data);
+        else {
+            dataBuf = ByteBuffer.allocate(leftover.length + data.length);
+
+            dataBuf.put(leftover)
+                   .put(data);
+
+            dataBuf.flip();
+
+            leftover = null;
+        }
+
+        int outBufLen = (int)Math.ceil(charsetDecoder.maxCharsPerByte() * (data.length + 1));
+
+        assert outBufLen > 0;
+
+        CharBuffer outBuf = CharBuffer.allocate(outBufLen);
+
+        for (;;) {
+            CoderResult res = charsetDecoder.decode(dataBuf, outBuf, isEndOfInput);
+
+            if (res.isUnderflow()) {
+                // End of the input buffer reached: either save the partial character at the end
+                // as leftover for the next batch, or, if this was the last batch, drop it.
+                if (!isEndOfInput && dataBuf.remaining() > 0)
+                    leftover = Arrays.copyOfRange(dataBuf.array(),
+                        dataBuf.arrayOffset() + dataBuf.position(), dataBuf.limit());
+
+                if (isEndOfInput)
+                    charsetDecoder.flush(outBuf); // See {@link CharsetDecoder} class javadoc for the protocol.
+
+                if (outBuf.position() > 0)
+                    nextBlock.accept(Arrays.copyOfRange(outBuf.array(), outBuf.arrayOffset(), outBuf.position()),
+                        isEndOfInput);
+
+                break;
+            }
+
+            if (res.isOverflow()) { // Not enough space in the output buffer: flush it and retry.
+                assert outBuf.position() > 0;
+
+                nextBlock.accept(Arrays.copyOfRange(outBuf.array(), outBuf.arrayOffset(), outBuf.position()),
+                    isEndOfInput);
+
+                outBuf.clear(); // Reset the whole buffer for the next round (flip() would cap it at the old position).
+
+                continue;
+            }
+
+            assert !res.isMalformed() && !res.isUnmappable();
+
+            // We're not supposed to reach this point with the current implementation.
+            // This exception fires if the JDK implementation of CharsetDecoder changes in the future.
+            throw new IgniteIllegalStateException("Unknown CharsetDecoder state");
+        }
+    }
+}
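
A quick sketch of the leftover behavior (illustrative only; assumes these internal classes are on the classpath):

    import java.nio.charset.StandardCharsets;
    import org.apache.ignite.IgniteCheckedException;
    import org.apache.ignite.internal.processors.bulkload.pipeline.CharsetDecoderBlock;
    import org.apache.ignite.internal.processors.bulkload.pipeline.PipelineBlock;

    public class DecoderBlockSketch {
        public static void main(String[] args) throws IgniteCheckedException {
            final StringBuilder out = new StringBuilder();

            CharsetDecoderBlock decoder = new CharsetDecoderBlock(StandardCharsets.UTF_8);

            // Terminal block that just collects the decoded characters.
            decoder.append(new PipelineBlock<char[], Void>() {
                @Override public void accept(char[] chars, boolean isLastPortion) {
                    out.append(chars);
                }
            });

            // 'é' is 0xC3 0xA9 in UTF-8; split it across two batches.
            decoder.accept(new byte[] {(byte)0xC3}, false); // Partial character: kept as leftover.
            decoder.accept(new byte[] {(byte)0xA9}, true);  // Completes the character; output is flushed.

            assert "é".equals(out.toString());
        }
    }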

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CsvLineProcessorBlock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CsvLineProcessorBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CsvLineProcessorBlock.java
new file mode 100644
index 0000000..5b2ee4b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CsvLineProcessorBlock.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload.pipeline;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.regex.Pattern;
+
+/**
+ * A {@link PipelineBlock} that splits a line according to CSV format rules and unquotes fields.
+ * The next block's {@link PipelineBlock#accept(Object, boolean)} is called once per line.
+ */
+public class CsvLineProcessorBlock extends PipelineBlock<String, String[]> {
+    /** Field delimiter pattern. */
+    private final Pattern fldDelim;
+
+    /** Quote characters. */
+    private final String quoteChars;
+
+    /**
+     * Creates a CSV line parser.
+     *
+     * @param fldDelim The pattern for the field delimiter.
+     * @param quoteChars Quoting characters.
+     */
+    public CsvLineProcessorBlock(Pattern fldDelim, String quoteChars) {
+        this.fldDelim = fldDelim;
+        this.quoteChars = quoteChars;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void accept(String input, boolean isLastPortion) throws IgniteCheckedException {
+        // Currently we don't process quoted field delimiters properly; this will be fixed in IGNITE-7537.
+        String[] fields = fldDelim.split(input);
+
+        for (int i = 0; i < fields.length; i++)
+            fields[i] = trim(fields[i]);
+
+        nextBlock.accept(fields, isLastPortion);
+    }
+
+    /**
+     * Trims quote characters from the beginning and end of a field.
+     *
+     * @param str String to trim.
+     * @return The trimmed string.
+     */
+    @NotNull private String trim(String str) {
+        if (str.isEmpty())
+            return str; // An empty field (e.g., from "a,,b") has nothing to trim; charAt(0) would throw otherwise.
+
+        int startPos = quoteChars.indexOf(str.charAt(0)) != -1 ? 1 : 0;
+        int endPos = quoteChars.indexOf(str.charAt(str.length() - 1)) != -1 ? str.length() - 1 : str.length();
+
+        return str.substring(startPos, endPos);
+    }
+}
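
A short sketch of the current behavior (illustrative; per the comment above, a delimiter inside quotes would still split the field until IGNITE-7537 lands):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.regex.Pattern;
    import org.apache.ignite.IgniteCheckedException;
    import org.apache.ignite.internal.processors.bulkload.pipeline.CsvLineProcessorBlock;
    import org.apache.ignite.internal.processors.bulkload.pipeline.PipelineBlock;

    public class CsvBlockSketch {
        public static void main(String[] args) throws IgniteCheckedException {
            final List<List<String>> rows = new ArrayList<>();

            CsvLineProcessorBlock csv = new CsvLineProcessorBlock(Pattern.compile(","), "\"");

            // Terminal block collecting the parsed fields of each line.
            csv.append(new PipelineBlock<String[], Void>() {
                @Override public void accept(String[] fields, boolean isLastPortion) {
                    rows.add(Arrays.asList(fields));
                }
            });

            csv.accept("\"John\",25", true);

            // Quotes are stripped from field edges: rows == [[John, 25]].
            System.out.println(rows);
        }
    }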

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/LineSplitterBlock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/LineSplitterBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/LineSplitterBlock.java
new file mode 100644
index 0000000..122d0db
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/LineSplitterBlock.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload.pipeline;
+
+import org.apache.ignite.IgniteCheckedException;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A {@link PipelineBlock} that splits an input stream of char[] into lines using the specified {@link Pattern}
+ * as the line separator. The next block's {@link PipelineBlock#accept(Object, boolean)} is invoked for each line.
+ * Leftover characters are remembered and prepended when processing the next input batch,
+ * unless the isLastPortion flag is set.
+ */
+public class LineSplitterBlock extends PipelineBlock<char[], String> {
+    /** Line separator pattern. */
+    private final Pattern delim;
+
+    /** Leftover characters from the previous invocation of {@link #accept(char[], boolean)}. */
+    private StringBuilder leftover = new StringBuilder();
+
+    /**
+     * Creates line splitter block.
+     *
+     * @param delim The line separator pattern.
+     */
+    public LineSplitterBlock(Pattern delim) {
+        this.delim = delim;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void accept(char[] chars, boolean isLastPortion) throws IgniteCheckedException {
+        leftover.append(chars);
+
+        String input = leftover.toString();
+        Matcher matcher = delim.matcher(input);
+
+        int lastPos = 0;
+        while (matcher.find()) {
+            String outStr = input.substring(lastPos, matcher.start());
+
+            if (!outStr.isEmpty())
+                nextBlock.accept(outStr, false);
+
+            lastPos = matcher.end();
+        }
+
+        if (lastPos != 0)
+            leftover.delete(0, lastPos);
+
+        if (isLastPortion && leftover.length() > 0) {
+            nextBlock.accept(leftover.toString(), true);
+            leftover.setLength(0);
+        }
+    }
+}
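
A sketch of the leftover handling across two batches (illustrative; internal classes assumed on the classpath):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.regex.Pattern;
    import org.apache.ignite.IgniteCheckedException;
    import org.apache.ignite.internal.processors.bulkload.pipeline.LineSplitterBlock;
    import org.apache.ignite.internal.processors.bulkload.pipeline.PipelineBlock;

    public class LineSplitterSketch {
        public static void main(String[] args) throws IgniteCheckedException {
            final List<String> lines = new ArrayList<>();

            LineSplitterBlock splitter = new LineSplitterBlock(Pattern.compile("\r?\n"));

            // Terminal block collecting the emitted lines.
            splitter.append(new PipelineBlock<String, Void>() {
                @Override public void accept(String line, boolean isLastPortion) {
                    lines.add(line);
                }
            });

            // "b" has no trailing separator yet, so it is held back as leftover.
            splitter.accept("a\nb".toCharArray(), false); // Emits "a".
            splitter.accept("c\n".toCharArray(), true);   // Emits "bc".

            assert lines.equals(Arrays.asList("a", "bc"));
        }
    }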

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/PipelineBlock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/PipelineBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/PipelineBlock.java
new file mode 100644
index 0000000..914b4b4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/PipelineBlock.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload.pipeline;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A file parsing pipeline block. Accepts a portion of input (the isLastPortion flag signifies the last
+ * portion to process) and optionally calls the next block with the transformed input, or performs any
+ * other handling, such as storing the input in internal structures.
+ */
+public abstract class PipelineBlock<I, O> {
+    /** The next block in pipeline or null if this block is a terminator. */
+    @Nullable protected PipelineBlock<O, ?> nextBlock;
+
+    /**
+     * Creates a pipeline block.
+     *
+     * <p>(There is no nextBlock argument in the constructor: setting the next block using
+     * the {@link #append(PipelineBlock)} method is more convenient.)
+     */
+    protected PipelineBlock() {
+        nextBlock = null;
+    }
+
+    /**
+     * Sets the next block for this block and returns the <b>next</b> block.
+     *
+     * <p>Below is an example of using this method to set up a pipeline:<br>
+     * {@code block1.append(block2).append(block3); }.
+     * <p>Block2 here becomes the next block for block1, and block3 the next one for block2.
+     *
+     * @param next The next block for the current block.
+     * @return The next block ({@code next} argument).
+     */
+    public <N> PipelineBlock<O, N> append(PipelineBlock<O, N> next) {
+        nextBlock = next;
+        return next;
+    }
+
+    /**
+     * Accepts a portion of input. The {@code isLastPortion} parameter should be set if this is the last portion
+     * of the input. The method must not be called after the end of input: the call with {@code isLastPortion == true}
+     * is the last one.
+     *
+     * @param inputPortion Portion of input.
+     * @param isLastPortion Is this the last portion.
+     */
+    public abstract void accept(I inputPortion, boolean isLastPortion) throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/StrListAppenderBlock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/StrListAppenderBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/StrListAppenderBlock.java
new file mode 100644
index 0000000..91cbc1e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/StrListAppenderBlock.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload.pipeline;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A {@link PipelineBlock} that appends its input to a user-supplied list.
+ *
+ * <p>The list is set using the {@link #output(List)} method.
+ */
+public class StrListAppenderBlock extends PipelineBlock<String[], Object> {
+    /** The output list. */
+    private List<List<Object>> output;
+
+    /**
+     * Creates the block. The output list can be configured using the {@link #output(List)} method.
+     */
+    public StrListAppenderBlock() {
+        output = null;
+    }
+
+    /**
+     * Sets the output list.
+     *
+     * @param output The output list.
+     */
+    public void output(List<List<Object>> output) {
+        this.output = output;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void accept(String[] elements, boolean isLastPortion) {
+        output.add(Arrays.asList(elements));
+    }
+}
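
Putting the four blocks together: a sketch of a complete parsing pipeline wired up via append(), as the PipelineBlock javadoc suggests (the separators below are illustrative defaults, not taken from BulkLoadCsvFormat):

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.regex.Pattern;
    import org.apache.ignite.IgniteCheckedException;
    import org.apache.ignite.internal.processors.bulkload.pipeline.*;

    public class PipelineSketch {
        public static void main(String[] args) throws IgniteCheckedException {
            List<List<Object>> records = new ArrayList<>();

            CharsetDecoderBlock decoder = new CharsetDecoderBlock(StandardCharsets.UTF_8);
            StrListAppenderBlock appender = new StrListAppenderBlock();
            appender.output(records);

            // bytes -> chars -> lines -> fields -> records
            decoder.append(new LineSplitterBlock(Pattern.compile("\r?\n")))
                .append(new CsvLineProcessorBlock(Pattern.compile(","), "\""))
                .append(appender);

            byte[] batch = "1,\"Moscow\"\n2,\"Tokyo\"\n".getBytes(StandardCharsets.UTF_8);
            decoder.accept(batch, true);

            // records == [[1, Moscow], [2, Tokyo]]
            System.out.println(records);
        }
    }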

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadAckResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadAckResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadAckResult.java
new file mode 100644
index 0000000..8a170ab
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadAckResult.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
+import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * A reply from the server to an SQL COPY command, which is essentially a request from the server to the client
+ * to send files from the client to the server (see IGNITE-6917 for details).
+ *
+ * @see JdbcBulkLoadProcessor for the protocol.
+ * @see SqlBulkLoadCommand
+ */
+public class JdbcBulkLoadAckResult extends JdbcResult {
+    /** Query ID for matching this command on the server in further {@link JdbcBulkLoadBatchRequest} commands. */
+    private long qryId;
+
+    /**
+     * Bulk load parameters, which are parsed on the server side and sent to the client to specify
+     * what files to send, the batch size, etc.
+     */
+    private BulkLoadAckClientParameters params;
+
+    /** Creates an uninitialized bulk load ack result. */
+    public JdbcBulkLoadAckResult() {
+        super(BULK_LOAD_ACK);
+
+        qryId = 0;
+        params = null;
+    }
+
+    /**
+     * Constructs a request from the server (in the form of a reply) to send files from the client to the server.
+     *
+     * @param qryId Query ID to send in further {@link JdbcBulkLoadBatchRequest}s.
+     * @param params Various parameters for sending batches from the client side.
+     */
+    public JdbcBulkLoadAckResult(long qryId, BulkLoadAckClientParameters params) {
+        super(BULK_LOAD_ACK);
+
+        this.qryId = qryId;
+        this.params = params;
+    }
+
+    /**
+     * Returns the query ID.
+     *
+     * @return Query ID.
+     */
+    public long queryId() {
+        return qryId;
+    }
+
+    /**
+     * Returns the parameters for the client.
+     *
+     * @return The parameters for the client.
+     */
+    public BulkLoadAckClientParameters params() {
+        return params;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException {
+        super.writeBinary(writer);
+
+        writer.writeLong(qryId);
+        writer.writeString(params.localFileName());
+        writer.writeInt(params.batchSize());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException {
+        super.readBinary(reader);
+
+        qryId = reader.readLong();
+
+        String locFileName = reader.readString();
+        int batchSize = reader.readInt();
+
+        if (!BulkLoadAckClientParameters.isValidBatchSize(batchSize))
+            throw new BinaryObjectException(BulkLoadAckClientParameters.batchSizeErrorMsg(batchSize));
+
+        params = new BulkLoadAckClientParameters(locFileName, batchSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(JdbcBulkLoadAckResult.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadBatchRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadBatchRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadBatchRequest.java
new file mode 100644
index 0000000..b75de5a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadBatchRequest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * A JDBC request that sends a batch of a file to the server. Used when handling
+ * the {@link SqlBulkLoadCommand} command.
+ */
+public class JdbcBulkLoadBatchRequest extends JdbcRequest {
+    /** A sentinel to indicate that {@link #cmd} field was not initialized. */
+    public static final int CMD_UNKNOWN = -1;
+
+    /** Next batch comes in this request and there are more batches. */
+    public static final int CMD_CONTINUE = 0;
+
+    /**
+     * This is the final batch from the client, and there was an error on the client side,
+     * so terminate with an error on the server side as well.
+     */
+    public static final int CMD_FINISHED_ERROR = 1;
+
+    /**
+     * This is the final batch of the file and everything went well on the client side.
+     * The server may complete the request.
+     */
+    public static final int CMD_FINISHED_EOF = 2;
+
+    /** Query ID of the original COPY command request. */
+    private long qryId;
+
+    /** Batch index starting from 0. */
+    private int batchIdx;
+
+    /** Command (see CMD_xxx constants above). */
+    private int cmd;
+
+    /** Data in this batch. */
+    @NotNull private byte[] data;
+
+    /**
+     * Creates the request with uninitialized parameters.
+     */
+    public JdbcBulkLoadBatchRequest() {
+        super(BULK_LOAD_BATCH);
+
+        qryId = -1;
+        batchIdx = -1;
+        cmd = CMD_UNKNOWN;
+        data = null;
+    }
+
+    /**
+     * Creates the request with specified parameters and zero-length data.
+     * Typically used with {@link #CMD_FINISHED_ERROR} and {@link #CMD_FINISHED_EOF}.
+     *
+     * @param qryId The query ID from the {@link JdbcBulkLoadAckResult}.
+     * @param batchIdx Index of the current batch, starting with 0.
+     * @param cmd The command ({@link #CMD_CONTINUE}, {@link #CMD_FINISHED_EOF}, or {@link #CMD_FINISHED_ERROR}).
+     */
+    public JdbcBulkLoadBatchRequest(long qryId, int batchIdx, int cmd) {
+        this(qryId, batchIdx, cmd, new byte[0]);
+    }
+
+    /**
+     * Creates the request with the specified parameters.
+     *
+     * @param qryId The query ID from the {@link JdbcBulkLoadAckResult}.
+     * @param batchIdx Index of the current batch starting with 0.
+     * @param cmd The command ({@link #CMD_CONTINUE}, {@link #CMD_FINISHED_EOF}, or {@link #CMD_FINISHED_ERROR}).
+     * @param data The data block (zero length is acceptable).
+     */
+    public JdbcBulkLoadBatchRequest(long qryId, int batchIdx, int cmd, @NotNull byte[] data) {
+        super(BULK_LOAD_BATCH);
+
+        this.qryId = qryId;
+        this.batchIdx = batchIdx;
+
+        assert isCmdValid(cmd) : "Invalid command value: " + cmd;
+        this.cmd = cmd;
+
+        this.data = data;
+    }
+
+    /**
+     * Returns the original query ID.
+     *
+     * @return The original query ID.
+     */
+    public long queryId() {
+        return qryId;
+    }
+
+    /**
+     * Returns the batch index.
+     *
+     * @return The batch index.
+     */
+    public long batchIdx() {
+        return batchIdx;
+    }
+
+    /**
+     * Returns the command (see CMD_xxx constants for details).
+     *
+     * @return The command.
+     */
+    public int cmd() {
+        return cmd;
+    }
+
+    /**
+     * Returns the data.
+     *
+     * @return The data (a zero-length array if no data was supplied).
+     */
+    @NotNull public byte[] data() {
+        return data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException {
+        super.writeBinary(writer);
+
+        writer.writeLong(qryId);
+        writer.writeInt(batchIdx);
+        writer.writeInt(cmd);
+        writer.writeByteArray(data);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException {
+        super.readBinary(reader);
+
+        qryId = reader.readLong();
+        batchIdx = reader.readInt();
+
+        int c = reader.readInt();
+        if (!isCmdValid(c))
+            throw new BinaryObjectException("Invalid command: " + c);
+
+        cmd = c;
+
+        data = reader.readByteArray();
+        assert data != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(JdbcBulkLoadBatchRequest.class, this);
+    }
+
+    /**
+     * Checks if the command value is valid.
+     *
+     * @param c The command value to check.
+     * @return True if valid, false otherwise.
+     */
+    private static boolean isCmdValid(int c) {
+        return c >= CMD_CONTINUE && c <= CMD_FINISHED_EOF;
+    }
+}
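
The CMD_xxx constants map onto a simple client-side loop; a hedged sketch (send() is a hypothetical stand-in for the client's I/O layer, which really serializes via writeBinary()):

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Arrays;
    import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest;

    public class BatchSenderSketch {
        /** Hypothetical I/O stub. */
        static void send(JdbcBulkLoadBatchRequest req) { /* ... */ }

        /** Streams a local file as a sequence of batch requests. */
        static void sendFile(long qryId, String path, int batchSize) {
            int idx = 0;

            try (InputStream in = new FileInputStream(path)) {
                byte[] buf = new byte[batchSize];

                int read;

                while ((read = in.read(buf)) > 0)
                    send(new JdbcBulkLoadBatchRequest(qryId, idx++,
                        JdbcBulkLoadBatchRequest.CMD_CONTINUE, Arrays.copyOf(buf, read)));

                // Zero-length data is acceptable for the terminating command.
                send(new JdbcBulkLoadBatchRequest(qryId, idx, JdbcBulkLoadBatchRequest.CMD_FINISHED_EOF));
            }
            catch (IOException e) {
                // A read error on the client aborts processing on the server side as well.
                send(new JdbcBulkLoadBatchRequest(qryId, idx, JdbcBulkLoadBatchRequest.CMD_FINISHED_ERROR));
            }
        }
    }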

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadProcessor.java
new file mode 100644
index 0000000..9757791
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadProcessor.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+
+import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_CONTINUE;
+import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_FINISHED_EOF;
+import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_FINISHED_ERROR;
+
+/**
+ * JDBC wrapper around {@link BulkLoadProcessor} that provides extra logic.
+ *
+ * Unlike other "single shot" request-reply commands, for the
+ * COPY command the client-server interaction looks like this:
+ *
+ * <pre>
+ * Thin JDBC client                            Server
+ *        |                                       |
+ *        |------- JdbcQueryExecuteRequest ------>|
+ *        |         with SQL copy command         |
+ *        |                                       |
+ *        |<---- JdbcBulkLoadAckResult -----------|
+ *        | with BulkLoadAckClientParameters      |
+ *        | containing file name and batch size.  |
+ *        |                                       |
+ * (open the file,                                |
+ *  read portions and send them)                  |
+ *        |                                       |
+ *        |------- JdbcBulkLoadBatchRequest #1 -->|
+ *        | with a portion of input file.         |
+ *        |                                       |
+ *        |<--- JdbcQueryExecuteResult -----------|
+ *        | with current update counter.          |
+ *        |                                       |
+ *        |------- JdbcBulkLoadBatchRequest #2--->|
+ *        | with a portion of input file.         |
+ *        |                                       |
+ *        |<--- JdbcQueryExecuteResult -----------|
+ *        | with current update counter.          |
+ *        |                                       |
+ *        |------- JdbcBulkLoadBatchRequest #3--->|
+ *        | with the LAST portion of input file.  |
+ *        |                                       |
+ *        |<--- JdbcQueryExecuteResult -----------|
+ *        | with the final update counter.        |
+ *        |                                       |
+ * (close the file)                               |
+ *        |                                       |
+ * </pre>
+ *
+ * In case of an input file reading error, a flag is carried to the server:
+ * {@link JdbcBulkLoadBatchRequest#CMD_FINISHED_ERROR}, and the processing
+ * is aborted on both sides.
+ */
+public class JdbcBulkLoadProcessor {
+    /** A core processor that handles incoming data packets. */
+    private final BulkLoadProcessor processor;
+
+    /** Next batch index (for a very simple check that all batches were delivered to us). */
+    protected long nextBatchIdx;
+
+    /**
+     * Creates a JDBC-specific adapter for bulk load processor.
+     *
+     * @param processor Bulk load processor from the core to delegate calls to.
+     */
+    public JdbcBulkLoadProcessor(BulkLoadProcessor processor) {
+        this.processor = processor;
+        nextBatchIdx = 0;
+    }
+
+    /**
+     * Completely processes a bulk load batch request.
+     *
+     * Delegates to the {@link BulkLoadProcessor}, wrapping the call with JDBC-specific logic
+     * (command handling, bulk load batch index checking).
+     *
+     * @param req The current request.
+     */
+    public void processBatch(JdbcBulkLoadBatchRequest req)
+        throws IgniteCheckedException {
+        if (nextBatchIdx != req.batchIdx())
+            throw new IgniteSQLException("Batch #" + nextBatchIdx +
+                " is missing. Received #" + req.batchIdx() + " instead.");
+
+        nextBatchIdx++;
+
+        switch (req.cmd()) {
+            case CMD_FINISHED_EOF:
+                processor.processBatch(req.data(), true);
+
+                break;
+
+            case CMD_CONTINUE:
+                processor.processBatch(req.data(), false);
+
+                break;
+
+            case CMD_FINISHED_ERROR:
+                break;
+
+            default:
+                throw new IgniteIllegalStateException("Command was not recognized: " + req.cmd());
+        }
+    }
+
+    /**
+     * Closes the underlying objects.
+     * Currently we don't handle normal termination vs. abort.
+     */
+    public void close() throws Exception {
+        processor.close();
+
+        nextBatchIdx = -1;
+    }
+
+    /**
+     * Provides the update counter for sending in the {@link JdbcBatchExecuteResult}.
+     *
+     * @return The update counter for sending in {@link JdbcBatchExecuteResult}.
+     */
+    public long updateCnt() {
+        return processor.outputStreamer().updateCnt();
+    }
+}
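
A sketch of how a handler is expected to drive this adapter, per the diagram above (the method and its arguments are illustrative; the BulkLoadProcessor comes from the query execution stage):

    import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
    import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest;
    import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadProcessor;

    public class ProcessorDrivingSketch {
        static long handle(BulkLoadProcessor coreProc, Iterable<JdbcBulkLoadBatchRequest> reqs) throws Exception {
            JdbcBulkLoadProcessor proc = new JdbcBulkLoadProcessor(coreProc);

            try {
                // Batches must arrive as 0, 1, 2, ...; a gap or reorder makes processBatch() throw.
                for (JdbcBulkLoadBatchRequest req : reqs)
                    proc.processBatch(req);

                return proc.updateCnt(); // The counter reported back in the execute result.
            }
            finally {
                proc.close();
            }
        }
    }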

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java
index 385924c..22522ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java
@@ -60,6 +60,8 @@ public class JdbcRequest extends ClientListenerRequestNoId implements JdbcRawBin
     /** Get schemas metadata request. */
     static final byte META_SCHEMAS = 12;
 
+    /** Send a batch of data from client to server. */
+    static final byte BULK_LOAD_BATCH = 13;
 
     /** Request type. */
     private byte type;
@@ -154,6 +156,11 @@ public class JdbcRequest extends ClientListenerRequestNoId implements JdbcRawBin
 
                 break;
 
+            case BULK_LOAD_BATCH:
+                req = new JdbcBulkLoadBatchRequest();
+
+                break;
+
             default:
                throw new IgniteException("Unknown SQL listener request ID: [request ID=" + reqType + ']');
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index 11b50ec..59fc06b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -32,11 +32,14 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.BulkLoadContextCursor;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteVersionUtils;
 import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
@@ -57,9 +60,13 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 
+import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_CONTINUE;
+import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_FINISHED_EOF;
+import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_FINISHED_ERROR;
 import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_3_0;
 import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_4_0;
 import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BATCH_EXEC;
+import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BULK_LOAD_BATCH;
 import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_COLUMNS;
 import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_INDEXES;
 import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_PARAMS;
@@ -93,6 +100,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
     /** Current queries cursors. */
     private final ConcurrentHashMap<Long, JdbcQueryCursor> qryCursors = new ConcurrentHashMap<>();
 
+    /** Current bulk load processors. */
+    private final ConcurrentHashMap<Long, JdbcBulkLoadProcessor> bulkLoadRequests = new ConcurrentHashMap<>();
+
     /** Distributed joins flag. */
     private final boolean distributedJoins;
 
@@ -197,6 +207,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
 
                 case META_SCHEMAS:
                     return getSchemas((JdbcMetaSchemasRequest)req);
+
+                case BULK_LOAD_BATCH:
+                    return processBulkLoadFileBatch((JdbcBulkLoadBatchRequest)req);
             }
 
             return new JdbcResponse(IgniteQueryErrorCode.UNSUPPORTED_OPERATION,
@@ -207,6 +220,46 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
         }
     }
 
+    /**
+     * Processes a file batch sent from the client as part of a bulk load COPY command.
+     *
+     * @param req Request object with a batch of a file received from the client.
+     * @return Response to send to the client.
+     */
+    private ClientListenerResponse processBulkLoadFileBatch(JdbcBulkLoadBatchRequest req) {
+        JdbcBulkLoadProcessor processor = bulkLoadRequests.get(req.queryId());
+
+        if (processor == null)
+            return new JdbcResponse(IgniteQueryErrorCode.UNEXPECTED_OPERATION, "Unknown query ID: "
+                + req.queryId() + ". Bulk load session may have been reclaimed due to timeout.");
+
+        try {
+            processor.processBatch(req);
+
+            switch (req.cmd()) {
+                case CMD_FINISHED_ERROR:
+                case CMD_FINISHED_EOF:
+                    bulkLoadRequests.remove(req.queryId());
+
+                    processor.close();
+
+                    break;
+
+                case CMD_CONTINUE:
+                    break;
+
+                default:
+                    throw new IllegalArgumentException("Unknown command: " + req.cmd());
+            }
+
+            return new JdbcResponse(new JdbcQueryExecuteResult(req.queryId(), processor.updateCnt()));
+        }
+        catch (Exception e) {
+            U.error(null, "Error processing file batch", e);
+
+            return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Server error: " + e);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public ClientListenerResponse handleException(Exception e, ClientListenerRequest req) {
         return exceptionToResult(e);
@@ -237,6 +290,17 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
             {
                 for (JdbcQueryCursor cursor : qryCursors.values())
                     cursor.close();
+
+                for (JdbcBulkLoadProcessor processor : bulkLoadRequests.values()) {
+                    try {
+                        processor.close();
+                    }
+                    catch (Exception e) {
+                        U.error(null, "Error closing JDBC bulk load processor.", e);
+                    }
+                }
+
+                bulkLoadRequests.clear();
             }
             finally {
                 busyLock.leaveBusy();
@@ -310,10 +374,22 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
             List<FieldsQueryCursor<List<?>>> results = ctx.query().querySqlFields(qry, true,
                 protocolVer.compareTo(VER_2_3_0) < 0);
 
-            if (results.size() == 1) {
-                FieldsQueryCursor<List<?>> qryCur = results.get(0);
+            FieldsQueryCursor<List<?>> fieldsCur = results.get(0);
 
-                JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(), (QueryCursorImpl)qryCur);
+            if (fieldsCur instanceof BulkLoadContextCursor) {
+                BulkLoadContextCursor blCur = (BulkLoadContextCursor)fieldsCur;
+
+                BulkLoadProcessor blProcessor = blCur.bulkLoadProcessor();
+                BulkLoadAckClientParameters clientParams = blCur.clientParams();
+
+                bulkLoadRequests.put(qryId, new JdbcBulkLoadProcessor(blProcessor));
+
+                return new JdbcResponse(new JdbcBulkLoadAckResult(qryId, clientParams));
+            }
+
+            if (results.size() == 1) {
+                JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(),
+                    (QueryCursorImpl)fieldsCur);
 
                 JdbcQueryExecuteResult res;
 
@@ -350,8 +426,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
                     if (qryCur.isQuery()) {
                         jdbcRes = new JdbcResultInfo(true, -1, qryId);
 
-                        JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(),
-                            (QueryCursorImpl)qryCur);
+                        JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(), qryCur);
 
                         qryCursors.put(qryId, cur);
 
@@ -370,8 +445,6 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
 
                 return new JdbcResponse(new JdbcQueryExecuteMultipleStatementsResult(jdbcResults, items, last));
             }
-
-
         }
         catch (Exception e) {
             qryCursors.remove(qryId);
@@ -534,6 +607,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
             List<FieldsQueryCursor<List<?>>> qryRes = ctx.query().querySqlFields(qry, true, true);
 
             for (FieldsQueryCursor<List<?>> cur : qryRes) {
+                if (cur instanceof BulkLoadContextCursor)
+                    throw new IgniteSQLException("COPY command cannot be executed in batch mode.");
+
                 assert !((QueryCursorImpl)cur).isQuery();
 
                 Iterator<List<?>> it = cur.iterator();

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
index 623a339..43631e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
@@ -65,6 +65,9 @@ public class JdbcResult implements JdbcRawBinarylizable {
     /** Columns metadata result V3. */
     static final byte META_COLUMNS_V3 = 15;
 
+    /** A request to send a file from client to server. */
+    static final byte BULK_LOAD_ACK = 16;
+
     /** Success status. */
     private byte type;
 
@@ -163,6 +166,11 @@ public class JdbcResult implements JdbcRawBinarylizable {
 
                 break;
 
+            case BULK_LOAD_ACK:
+                res = new JdbcBulkLoadAckResult();
+
+                break;
+
             default:
                throw new IgniteException("Unknown SQL listener request ID: [request ID=" + resId + ']');
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java
index f5b8c3c..0238b01 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java
@@ -42,6 +42,9 @@ public class SqlKeyword {
     /** Keyword: BOOL. */
     public static final String BOOL = "BOOL";
 
+    /** Keyword: BATCH_SIZE. */
+    public static final String BATCH_SIZE = "BATCH_SIZE";
+
     /** Keyword: BOOLEAN. */
     public static final String BOOLEAN = "BOOLEAN";
 
@@ -54,6 +57,9 @@ public class SqlKeyword {
     /** Keyword: CHARACTER. */
     public static final String CHARACTER = "CHARACTER";
 
+    /** Keyword: COPY. */
+    public static final String COPY = "COPY";
+
     /** Keyword: CREATE. */
     public static final String CREATE = "CREATE";
 
@@ -90,6 +96,12 @@ public class SqlKeyword {
     /** Keyword: FLOAT8. */
     public static final String FLOAT8 = "FLOAT8";
 
+    /** Keyword: FORMAT. */
+    public static final String FORMAT = "FORMAT";
+
+    /** Keyword: FROM. */
+    public static final String FROM = "FROM";
+
     /** Keyword: FULLTEXT. */
     public static final String FULLTEXT = "FULLTEXT";
 
@@ -120,6 +132,9 @@ public class SqlKeyword {
     /** Keyword: INTEGER. */
     public static final String INTEGER = "INTEGER";
 
+    /** Keyword: INTO. */
+    public static final String INTO = "INTO";
+
     /** Keyword: KEY. */
     public static final String KEY = "KEY";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java
index 401ee98..0627def 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java
@@ -18,12 +18,14 @@
 package org.apache.ignite.internal.sql;
 
 import org.apache.ignite.internal.sql.command.SqlAlterTableCommand;
+import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand;
 import org.apache.ignite.internal.sql.command.SqlCommand;
 import org.apache.ignite.internal.sql.command.SqlCreateIndexCommand;
 import org.apache.ignite.internal.sql.command.SqlDropIndexCommand;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.sql.SqlKeyword.ALTER;
+import static org.apache.ignite.internal.sql.SqlKeyword.COPY;
 import static org.apache.ignite.internal.sql.SqlKeyword.CREATE;
 import static org.apache.ignite.internal.sql.SqlKeyword.DROP;
 import static org.apache.ignite.internal.sql.SqlKeyword.HASH;
@@ -103,6 +105,11 @@ public class SqlParser {
 
                             break;
 
+                        case COPY:
+                            cmd = processCopy();
+
+                            break;
+
                         case ALTER:
                             cmd = processAlter();
                     }
@@ -115,7 +122,7 @@ public class SqlParser {
                         return cmd;
                     }
                     else
-                        throw errorUnexpectedToken(lex, CREATE, DROP, ALTER);
+                        throw errorUnexpectedToken(lex, CREATE, DROP, COPY, ALTER);
 
                 case QUOTED:
                 case MINUS:
@@ -130,6 +137,15 @@ public class SqlParser {
     }
 
     /**
+     * Processes COPY command.
+     *
+     * @return The {@link SqlBulkLoadCommand} command.
+     */
+    private SqlCommand processCopy() {
+        return new SqlBulkLoadCommand().parse(lex);
+    }
+
+    /**
      * Process CREATE keyword.
      *
      * @return Command.
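
For reference, the statement shape this new branch hands off to SqlBulkLoadCommand (next file); a hedged example as issued through the thin JDBC driver, with the URL, file, table, and sizes made up:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class CopyCommandExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/");
                 Statement stmt = conn.createStatement()) {
                // Grammar per SqlBulkLoadCommand.parse() below (path quoting follows the lexer's identifier rules):
                // COPY FROM <fileName> INTO <table> (<columns>) FORMAT CSV [BATCH_SIZE <n>]
                stmt.executeUpdate("COPY FROM \"people.csv\" INTO City (ID, Name) FORMAT CSV BATCH_SIZE 512");
            }
        }
    }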

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlBulkLoadCommand.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlBulkLoadCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlBulkLoadCommand.java
new file mode 100644
index 0000000..e5246d5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlBulkLoadCommand.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.command;
+
+import org.apache.ignite.internal.processors.bulkload.BulkLoadCsvFormat;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadFormat;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
+import org.apache.ignite.internal.sql.SqlKeyword;
+import org.apache.ignite.internal.sql.SqlLexer;
+import org.apache.ignite.internal.sql.SqlLexerTokenType;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.ignite.internal.sql.SqlParserUtils.error;
+import static org.apache.ignite.internal.sql.SqlParserUtils.parseIdentifier;
+import static org.apache.ignite.internal.sql.SqlParserUtils.parseInt;
+import static org.apache.ignite.internal.sql.SqlParserUtils.parseQualifiedIdentifier;
+import static org.apache.ignite.internal.sql.SqlParserUtils.skipCommaOrRightParenthesis;
+import static org.apache.ignite.internal.sql.SqlParserUtils.skipIfMatches;
+import static org.apache.ignite.internal.sql.SqlParserUtils.skipIfMatchesKeyword;
+
+/**
+ * A parser for the COPY command (called 'bulk load' in the code, since the word 'copy' is too generic).
+ */
+public class SqlBulkLoadCommand implements SqlCommand {
+    /** Local file name to send from client to server. */
+    private String locFileName;
+
+    /** Schema name + table name. */
+    private SqlQualifiedName tblQName;
+
+    /** User-specified list of columns. */
+    private List<String> cols;
+
+    /** File format. */
+    private BulkLoadFormat inputFormat;
+
+    /** Batch size (size of portion of a file sent in each sub-request). */
+    private Integer batchSize;
+
+    /**
+     * Parses the command.
+     *
+     * @param lex The lexer.
+     * @return The parsed command object.
+     */
+    @Override public SqlCommand parse(SqlLexer lex) {
+        skipIfMatchesKeyword(lex, SqlKeyword.FROM); // COPY keyword is already parsed
+
+        parseFileName(lex);
+
+        parseTableName(lex);
+
+        parseColumns(lex);
+
+        parseFormat(lex);
+
+        parseParameters(lex);
+
+        return this;
+    }
+
+    /**
+     * Parses the file name.
+     *
+     * @param lex The lexer.
+     */
+    private void parseFileName(SqlLexer lex) {
+        locFileName = parseIdentifier(lex);
+    }
+
+    /**
+     * Parses the schema and table names.
+     *
+     * @param lex The lexer.
+     */
+    private void parseTableName(SqlLexer lex) {
+        skipIfMatchesKeyword(lex, SqlKeyword.INTO);
+
+        tblQName = parseQualifiedIdentifier(lex);
+    }
+
+    /**
+     * Parses the list of columns.
+     *
+     * @param lex The lexer.
+     */
+    private void parseColumns(SqlLexer lex) {
+        skipIfMatches(lex, SqlLexerTokenType.PARENTHESIS_LEFT);
+
+        cols = new ArrayList<>();
+
+        do {
+            cols.add(parseColumn(lex));
+        }
+        while (!skipCommaOrRightParenthesis(lex));
+    }
+
+    /**
+     * Parses column clause.
+     *
+     * @param lex The lexer.
+     * @return The column name.
+     */
+    private String parseColumn(SqlLexer lex) {
+        return parseIdentifier(lex);
+    }
+
+    /**
+     * Parses the format clause.
+     *
+     * @param lex The lexer.
+     */
+    private void parseFormat(SqlLexer lex) {
+        skipIfMatchesKeyword(lex, SqlKeyword.FORMAT);
+
+        String name = parseIdentifier(lex);
+
+        switch (name.toUpperCase()) {
+            case BulkLoadCsvFormat.NAME:
+                BulkLoadCsvFormat fmt = new BulkLoadCsvFormat();
+
+                // IGNITE-7537 will introduce user-defined values
+                fmt.lineSeparator(BulkLoadCsvFormat.DEFAULT_LINE_SEPARATOR);
+                fmt.fieldSeparator(BulkLoadCsvFormat.DEFAULT_FIELD_SEPARATOR);
+                fmt.quoteChars(BulkLoadCsvFormat.DEFAULT_QUOTE_CHARS);
+                fmt.commentChars(BulkLoadCsvFormat.DEFAULT_COMMENT_CHARS);
+                fmt.escapeChars(BulkLoadCsvFormat.DEFAULT_ESCAPE_CHARS);
+
+                inputFormat = fmt;
+
+                break;
+
+            default:
+                throw error(lex, "Unknown format name: " + name +
+                    ". Currently the only supported format is " + BulkLoadCsvFormat.NAME);
+        }
+    }
+
+    /**
+     * Parses the optional parameters.
+     *
+     * @param lex The lexer.
+     */
+    private void parseParameters(SqlLexer lex) {
+        while (lex.lookAhead().tokenType() == SqlLexerTokenType.DEFAULT) {
+            switch (lex.lookAhead().token()) {
+                case SqlKeyword.BATCH_SIZE:
+                    lex.shift();
+
+                    int sz = parseInt(lex);
+
+                    if (!BulkLoadAckClientParameters.isValidBatchSize(sz))
+                        throw error(lex, BulkLoadAckClientParameters.batchSizeErrorMsg(sz));
+
+                    batchSize = sz;
+
+                    break;
+
+                default:
+                    return;
+            }
+        }
+    }
+
+    /**
+     * Returns the schema name.
+     *
+     * @return The schema name.
+     */
+    @Override public String schemaName() {
+        return tblQName.schemaName();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void schemaName(String schemaName) {
+        tblQName.schemaName(schemaName);
+    }
+
+    /**
+     * Returns the table name.
+     *
+     * @return The table name.
+     */
+    public String tableName() {
+        return tblQName.name();
+    }
+
+    /**
+     * Sets the table name.
+     *
+     * @param tblName The table name.
+     */
+    public void tableName(String tblName) {
+        tblQName.name(tblName);
+    }
+
+    /**
+     * Returns the local file name.
+     *
+     * @return The local file name.
+     */
+    public String localFileName() {
+        return locFileName;
+    }
+
+    /**
+     * Sets the local file name.
+     *
+     * @param locFileName The local file name.
+     */
+    public void localFileName(String locFileName) {
+        this.locFileName = locFileName;
+    }
+
+    /**
+     * Returns the list of columns.
+     *
+     * @return The list of columns.
+     */
+    public List<String> columns() {
+        return cols;
+    }
+
+    /**
+     * Returns the input file format.
+     *
+     * @return The input file format.
+     */
+    public BulkLoadFormat inputFormat() {
+        return inputFormat;
+    }
+
+    /**
+     * Returns the batch size.
+     *
+     * @return The batch size.
+     */
+    public Integer batchSize() {
+        return batchSize;
+    }
+
+    /**
+     * Sets the batch size.
+     *
+     * @param batchSize The batch size.
+     */
+    public void batchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SqlBulkLoadCommand.class, this);
+    }
+}

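Continuing the sketch from the parser section, the accessors of a parsed command would yield roughly the following (values are illustrative; upper-casing of unquoted identifiers is an assumption about parseIdentifier's behavior, not something this diff shows):

    SqlBulkLoadCommand copy = (SqlBulkLoadCommand)new SqlParser("PUBLIC",
        "COPY FROM \"people.csv\" INTO Person (_key, age) FORMAT CSV BATCH_SIZE 512").nextCommand();

    copy.localFileName(); // "people.csv"
    copy.tableName();     // "PERSON" (if unquoted identifiers are upper-cased)
    copy.columns();       // ["_KEY", "AGE"]
    copy.batchSize();     // 512
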
http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserBulkLoadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserBulkLoadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserBulkLoadSelfTest.java
new file mode 100644
index 0000000..b5cd55b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserBulkLoadSelfTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql;
+
+/**
+ * Tests for SQL parser: COPY command.
+ */
+public class SqlParserBulkLoadSelfTest extends SqlParserAbstractSelfTest {
+    /**
+     * Tests for COPY command.
+     *
+     * @throws Exception If any of the sub-tests fails.
+     */
+    public void testCopy() {
+        assertParseError(null,
+            "copy grom \"any.file\" into Person (_key, age, firstName, lastName) format csv",
+            "Unexpected token: \"GROM\" (expected: \"FROM\")");
+
+        assertParseError(null,
+            "copy from into Person (_key, age, firstName, lastName) format csv",
+            "Unexpected token: \"INTO\" (expected: \"[identifier]\"");
+
+        assertParseError(null,
+            "copy from any.file into Person (_key, age, firstName, lastName) format csv",
+            "Unexpected token: \".\" (expected: \"INTO\"");
+
+        assertParseError(null,
+            "copy from \"any.file\" to Person (_key, age, firstName, lastName) format csv",
+            "Unexpected token: \"TO\" (expected: \"INTO\")");
+
+        // Column list
+
+        assertParseError(null,
+            "copy from \"any.file\" into Person () format csv",
+            "Unexpected token: \")\" (expected: \"[identifier]\")");
+
+        assertParseError(null,
+            "copy from \"any.file\" into Person (,) format csv",
+            "Unexpected token: \",\" (expected: \"[identifier]\")");
+
+        assertParseError(null,
+            "copy from \"any.file\" into Person format csv",
+            "Unexpected token: \"FORMAT\" (expected: \"(\")");
+
+        // FORMAT
+
+        assertParseError(null,
+            "copy from \"any.file\" into Person (_key, age, firstName, lastName)",
+            "Unexpected end of command (expected: \"FORMAT\")");
+
+        assertParseError(null,
+            "copy from \"any.file\" into Person (_key, age, firstName, lastName) format lsd",
+            "Unknown format name: LSD");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 72e80e2..c46c906 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -39,8 +39,15 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.BulkLoadContextCursor;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadCacheWriter;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadParser;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadStreamerWriter;
 import org.apache.ignite.internal.processors.cache.CacheOperationContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
@@ -58,8 +65,12 @@ import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
+import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand;
+import org.apache.ignite.internal.sql.command.SqlCommand;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
+import org.apache.ignite.internal.util.lang.IgniteClosureX;
 import org.apache.ignite.internal.util.lang.IgniteSingletonIterator;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T3;
@@ -967,6 +978,67 @@ public class DmlStatementsProcessor {
         return updateSqlFields(schemaName, c, GridSqlQueryParser.prepared(stmt), fldsQry, local, filter, cancel);
     }
 
+    /**
+     * Runs a DML statement for which we have an internal command executor.
+     *
+     * @param sql The SQL command text to execute.
+     * @param cmd The command to execute.
+     * @return The cursor returned by the statement.
+     * @throws IgniteSQLException If failed.
+     */
+    public FieldsQueryCursor<List<?>> runNativeDmlStatement(String sql, SqlCommand cmd) {
+        try {
+            if (cmd instanceof SqlBulkLoadCommand)
+                return processBulkLoadCommand((SqlBulkLoadCommand)cmd);
+            else
+                throw new IgniteSQLException("Unsupported DML operation: " + sql,
+                    IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+
+        }
+        catch (IgniteSQLException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new IgniteSQLException("Unexpected DML operation failure: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Processes the bulk load COPY command.
+     *
+     * @param cmd The command.
+     * @return A cursor wrapping the bulk load context (the result of the first request/response).
+     * @throws IgniteCheckedException If something failed.
+     */
+    public FieldsQueryCursor<List<?>> processBulkLoadCommand(SqlBulkLoadCommand cmd) throws IgniteCheckedException {
+        if (cmd.batchSize() == null)
+            cmd.batchSize(BulkLoadAckClientParameters.DEFAULT_BATCH_SIZE);
+
+        GridH2Table tbl = idx.dataTable(cmd.schemaName(), cmd.tableName());
+
+        if (tbl == null)
+            throw new IgniteSQLException("Table does not exist: " + cmd.tableName(),
+                IgniteQueryErrorCode.TABLE_NOT_FOUND);
+
+        UpdatePlan plan = UpdatePlanBuilder.planForBulkLoad(cmd, tbl);
+
+        IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter = new BulkLoadDataConverter(plan);
+
+        GridCacheContext cache = tbl.cache();
+
+        IgniteDataStreamer<Object, Object> streamer = cache.grid().dataStreamer(cache.name());
+
+        BulkLoadCacheWriter outputWriter = new BulkLoadStreamerWriter(streamer);
+
+        BulkLoadParser inputParser = BulkLoadParser.createParser(cmd.inputFormat());
+
+        BulkLoadProcessor processor = new BulkLoadProcessor(inputParser, dataConverter, outputWriter);
+
+        BulkLoadAckClientParameters params = new BulkLoadAckClientParameters(cmd.localFileName(), cmd.batchSize());
+
+        return new BulkLoadContextCursor(processor, params);
+    }
+
     /** */
     private final static class InsertEntryProcessor implements EntryProcessor<Object, Object, Boolean> {
         /** Value to set. */
@@ -1081,4 +1153,31 @@ public class DmlStatementsProcessor {
         }
     }
 
+    /**
+     * Converts a row of values to actual key+value using {@link UpdatePlan#processRow(List)}.
+     */
+    private static class BulkLoadDataConverter extends IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> {
+        /** Update plan to convert incoming rows. */
+        private final UpdatePlan plan;
+
+        /**
+         * Creates the converter with the given update plan.
+         *
+         * @param plan The update plan to use.
+         */
+        private BulkLoadDataConverter(UpdatePlan plan) {
+            this.plan = plan;
+        }
+
+        /**
+         * Converts the record to a key+value.
+         *
+         * @param record The record to convert.
+         * @return The key+value.
+         * @throws IgniteCheckedException If conversion failed for some reason.
+         */
+        @Override public IgniteBiTuple<?, ?> applyx(List<?> record) throws IgniteCheckedException {
+            return plan.processRow(record);
+        }
+    }
 }

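To make the wiring in processBulkLoadCommand() concrete, here is a sketch of the per-record flow it assembles (the loop and the parsedRecords variable are illustrative; the real iteration lives inside BulkLoadProcessor):

    for (List<?> record : parsedRecords) {
        // UpdatePlan.processRow() turns the list of fields into a key-value pair...
        IgniteBiTuple<?, ?> entry = dataConverter.applyx(record);

        // ...which BulkLoadStreamerWriter hands to the IgniteDataStreamer.
        outputWriter.apply(entry);
    }
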
http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 96b8935..06c936b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -43,6 +43,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
 import javax.cache.Cache;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
@@ -99,7 +100,6 @@ import org.apache.ignite.internal.processors.query.h2.database.io.H2InnerIO;
 import org.apache.ignite.internal.processors.query.h2.database.io.H2LeafIO;
 import org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor;
 import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
-import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2PlainRowFactory;
@@ -120,6 +120,7 @@ import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisito
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.sql.SqlParseException;
 import org.apache.ignite.internal.sql.SqlParser;
+import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand;
 import org.apache.ignite.internal.sql.command.SqlAlterTableCommand;
 import org.apache.ignite.internal.sql.command.SqlCommand;
 import org.apache.ignite.internal.sql.command.SqlCreateIndexCommand;
@@ -190,6 +191,9 @@ import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType
  */
 @SuppressWarnings({"UnnecessaryFullyQualifiedName", "NonFinalStaticVariableUsedInClassInitialization"})
 public class IgniteH2Indexing implements GridQueryIndexing {
+    public static final Pattern INTERNAL_CMD_RE = Pattern.compile(
+        "^(create|drop)\\s+index|^alter\\s+table|^copy", Pattern.CASE_INSENSITIVE);
+
     /*
      * Register IO for indexes.
      */
@@ -1437,9 +1441,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      */
     private List<FieldsQueryCursor<List<?>>> tryQueryDistributedSqlFieldsNative(String schemaName, SqlFieldsQuery qry) {
         // Heuristic check for fast return.
-        String sqlUpper = qry.getSql().toUpperCase();
-
-        if (!(sqlUpper.contains("INDEX") || sqlUpper.contains("ALTER")))
+        if (!INTERNAL_CMD_RE.matcher(qry.getSql().trim()).find())
             return null;
 
         // Parse.
@@ -1454,9 +1456,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             if (parser.nextCommand() != null)
                 return null;
 
-            // Only CREATE/DROP INDEX and ALTER TABLE commands are supported for now.
+            // Currently supported commands are: CREATE/DROP INDEX, COPY, ALTER TABLE.
             if (!(cmd instanceof SqlCreateIndexCommand || cmd instanceof SqlDropIndexCommand ||
-                cmd instanceof SqlAlterTableCommand))
+                cmd instanceof SqlBulkLoadCommand || cmd instanceof SqlAlterTableCommand))
                 return null;
         }
         catch (Exception e) {
@@ -1472,17 +1474,26 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             if (e instanceof SqlParseException)
                 code = ((SqlParseException)e).code();
 
-            throw new IgniteSQLException("Failed to parse DDL statement: " + qry.getSql(), code, e);
+            throw new IgniteSQLException("Failed to parse DDL statement: " + qry.getSql() + ": " + e.getMessage(),
+                code, e);
         }
 
         // Execute.
-        try {
-            FieldsQueryCursor<List<?>> res = ddlProc.runDdlStatement(qry.getSql(), cmd);
+        if (cmd instanceof SqlBulkLoadCommand) {
+            FieldsQueryCursor<List<?>> cursor = dmlProc.runNativeDmlStatement(qry.getSql(), cmd);
 
-            return Collections.singletonList(res);
+            return Collections.singletonList(cursor);
         }
-        catch (IgniteCheckedException e) {
-            throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qry.getSql() + ']', e);
+        else {
+            try {
+                FieldsQueryCursor<List<?>> cursor = ddlProc.runDdlStatement(qry.getSql(), cmd);
+
+                return Collections.singletonList(cursor);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qry.getSql() + "]: "
+                    + e.getMessage(), e);
+            }
         }
     }
 

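The INTERNAL_CMD_RE gate above replaces the old INDEX/ALTER substring heuristic and can be sanity-checked in isolation (sample statements are illustrative; assumes java.util.regex.Pattern is imported):

    Pattern p = IgniteH2Indexing.INTERNAL_CMD_RE;

    p.matcher("copy from \"f.csv\" into T (a) format csv").find(); // true: try the native parser
    p.matcher("ALTER TABLE T ADD COLUMN b INT").find();           // true
    p.matcher("select * from T").find();                          // false: go straight to H2
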
http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
index ca7680a..6f5b51f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.sql.command.SqlDropIndexCommand;
 import org.apache.ignite.internal.sql.command.SqlIndexColumn;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.h2.command.Prepared;
 import org.h2.command.ddl.AlterTableAlterColumn;
 import org.h2.command.ddl.CreateIndex;
@@ -483,6 +484,7 @@ public class DdlStatementsProcessor {
             return resCur;
         }
         catch (SchemaOperationException e) {
+            U.error(null, "DDL operation failure", e);
             throw convert(e);
         }
         catch (IgniteSQLException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdateMode.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdateMode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdateMode.java
index 0440648..d9c627a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdateMode.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdateMode.java
@@ -22,15 +22,18 @@ package org.apache.ignite.internal.processors.query.h2.dml;
  * or UPDATE/DELETE from subquery or literals/params based.
  */
 public enum UpdateMode {
-    /** */
+    /** MERGE command. */
     MERGE,
 
-    /** */
+    /** INSERT command. */
     INSERT,
 
-    /** */
+    /** UPDATE command. */
     UPDATE,
 
-    /** */
+    /** DELETE command. */
     DELETE,
+
+    /** COPY command. */
+    BULK_LOAD
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
index 17dc9d1..10d485a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
@@ -39,6 +39,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.h2.table.Column;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.query.h2.dml.UpdateMode.BULK_LOAD;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.DEFAULT_COLUMNS_COUNT;
 
 /**
@@ -182,6 +183,10 @@ public final class UpdatePlan {
      * @throws IgniteCheckedException if failed.
      */
     public IgniteBiTuple<?, ?> processRow(List<?> row) throws IgniteCheckedException {
+        if (mode != BULK_LOAD && row.size() != colNames.length)
+            throw new IgniteSQLException("Not enough values in a row: " + row.size() + " instead of " + colNames.length,
+                IgniteQueryErrorCode.ENTRY_PROCESSING);
+
         GridH2RowDescriptor rowDesc = tbl.rowDescriptor();
         GridQueryTypeDescriptor desc = rowDesc.type();
 
@@ -205,7 +210,8 @@ public final class UpdatePlan {
 
         if (key == null) {
             if (F.isEmpty(desc.keyFieldName()))
-                throw new IgniteSQLException("Key for INSERT or MERGE must not be null", IgniteQueryErrorCode.NULL_KEY);
+                throw new IgniteSQLException("Key for INSERT, COPY, or MERGE must not be null",
+                    IgniteQueryErrorCode.NULL_KEY);
             else
                 throw new IgniteSQLException("Null value is not allowed for column '" + desc.keyFieldName() + "'",
                     IgniteQueryErrorCode.NULL_KEY);
@@ -213,16 +219,18 @@ public final class UpdatePlan {
 
         if (val == null) {
             if (F.isEmpty(desc.valueFieldName()))
-                throw new IgniteSQLException("Value for INSERT, MERGE, or UPDATE must not be null",
+                throw new IgniteSQLException("Value for INSERT, COPY, MERGE, or UPDATE must not be null",
                     IgniteQueryErrorCode.NULL_VALUE);
             else
                 throw new IgniteSQLException("Null value is not allowed for column '" + desc.valueFieldName() + "'",
                     IgniteQueryErrorCode.NULL_VALUE);
         }
 
+        int actualColCnt = Math.min(colNames.length, row.size());
+
         Map<String, Object> newColVals = new HashMap<>();
 
-        for (int i = 0; i < colNames.length; i++) {
+        for (int i = 0; i < actualColCnt; i++) {
             if (i == keyColIdx || i == valColIdx)
                 continue;
 
@@ -241,14 +249,14 @@ public final class UpdatePlan {
 
         // We update columns in the order specified by the table for a reason - table's
         // column order preserves their precedence for correct update of nested properties.
-        Column[] cols = tbl.getColumns();
+        Column[] tblCols = tbl.getColumns();
 
         // First 3 columns are _key, _val and _ver. Skip 'em.
-        for (int i = DEFAULT_COLUMNS_COUNT; i < cols.length; i++) {
+        for (int i = DEFAULT_COLUMNS_COUNT; i < tblCols.length; i++) {
             if (tbl.rowDescriptor().isKeyValueOrVersionColumn(i))
                 continue;
 
-            String colName = cols[i].getName();
+            String colName = tblCols[i].getName();
 
             if (!newColVals.containsKey(colName))
                 continue;

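A consequence of the BULK_LOAD changes above: a record shorter than the column list no longer fails, it just leaves trailing columns unset, since the loop caps at actualColCnt. An illustrative call under that assumption (column layout and values are made up):

    // Plan built for columns (_key, age, name); the record supplies two fields:
    IgniteBiTuple<?, ?> t = plan.processRow(Arrays.asList(1, 33));
    // -> key = 1, value gets age = 33; 'name' is simply never assigned.
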
http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
index 3305b00..bced836 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlTable;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlUnion;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlUpdate;
+import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -407,6 +408,91 @@ public final class UpdatePlanBuilder {
     }
 
     /**
+     * Prepares the update plan for a COPY command (a.k.a. bulk load).
+     *
+     * @param cmd The bulk load command.
+     * @param tbl The table to bulk load into.
+     * @return The update plan for this command.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("ConstantConditions")
+    public static UpdatePlan planForBulkLoad(SqlBulkLoadCommand cmd, GridH2Table tbl) throws IgniteCheckedException {
+        GridH2RowDescriptor desc = tbl.rowDescriptor();
+
+        if (desc == null)
+            throw new IgniteSQLException("Row descriptor undefined for table '" + tbl.getName() + "'",
+                IgniteQueryErrorCode.NULL_TABLE_DESCRIPTOR);
+
+        GridCacheContext<?, ?> cctx = desc.context();
+
+        List<String> cols = cmd.columns();
+
+        if (cols == null)
+            throw new IgniteSQLException("Columns are not defined", IgniteQueryErrorCode.NULL_TABLE_DESCRIPTOR);
+
+        String[] colNames = new String[cols.size()];
+
+        int[] colTypes = new int[cols.size()];
+
+        int keyColIdx = -1;
+        int valColIdx = -1;
+
+        boolean hasKeyProps = false;
+        boolean hasValProps = false;
+
+        for (int i = 0; i < cols.size(); i++) {
+            String colName = cols.get(i);
+
+            colNames[i] = colName;
+
+            Column h2Col = tbl.getColumn(colName);
+
+            colTypes[i] = h2Col.getType();
+            int colId = h2Col.getColumnId();
+
+            if (desc.isKeyColumn(colId)) {
+                keyColIdx = i;
+                continue;
+            }
+
+            if (desc.isValueColumn(colId)) {
+                valColIdx = i;
+                continue;
+            }
+
+            GridQueryProperty prop = desc.type().property(colName);
+
+            assert prop != null : "Property '" + colName + "' not found.";
+
+            if (prop.key())
+                hasKeyProps = true;
+            else
+                hasValProps = true;
+        }
+
+        KeyValueSupplier keySupplier = createSupplier(cctx, desc.type(), keyColIdx, hasKeyProps,
+            true, false);
+        KeyValueSupplier valSupplier = createSupplier(cctx, desc.type(), valColIdx, hasValProps,
+            false, false);
+
+        return new UpdatePlan(
+            UpdateMode.BULK_LOAD,
+            tbl,
+            colNames,
+            colTypes,
+            keySupplier,
+            valSupplier,
+            keyColIdx,
+            valColIdx,
+            null,
+            true,
+            null,
+            0,
+            null,
+            null
+        );
+    }
+
+    /**
     * Detect appropriate method of instantiating key or value (take from param, create binary builder,
      * invoke default ctor, or allocate).
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 68610a1..6295d8d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -165,6 +165,7 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridQueryParsingTest;
 import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryDistributedJoinsTest;
 import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryTest;
 import org.apache.ignite.internal.processors.sql.SqlConnectorConfigurationValidationSelfTest;
+import org.apache.ignite.internal.sql.SqlParserBulkLoadSelfTest;
 import org.apache.ignite.internal.sql.SqlParserCreateIndexSelfTest;
 import org.apache.ignite.internal.sql.SqlParserDropIndexSelfTest;
 import 
org.apache.ignite.spi.communication.tcp.GridOrderedMessageCancelSelfTest;
@@ -183,6 +184,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
 
         suite.addTestSuite(SqlParserCreateIndexSelfTest.class);
         suite.addTestSuite(SqlParserDropIndexSelfTest.class);
+        suite.addTestSuite(SqlParserBulkLoadSelfTest.class);
 
         suite.addTestSuite(SqlConnectorConfigurationValidationSelfTest.class);
         suite.addTestSuite(ClientConnectorConfigurationValidationSelfTest.class);
