http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadStreamerWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadStreamerWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadStreamerWriter.java new file mode 100644 index 0000000..3e5efd9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadStreamerWriter.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.bulkload; + +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.lang.IgniteBiTuple; + +/** + * A bulk load cache writer object that adds entries using {@link IgniteDataStreamer}. + */ +public class BulkLoadStreamerWriter extends BulkLoadCacheWriter { + /** Serialization version UID. */ + private static final long serialVersionUID = 0L; + + /** The streamer. */ + private final IgniteDataStreamer<Object, Object> streamer; + + /** + * A number of {@link IgniteDataStreamer#addData(Object, Object)} calls made, + * since we don't have any kind of result data back from the streamer. + */ + private long updateCnt; + + /** + * Creates a cache writer. + * + * @param streamer The streamer to use. + */ + public BulkLoadStreamerWriter(IgniteDataStreamer<Object, Object> streamer) { + this.streamer = streamer; + updateCnt = 0; + } + + /** {@inheritDoc} */ + @Override public void apply(IgniteBiTuple<?, ?> entry) { + streamer.addData(entry.getKey(), entry.getValue()); + + updateCnt++; + } + + /** {@inheritDoc} */ + @Override public void close() { + streamer.close(); + } + + /** {@inheritDoc} */ + @Override public long updateCnt() { + return updateCnt; + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CharsetDecoderBlock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CharsetDecoderBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CharsetDecoderBlock.java new file mode 100644 index 0000000..5b18def --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CharsetDecoderBlock.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.bulkload.pipeline; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteIllegalStateException; + +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CoderResult; +import java.nio.charset.CodingErrorAction; +import java.util.Arrays; + +/** + * A {@link PipelineBlock}, which converts stream of bytes supplied as byte[] arrays to an array of char[] using + * the specified encoding. Decoding errors (malformed input and unmappable characters) are to handled by dropping + * the erroneous input, appending the coder's replacement value to the output buffer, and resuming the coding operation. + */ +public class CharsetDecoderBlock extends PipelineBlock<byte[], char[]> { + /** Charset decoder */ + private final CharsetDecoder charsetDecoder; + + /** Leftover bytes (partial characters) from the last batch, + * or null if everything was processed. */ + private byte[] leftover; + + /** True once we've reached the end of input. */ + private boolean isEndOfInput; + + /** + * Creates charset decoder block. + * + * @param charset The charset encoding to decode bytes from. + */ + public CharsetDecoderBlock(Charset charset) { + charsetDecoder = charset.newDecoder() + .onMalformedInput(CodingErrorAction.REPLACE) + .onUnmappableCharacter(CodingErrorAction.REPLACE); + + isEndOfInput = false; + leftover = null; + } + + /** {@inheritDoc} */ + @Override public void accept(byte[] data, boolean isLastAppend) throws IgniteCheckedException { + assert nextBlock != null; + + assert !isEndOfInput : "convertBytes() called after end of input"; + + isEndOfInput = isLastAppend; + + if (leftover == null && data.length == 0) { + nextBlock.accept(new char[0], isLastAppend); + return; + } + + ByteBuffer dataBuf; + + if (leftover == null) + dataBuf = ByteBuffer.wrap(data); + else { + dataBuf = ByteBuffer.allocate(leftover.length + data.length); + + dataBuf.put(leftover) + .put(data); + + dataBuf.flip(); + + leftover = null; + } + + int outBufLen = (int)Math.ceil(charsetDecoder.maxCharsPerByte() * (data.length + 1)); + + assert outBufLen > 0; + + CharBuffer outBuf = CharBuffer.allocate(outBufLen); + + for (;;) { + CoderResult res = charsetDecoder.decode(dataBuf, outBuf, isEndOfInput); + + if (res.isUnderflow()) { + // End of input buffer reached. Either skip the partial character at the end or wait for the next batch. + if (!isEndOfInput && dataBuf.remaining() > 0) + leftover = Arrays.copyOfRange(dataBuf.array(), + dataBuf.arrayOffset() + dataBuf.position(), dataBuf.limit()); + + if (isEndOfInput) + charsetDecoder.flush(outBuf); // See {@link CharsetDecoder} class javadoc for the protocol. + + if (outBuf.position() > 0) + nextBlock.accept(Arrays.copyOfRange(outBuf.array(), outBuf.arrayOffset(), outBuf.position()), + isEndOfInput); + + break; + } + + if (res.isOverflow()) { // Not enough space in the output buffer, flush it and retry. + assert outBuf.position() > 0; + + nextBlock.accept(Arrays.copyOfRange(outBuf.array(), outBuf.arrayOffset(), outBuf.position()), + isEndOfInput); + + outBuf.flip(); + + continue; + } + + assert ! res.isMalformed() && ! res.isUnmappable(); + + // We're not supposed to reach this point with the current implementation. + // The code below will fire exception if Oracle implementation of CharsetDecoder will be changed in future. + throw new IgniteIllegalStateException("Unknown CharsetDecoder state"); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CsvLineProcessorBlock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CsvLineProcessorBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CsvLineProcessorBlock.java new file mode 100644 index 0000000..5b2ee4b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/CsvLineProcessorBlock.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.bulkload.pipeline; + +import org.apache.ignite.IgniteCheckedException; +import org.jetbrains.annotations.NotNull; + +import java.util.regex.Pattern; + +/** + * A {@link PipelineBlock}, which splits line according to CSV format rules and unquotes fields. + * The next block {@link PipelineBlock#accept(Object, boolean)} is called per-line. + */ +public class CsvLineProcessorBlock extends PipelineBlock<String, String[]> { + /** Field delimiter pattern. */ + private final Pattern fldDelim; + + /** Quote character. */ + private final String quoteChars; + + /** + * Creates a CSV line parser. + * + * @param fldDelim The pattern for the field delimiter. + * @param quoteChars Quoting character. + */ + public CsvLineProcessorBlock(Pattern fldDelim, String quoteChars) { + this.fldDelim = fldDelim; + this.quoteChars = quoteChars; + } + + /** {@inheritDoc} */ + @Override public void accept(String input, boolean isLastPortion) throws IgniteCheckedException { + // Currently we don't process quoted field delimiter properly, will be fixed in IGNITE-7537. + String[] fields = fldDelim.split(input); + + for (int i = 0; i < fields.length; i++) + fields[i] = trim(fields[i]); + + nextBlock.accept(fields, isLastPortion); + } + + /** + * Trims quote characters from beginning and end of the line. + * + * @param str String to trim. + * @return The trimmed string. + */ + @NotNull private String trim(String str) { + int startPos = quoteChars.indexOf(str.charAt(0)) != -1 ? 1 : 0; + int endPos = quoteChars.indexOf(str.charAt(str.length() - 1)) != -1 ? str.length() - 1 : str.length(); + + return str.substring(startPos, endPos); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/LineSplitterBlock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/LineSplitterBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/LineSplitterBlock.java new file mode 100644 index 0000000..122d0db --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/LineSplitterBlock.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.bulkload.pipeline; + +import org.apache.ignite.IgniteCheckedException; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A {@link PipelineBlock}, which splits input stream of char[] into lines using the specified {@link Pattern} + * as line separator. Next block {@link PipelineBlock#accept(Object, boolean)} is invoked for each line. + * Leftover characters are remembered and used during processing the next input batch, + * unless isLastPortion flag is specified. + */ +public class LineSplitterBlock extends PipelineBlock<char[], String> { + /** Line separator pattern */ + private final Pattern delim; + + /** Leftover characters from the previous invocation of {@link #accept(char[], boolean)}. */ + private StringBuilder leftover = new StringBuilder(); + + /** + * Creates line splitter block. + * + * @param delim The line separator pattern. + */ + public LineSplitterBlock(Pattern delim) { + this.delim = delim; + } + + /** {@inheritDoc} */ + @Override public void accept(char[] chars, boolean isLastPortion) throws IgniteCheckedException { + leftover.append(chars); + + String input = leftover.toString(); + Matcher matcher = delim.matcher(input); + + int lastPos = 0; + while (matcher.find()) { + String outStr = input.substring(lastPos, matcher.start()); + + if (!outStr.isEmpty()) + nextBlock.accept(outStr, false); + + lastPos = matcher.end(); + } + + if (lastPos != 0) + leftover.delete(0, lastPos); + + if (isLastPortion && leftover.length() > 0) { + nextBlock.accept(leftover.toString(), true); + leftover.setLength(0); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/PipelineBlock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/PipelineBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/PipelineBlock.java new file mode 100644 index 0000000..914b4b4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/PipelineBlock.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.bulkload.pipeline; + +import org.apache.ignite.IgniteCheckedException; +import org.jetbrains.annotations.Nullable; + +/** + * A file parsing pipeline block. Accepts an portion of an input (isLastPortion flag is provided to signify the last + * block to process) and optionally calls the next block with transformed input or performs any other handling, + * such as storing input to internal structures. + */ +public abstract class PipelineBlock<I, O> { + /** The next block in pipeline or null if this block is a terminator. */ + @Nullable protected PipelineBlock<O, ?> nextBlock; + + /** + * Creates a pipeline block. + * + * <p>(There is no nextBlock argument in the constructor: setting the next block using + * {@link #append(PipelineBlock)} method is more convenient. + */ + protected PipelineBlock() { + nextBlock = null; + } + + /** + * Sets the next block in this block and returns the <b>next</b> block. + * + * <p>Below is an example of using this method to set up a pipeline:<br> + * {@code block1.append(block2).append(block3); }. + * <p>Block2 here becomes the next for block1, and block3 is the next one for the block2. + * + * @param next The next block for the current block. + * @return The next block ({@code next} argument). + */ + public <N> PipelineBlock<O, N> append(PipelineBlock<O, N> next) { + nextBlock = next; + return next; + } + + /** + * Accepts a portion of input. {@code isLastPortion} parameter should be set if this is a last portion + * of the input. The method must not be called after the end of input: the call with {@code isLastPortion == true} + * is the last one. + * + * @param inputPortion Portion of input. + * @param isLastPortion Is this the last portion. + */ + public abstract void accept(I inputPortion, boolean isLastPortion) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/StrListAppenderBlock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/StrListAppenderBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/StrListAppenderBlock.java new file mode 100644 index 0000000..91cbc1e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/pipeline/StrListAppenderBlock.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.bulkload.pipeline; + +import java.util.Arrays; +import java.util.List; + +/** + * The PipelineBlock which appends its input to a user-supplied list. + * + * <p>The list is set using {@link #output(List)} method. + */ +public class StrListAppenderBlock extends PipelineBlock<String[], Object> { + /** The output list. */ + private List<List<Object>> output; + + /** + * Creates the block. List can be configured using {@link #output(List)} method. + */ + public StrListAppenderBlock() { + output = null; + } + + /** + * Sets the output list. + * + * @param output The output list. + */ + public void output(List<List<Object>> output) { + this.output = output; + } + + /** {@inheritDoc} */ + @Override public void accept(String[] elements, boolean isLastPortion) { + output.add(Arrays.asList(elements)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadAckResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadAckResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadAckResult.java new file mode 100644 index 0000000..8a170ab --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadAckResult.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.jdbc; + +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.internal.binary.BinaryReaderExImpl; +import org.apache.ignite.internal.binary.BinaryWriterExImpl; +import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters; +import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * A reply from server to SQL COPY command, which is essentially a request from server to client + * to send files from client to server (see IGNITE-6917 for details). + * + * @see JdbcBulkLoadProcessor for the protocol. + * @see SqlBulkLoadCommand + */ +public class JdbcBulkLoadAckResult extends JdbcResult { + /** Query ID for matching this command on server in further {@link JdbcBulkLoadBatchRequest} commands. */ + private long qryId; + + /** + * Bulk load parameters, which are parsed on the server side and sent to client to specify + * what files to send, batch size, etc. + */ + private BulkLoadAckClientParameters params; + + /**Creates uninitialized bulk load batch request result. */ + public JdbcBulkLoadAckResult() { + super(BULK_LOAD_ACK); + + qryId = 0; + params = null; + } + + /** + * Constructs a request from server (in form of reply) to send files from client to server. + * + * @param qryId Query ID to send in further {@link JdbcBulkLoadBatchRequest}s. + * @param params Various parameters for sending batches from client side. + */ + public JdbcBulkLoadAckResult(long qryId, BulkLoadAckClientParameters params) { + super(BULK_LOAD_ACK); + + this.qryId = qryId; + this.params = params; + } + + /** + * Returns the query ID. + * + * @return Query ID. + */ + public long queryId() { + return qryId; + } + + /** + * Returns the parameters for the client. + * + * @return The parameters for the client. + */ + public BulkLoadAckClientParameters params() { + return params; + } + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException { + super.writeBinary(writer); + + writer.writeLong(qryId); + writer.writeString(params.localFileName()); + writer.writeInt(params.batchSize()); + } + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException { + super.readBinary(reader); + + qryId = reader.readLong(); + + String locFileName = reader.readString(); + int batchSize = reader.readInt(); + + if (!BulkLoadAckClientParameters.isValidBatchSize(batchSize)) + throw new BinaryObjectException(BulkLoadAckClientParameters.batchSizeErrorMsg(batchSize)); + + params = new BulkLoadAckClientParameters(locFileName, batchSize); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(JdbcBulkLoadAckResult.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadBatchRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadBatchRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadBatchRequest.java new file mode 100644 index 0000000..b75de5a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadBatchRequest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.jdbc; + +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.internal.binary.BinaryReaderExImpl; +import org.apache.ignite.internal.binary.BinaryWriterExImpl; +import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.NotNull; + +/** + * A JDBC request that sends a batch of a file to the server. Used when handling + * {@link SqlBulkLoadCommand} command. + */ +public class JdbcBulkLoadBatchRequest extends JdbcRequest { + /** A sentinel to indicate that {@link #cmd} field was not initialized. */ + public static final int CMD_UNKNOWN = -1; + + /** Next batch comes in this request and there are more batches. */ + public static final int CMD_CONTINUE = 0; + + /** + * This is the final batch from the client and there was an error on the client side, + * so terminate with error on the server side as well. + */ + public static final int CMD_FINISHED_ERROR = 1; + + /** + * This is the final batch of the file and everything went well on the client side. + * Server may complete the request. + */ + public static final int CMD_FINISHED_EOF = 2; + + /** QueryID of the original COPY command request. */ + private long qryId; + + /** Batch index starting from 0. */ + private int batchIdx; + + /** Command (see CMD_xxx constants above). */ + private int cmd; + + /** Data in this batch. */ + @NotNull private byte[] data; + + /** + * Creates the request with uninitialized parameters. + */ + public JdbcBulkLoadBatchRequest() { + super(BULK_LOAD_BATCH); + + qryId = -1; + batchIdx = -1; + cmd = CMD_UNKNOWN; + data = null; + } + + /** + * Creates the request with specified parameters and zero-length data. + * Typically used with {@link #CMD_FINISHED_ERROR} and {@link #CMD_FINISHED_EOF}. + * + * @param qryId The query ID from the {@link JdbcBulkLoadAckResult}. + * @param batchIdx Index of the current batch starting with 0. + * @param cmd The command ({@link #CMD_CONTINUE}, {@link #CMD_FINISHED_EOF}, or {@link #CMD_FINISHED_ERROR}). + */ + public JdbcBulkLoadBatchRequest(long qryId, int batchIdx, int cmd) { + this(qryId, batchIdx, cmd, new byte[0]); + } + + /** + * Creates the request with the specified parameters. + * + * @param qryId The query ID from the {@link JdbcBulkLoadAckResult}. + * @param batchIdx Index of the current batch starting with 0. + * @param cmd The command ({@link #CMD_CONTINUE}, {@link #CMD_FINISHED_EOF}, or {@link #CMD_FINISHED_ERROR}). + * @param data The data block (zero length is acceptable). + */ + public JdbcBulkLoadBatchRequest(long qryId, int batchIdx, int cmd, @NotNull byte[] data) { + super(BULK_LOAD_BATCH); + + this.qryId = qryId; + this.batchIdx = batchIdx; + + assert isCmdValid(cmd) : "Invalid command value: " + cmd; + this.cmd = cmd; + + this.data = data; + } + + /** + * Returns the original query ID. + * + * @return The original query ID. + */ + public long queryId() { + return qryId; + } + + /** + * Returns the batch index. + * + * @return The batch index. + */ + public long batchIdx() { + return batchIdx; + } + + /** + * Returns the command (see CMD_xxx constants for details). + * + * @return The command. + */ + public int cmd() { + return cmd; + } + + /** + * Returns the data. + * + * @return data if data was not supplied + */ + @NotNull public byte[] data() { + return data; + } + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException { + super.writeBinary(writer); + + writer.writeLong(qryId); + writer.writeInt(batchIdx); + writer.writeInt(cmd); + writer.writeByteArray(data); + } + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException { + super.readBinary(reader); + + qryId = reader.readLong(); + batchIdx = reader.readInt(); + + int c = reader.readInt(); + if (!isCmdValid(c)) + throw new BinaryObjectException("Invalid command: " + cmd); + + cmd = c; + + data = reader.readByteArray(); + assert data != null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(JdbcBulkLoadBatchRequest.class, this); + } + + /** + * Checks if the command value is valid. + * + * @param c The command value to check. + * @return True if valid, false otherwise. + */ + private static boolean isCmdValid(int c) { + return c >= CMD_CONTINUE && c <= CMD_FINISHED_EOF; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadProcessor.java new file mode 100644 index 0000000..9757791 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBulkLoadProcessor.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.jdbc; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteIllegalStateException; +import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor; +import org.apache.ignite.internal.processors.query.IgniteSQLException; + +import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_CONTINUE; +import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_FINISHED_EOF; +import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_FINISHED_ERROR; + +/** + * JDBC wrapper around {@link BulkLoadProcessor} that provides extra logic. + * + * Unlike other "single shot" request-reply commands, the + * COPY command the client-server interaction looks like this: + * + * <pre> + * Thin JDBC client Server + * | | + * |------- JdbcQueryExecuteRequest ------>| + * | with SQL copy command | + * | | + * |<---- JdbcBulkLoadAckResult -----------| + * | with BulkLoadAckClientParameters | + * | containing file name and batch size. | + * | | + * (open the file, | + * read portions and send them) | + * | | + * |------- JdbcBulkLoadBatchRequest #1 -->| + * | with a portion of input file. | + * | | + * |<--- JdbcQueryExecuteResult -----------| + * | with current update counter. | + * | | + * |------- JdbcBulkLoadBatchRequest #2--->| + * | with a portion of input file. | + * | | + * |<--- JdbcQueryExecuteResult -----------| + * | with current update counter. | + * | | + * |------- JdbcBulkLoadBatchRequest #3--->| + * | with the LAST portion of input file. | + * | | + * |<--- JdbcQueryExecuteResult -----------| + * | with the final update counter. | + * | | + * (close the file) | + * | | + * </pre> + * + * In case of input file reading error, a flag is carried to the server: + * {@link JdbcBulkLoadBatchRequest#CMD_FINISHED_ERROR} and the processing + * is aborted on the both sides. + */ +public class JdbcBulkLoadProcessor { + /** A core processor that handles incoming data packets. */ + private final BulkLoadProcessor processor; + + /** Next batch index (for a very simple check that all batches were delivered to us). */ + protected long nextBatchIdx; + + /** + * Creates a JDBC-specific adapter for bulk load processor. + * + * @param processor Bulk load processor from the core to delegate calls to. + */ + public JdbcBulkLoadProcessor(BulkLoadProcessor processor) { + this.processor = processor; + nextBatchIdx = 0; + } + + /** + * Completely processes a bulk load batch request. + * + * Calls {@link BulkLoadProcessor} wrapping around some JDBC-specific logic + * (commands, bulk load batch index checking). + * + * @param req The current request. + */ + public void processBatch(JdbcBulkLoadBatchRequest req) + throws IgniteCheckedException { + if (nextBatchIdx != req.batchIdx()) + throw new IgniteSQLException("Batch #" + (nextBatchIdx + 1) + + " is missing. Received #" + req.batchIdx() + " instead."); + + nextBatchIdx++; + + switch (req.cmd()) { + case CMD_FINISHED_EOF: + processor.processBatch(req.data(), true); + + break; + + case CMD_CONTINUE: + processor.processBatch(req.data(), false); + + break; + + case CMD_FINISHED_ERROR: + break; + + default: + throw new IgniteIllegalStateException("Command was not recognized: " + req.cmd()); + } + } + + /** + * Closes the underlying objects. + * Currently we don't handle normal termination vs. abort. + */ + public void close() throws Exception { + processor.close(); + + nextBatchIdx = -1; + } + + /** + * Provides update counter for sending in the {@link JdbcBatchExecuteResult}. + * + * @return The update counter for sending in {@link JdbcBatchExecuteResult}. + */ + public long updateCnt() { + return processor.outputStreamer().updateCnt(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java index 385924c..22522ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java @@ -60,6 +60,8 @@ public class JdbcRequest extends ClientListenerRequestNoId implements JdbcRawBin /** Get schemas metadata request. */ static final byte META_SCHEMAS = 12; + /** Send a batch of a data from client to server. */ + static final byte BULK_LOAD_BATCH = 13; /** Request type. */ private byte type; @@ -154,6 +156,11 @@ public class JdbcRequest extends ClientListenerRequestNoId implements JdbcRawBin break; + case BULK_LOAD_BATCH: + req = new JdbcBulkLoadBatchRequest(); + + break; + default: throw new IgniteException("Unknown SQL listener request ID: [request ID=" + reqType + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java index 11b50ec..59fc06b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java @@ -32,11 +32,14 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.query.BulkLoadContextCursor; import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteVersionUtils; import org.apache.ignite.internal.binary.BinaryWriterExImpl; +import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters; +import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx; @@ -57,9 +60,13 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; +import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_CONTINUE; +import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_FINISHED_EOF; +import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest.CMD_FINISHED_ERROR; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_3_0; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_4_0; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BATCH_EXEC; +import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.BULK_LOAD_BATCH; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_COLUMNS; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_INDEXES; import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.META_PARAMS; @@ -93,6 +100,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { /** Current queries cursors. */ private final ConcurrentHashMap<Long, JdbcQueryCursor> qryCursors = new ConcurrentHashMap<>(); + /** Current bulk load processors. */ + private final ConcurrentHashMap<Long, JdbcBulkLoadProcessor> bulkLoadRequests = new ConcurrentHashMap<>(); + /** Distributed joins flag. */ private final boolean distributedJoins; @@ -197,6 +207,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { case META_SCHEMAS: return getSchemas((JdbcMetaSchemasRequest)req); + + case BULK_LOAD_BATCH: + return processBulkLoadFileBatch((JdbcBulkLoadBatchRequest)req); } return new JdbcResponse(IgniteQueryErrorCode.UNSUPPORTED_OPERATION, @@ -207,6 +220,46 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { } } + /** + * Processes a file batch sent from client as part of bulk load COPY command. + * + * @param req Request object with a batch of a file received from client. + * @return Response to send to the client. + */ + private ClientListenerResponse processBulkLoadFileBatch(JdbcBulkLoadBatchRequest req) { + JdbcBulkLoadProcessor processor = bulkLoadRequests.get(req.queryId()); + + if (ctx == null) + return new JdbcResponse(IgniteQueryErrorCode.UNEXPECTED_OPERATION, "Unknown query ID: " + + req.queryId() + ". Bulk load session may have been reclaimed due to timeout."); + + try { + processor.processBatch(req); + + switch (req.cmd()) { + case CMD_FINISHED_ERROR: + case CMD_FINISHED_EOF: + bulkLoadRequests.remove(req.queryId()); + + processor.close(); + + break; + + case CMD_CONTINUE: + break; + + default: + throw new IllegalArgumentException(); + } + + return new JdbcResponse(new JdbcQueryExecuteResult(req.queryId(), processor.updateCnt())); + } + catch (Exception e) { + U.error(null, "Error processing file batch", e); + return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Server error: " + e); + } + } + /** {@inheritDoc} */ @Override public ClientListenerResponse handleException(Exception e, ClientListenerRequest req) { return exceptionToResult(e); @@ -237,6 +290,17 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { { for (JdbcQueryCursor cursor : qryCursors.values()) cursor.close(); + + for (JdbcBulkLoadProcessor processor : bulkLoadRequests.values()) { + try { + processor.close(); + } + catch (Exception e) { + U.error(null, "Error closing JDBC bulk load processor.", e); + } + } + + bulkLoadRequests.clear(); } finally { busyLock.leaveBusy(); @@ -310,10 +374,22 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { List<FieldsQueryCursor<List<?>>> results = ctx.query().querySqlFields(qry, true, protocolVer.compareTo(VER_2_3_0) < 0); - if (results.size() == 1) { - FieldsQueryCursor<List<?>> qryCur = results.get(0); + FieldsQueryCursor<List<?>> fieldsCur = results.get(0); - JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(), (QueryCursorImpl)qryCur); + if (fieldsCur instanceof BulkLoadContextCursor) { + BulkLoadContextCursor blCur = (BulkLoadContextCursor) fieldsCur; + + BulkLoadProcessor blProcessor = blCur.bulkLoadProcessor(); + BulkLoadAckClientParameters clientParams = blCur.clientParams(); + + bulkLoadRequests.put(qryId, new JdbcBulkLoadProcessor(blProcessor)); + + return new JdbcResponse(new JdbcBulkLoadAckResult(qryId, clientParams)); + } + + if (results.size() == 1) { + JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(), + (QueryCursorImpl)fieldsCur); JdbcQueryExecuteResult res; @@ -350,8 +426,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { if (qryCur.isQuery()) { jdbcRes = new JdbcResultInfo(true, -1, qryId); - JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(), - (QueryCursorImpl)qryCur); + JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(), qryCur); qryCursors.put(qryId, cur); @@ -370,8 +445,6 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { return new JdbcResponse(new JdbcQueryExecuteMultipleStatementsResult(jdbcResults, items, last)); } - - } catch (Exception e) { qryCursors.remove(qryId); @@ -534,6 +607,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { List<FieldsQueryCursor<List<?>>> qryRes = ctx.query().querySqlFields(qry, true, true); for (FieldsQueryCursor<List<?>> cur : qryRes) { + if (cur instanceof BulkLoadContextCursor) + throw new IgniteSQLException("COPY command cannot be executed in batch mode."); + assert !((QueryCursorImpl)cur).isQuery(); Iterator<List<?>> it = cur.iterator(); http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java index 623a339..43631e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java @@ -65,6 +65,9 @@ public class JdbcResult implements JdbcRawBinarylizable { /** Columns metadata result V3. */ static final byte META_COLUMNS_V3 = 15; + /** A request to send file from client to server. */ + static final byte BULK_LOAD_ACK = 16; + /** Success status. */ private byte type; @@ -163,6 +166,11 @@ public class JdbcResult implements JdbcRawBinarylizable { break; + case BULK_LOAD_ACK: + res = new JdbcBulkLoadAckResult(); + + break; + default: throw new IgniteException("Unknown SQL listener request ID: [request ID=" + resId + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java index f5b8c3c..0238b01 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java @@ -42,6 +42,9 @@ public class SqlKeyword { /** Keyword: BOOL. */ public static final String BOOL = "BOOL"; + /** Keyword: BATCH_SIZE. */ + public static final String BATCH_SIZE = "BATCH_SIZE"; + /** Keyword: BOOLEAN. */ public static final String BOOLEAN = "BOOLEAN"; @@ -54,6 +57,9 @@ public class SqlKeyword { /** Keyword: CHARACTER. */ public static final String CHARACTER = "CHARACTER"; + /** Keyword: COPY. */ + public static final String COPY = "COPY"; + /** Keyword: CREATE. */ public static final String CREATE = "CREATE"; @@ -90,6 +96,12 @@ public class SqlKeyword { /** Keyword: FLOAT8. */ public static final String FLOAT8 = "FLOAT8"; + /** Keyword: FORMAT. */ + public static final String FORMAT = "FORMAT"; + + /** Keyword: FROM. */ + public static final String FROM = "FROM"; + /** Keyword: FULLTEXT. */ public static final String FULLTEXT = "FULLTEXT"; @@ -120,6 +132,9 @@ public class SqlKeyword { /** Keyword: INTEGER. */ public static final String INTEGER = "INTEGER"; + /** Keyword: INTO. */ + public static final String INTO = "INTO"; + /** Keyword: KEY. */ public static final String KEY = "KEY"; http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java index 401ee98..0627def 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java @@ -18,12 +18,14 @@ package org.apache.ignite.internal.sql; import org.apache.ignite.internal.sql.command.SqlAlterTableCommand; +import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand; import org.apache.ignite.internal.sql.command.SqlCommand; import org.apache.ignite.internal.sql.command.SqlCreateIndexCommand; import org.apache.ignite.internal.sql.command.SqlDropIndexCommand; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.sql.SqlKeyword.ALTER; +import static org.apache.ignite.internal.sql.SqlKeyword.COPY; import static org.apache.ignite.internal.sql.SqlKeyword.CREATE; import static org.apache.ignite.internal.sql.SqlKeyword.DROP; import static org.apache.ignite.internal.sql.SqlKeyword.HASH; @@ -103,6 +105,11 @@ public class SqlParser { break; + case COPY: + cmd = processCopy(); + + break; + case ALTER: cmd = processAlter(); } @@ -115,7 +122,7 @@ public class SqlParser { return cmd; } else - throw errorUnexpectedToken(lex, CREATE, DROP, ALTER); + throw errorUnexpectedToken(lex, CREATE, DROP, COPY, ALTER); case QUOTED: case MINUS: @@ -130,6 +137,15 @@ public class SqlParser { } /** + * Processes COPY command. + * + * @return The {@link SqlBulkLoadCommand} command. + */ + private SqlCommand processCopy() { + return new SqlBulkLoadCommand().parse(lex); + } + + /** * Process CREATE keyword. * * @return Command. http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlBulkLoadCommand.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlBulkLoadCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlBulkLoadCommand.java new file mode 100644 index 0000000..e5246d5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlBulkLoadCommand.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.command; + +import org.apache.ignite.internal.processors.bulkload.BulkLoadCsvFormat; +import org.apache.ignite.internal.processors.bulkload.BulkLoadFormat; +import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters; +import org.apache.ignite.internal.sql.SqlKeyword; +import org.apache.ignite.internal.sql.SqlLexer; +import org.apache.ignite.internal.sql.SqlLexerTokenType; +import org.apache.ignite.internal.util.typedef.internal.S; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.ignite.internal.sql.SqlParserUtils.error; +import static org.apache.ignite.internal.sql.SqlParserUtils.parseIdentifier; +import static org.apache.ignite.internal.sql.SqlParserUtils.parseInt; +import static org.apache.ignite.internal.sql.SqlParserUtils.parseQualifiedIdentifier; +import static org.apache.ignite.internal.sql.SqlParserUtils.skipCommaOrRightParenthesis; +import static org.apache.ignite.internal.sql.SqlParserUtils.skipIfMatches; +import static org.apache.ignite.internal.sql.SqlParserUtils.skipIfMatchesKeyword; + +/** + * A parser for a COPY command (called 'bulk load' in the code, since word 'copy' is too generic). + */ +public class SqlBulkLoadCommand implements SqlCommand { + /** Local file name to send from client to server. */ + private String locFileName; + + /** Schema name + table name. */ + private SqlQualifiedName tblQName; + + /** User-specified list of columns. */ + private List<String> cols; + + /** File format. */ + private BulkLoadFormat inputFormat; + + /** Batch size (size of portion of a file sent in each sub-request). */ + private Integer batchSize; + + /** + * Parses the command. + * + * @param lex The lexer. + * @return The parsed command object. + */ + @Override public SqlCommand parse(SqlLexer lex) { + skipIfMatchesKeyword(lex, SqlKeyword.FROM); // COPY keyword is already parsed + + parseFileName(lex); + + parseTableName(lex); + + parseColumns(lex); + + parseFormat(lex); + + parseParameters(lex); + + return this; + } + + /** + * Parses the file name. + * + * @param lex The lexer. + */ + private void parseFileName(SqlLexer lex) { + locFileName = parseIdentifier(lex); + } + + /** + * Parses the schema and table names. + * + * @param lex The lexer. + */ + private void parseTableName(SqlLexer lex) { + skipIfMatchesKeyword(lex, SqlKeyword.INTO); + + tblQName = parseQualifiedIdentifier(lex); + } + + /** + * Parses the list of columns. + * + * @param lex The lexer. + */ + private void parseColumns(SqlLexer lex) { + skipIfMatches(lex, SqlLexerTokenType.PARENTHESIS_LEFT); + + cols = new ArrayList<>(); + + do { + cols.add(parseColumn(lex)); + } + while (!skipCommaOrRightParenthesis(lex)); + } + + /** + * Parses column clause. + * + * @param lex The lexer. + * @return The column name. + */ + private String parseColumn(SqlLexer lex) { + return parseIdentifier(lex); + } + + /** + * Parses the format clause. + * + * @param lex The lexer. + */ + private void parseFormat(SqlLexer lex) { + skipIfMatchesKeyword(lex, SqlKeyword.FORMAT); + + String name = parseIdentifier(lex); + + switch (name.toUpperCase()) { + case BulkLoadCsvFormat.NAME: + BulkLoadCsvFormat fmt = new BulkLoadCsvFormat(); + + // IGNITE-7537 will introduce user-defined values + fmt.lineSeparator(BulkLoadCsvFormat.DEFAULT_LINE_SEPARATOR); + fmt.fieldSeparator(BulkLoadCsvFormat.DEFAULT_FIELD_SEPARATOR); + fmt.quoteChars(BulkLoadCsvFormat.DEFAULT_QUOTE_CHARS); + fmt.commentChars(BulkLoadCsvFormat.DEFAULT_COMMENT_CHARS); + fmt.escapeChars(BulkLoadCsvFormat.DEFAULT_ESCAPE_CHARS); + + inputFormat = fmt; + + break; + + default: + throw error(lex, "Unknown format name: " + name + + ". Currently supported format is " + BulkLoadCsvFormat.NAME); + } + } + + /** + * Parses the optional parameters. + * + * @param lex The lexer. + */ + private void parseParameters(SqlLexer lex) { + while (lex.lookAhead().tokenType() == SqlLexerTokenType.DEFAULT) { + switch (lex.lookAhead().token()) { + case SqlKeyword.BATCH_SIZE: + lex.shift(); + + int sz = parseInt(lex); + + if (!BulkLoadAckClientParameters.isValidBatchSize(sz)) + throw error(lex, BulkLoadAckClientParameters.batchSizeErrorMsg(sz)); + + batchSize = sz; + + break; + + default: + return; + } + } + } + + /** + * Returns the schemaName. + * + * @return schemaName. + */ + @Override public String schemaName() { + return tblQName.schemaName(); + } + + /** {@inheritDoc} */ + @Override public void schemaName(String schemaName) { + tblQName.schemaName(schemaName); + } + + /** + * Returns the table name. + * + * @return The table name + */ + public String tableName() { + return tblQName.name(); + } + + /** + * Sets the table name + * + * @param tblName The table name. + */ + public void tableName(String tblName) { + tblQName.name(tblName); + } + + /** + * Returns the local file name. + * + * @return The local file name. + */ + public String localFileName() { + return locFileName; + } + + /** + * Sets the local file name. + * + * @param locFileName The local file name. + */ + public void localFileName(String locFileName) { + this.locFileName = locFileName; + } + + /** + * Returns the list of columns. + * + * @return The list of columns. + */ + public List<String> columns() { + return cols; + } + + /** + * Returns the input file format. + * + * @return The input file format. + */ + public BulkLoadFormat inputFormat() { + return inputFormat; + } + + /** + * Returns the batch size. + * + * @return The batch size. + */ + public Integer batchSize() { + return batchSize; + } + + /** + * Sets the batch size. + * + * @param batchSize The batch size. + */ + public void batchSize(int batchSize) { + this.batchSize = batchSize; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SqlBulkLoadCommand.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserBulkLoadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserBulkLoadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserBulkLoadSelfTest.java new file mode 100644 index 0000000..b5cd55b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserBulkLoadSelfTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql; + +/** + * Tests for SQL parser: COPY command. + */ +public class SqlParserBulkLoadSelfTest extends SqlParserAbstractSelfTest { + /** + * Tests for COPY command. + * + * @throws Exception If any of sub-tests was failed. + */ + public void testCopy() { + assertParseError(null, + "copy grom \"any.file\" into Person (_key, age, firstName, lastName) format csv", + "Unexpected token: \"GROM\" (expected: \"FROM\")"); + + assertParseError(null, + "copy from into Person (_key, age, firstName, lastName) format csv", + "Unexpected token: \"INTO\" (expected: \"[identifier]\""); + + assertParseError(null, + "copy from any.file into Person (_key, age, firstName, lastName) format csv", + "Unexpected token: \".\" (expected: \"INTO\""); + + assertParseError(null, + "copy from \"any.file\" to Person (_key, age, firstName, lastName) format csv", + "Unexpected token: \"TO\" (expected: \"INTO\")"); + + // Column list + + assertParseError(null, + "copy from \"any.file\" into Person () format csv", + "Unexpected token: \")\" (expected: \"[identifier]\")"); + + assertParseError(null, + "copy from \"any.file\" into Person (,) format csv", + "Unexpected token: \",\" (expected: \"[identifier]\")"); + + assertParseError(null, + "copy from \"any.file\" into Person format csv", + "Unexpected token: \"FORMAT\" (expected: \"(\")"); + + // FORMAT + + assertParseError(null, + "copy from \"any.file\" into Person (_key, age, firstName, lastName)", + "Unexpected end of command (expected: \"FORMAT\")"); + + assertParseError(null, + "copy from \"any.file\" into Person (_key, age, firstName, lastName) format lsd", + "Unknown format name: LSD"); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index 72e80e2..c46c906 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@ -39,8 +39,15 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.query.BulkLoadContextCursor; +import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.bulkload.BulkLoadCacheWriter; +import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor; +import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters; +import org.apache.ignite.internal.processors.bulkload.BulkLoadParser; +import org.apache.ignite.internal.processors.bulkload.BulkLoadStreamerWriter; import org.apache.ignite.internal.processors.cache.CacheOperationContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; @@ -58,8 +65,12 @@ import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils; import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode; import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan; import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser; +import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand; +import org.apache.ignite.internal.sql.command.SqlCommand; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; +import org.apache.ignite.internal.util.lang.IgniteClosureX; import org.apache.ignite.internal.util.lang.IgniteSingletonIterator; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T3; @@ -967,6 +978,67 @@ public class DmlStatementsProcessor { return updateSqlFields(schemaName, c, GridSqlQueryParser.prepared(stmt), fldsQry, local, filter, cancel); } + /** + * Runs a DML statement for which we have internal command executor. + * + * @param sql The SQL command text to execute. + * @param cmd The command to execute. + * @return The cursor returned by the statement. + * @throws IgniteSQLException If failed. + */ + public FieldsQueryCursor<List<?>> runNativeDmlStatement(String sql, SqlCommand cmd) { + try { + if (cmd instanceof SqlBulkLoadCommand) + return processBulkLoadCommand((SqlBulkLoadCommand)cmd); + else + throw new IgniteSQLException("Unsupported DML operation: " + sql, + IgniteQueryErrorCode.UNSUPPORTED_OPERATION); + + } + catch (IgniteSQLException e) { + throw e; + } + catch (Exception e) { + throw new IgniteSQLException("Unexpected DML operation failure: " + e.getMessage(), e); + } + } + + /** + * Process bulk load COPY command. + * + * @param cmd The command. + * @return The context (which is the result of the first request/response). + * @throws IgniteCheckedException If something failed. + */ + public FieldsQueryCursor<List<?>> processBulkLoadCommand(SqlBulkLoadCommand cmd) throws IgniteCheckedException { + if (cmd.batchSize() == null) + cmd.batchSize(BulkLoadAckClientParameters.DEFAULT_BATCH_SIZE); + + GridH2Table tbl = idx.dataTable(cmd.schemaName(), cmd.tableName()); + + if (tbl == null) + throw new IgniteSQLException("Table does not exist: " + cmd.tableName(), + IgniteQueryErrorCode.TABLE_NOT_FOUND); + + UpdatePlan plan = UpdatePlanBuilder.planForBulkLoad(cmd, tbl); + + IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter = new BulkLoadDataConverter(plan); + + GridCacheContext cache = tbl.cache(); + + IgniteDataStreamer<Object, Object> streamer = cache.grid().dataStreamer(cache.name()); + + BulkLoadCacheWriter outputWriter = new BulkLoadStreamerWriter(streamer); + + BulkLoadParser inputParser = BulkLoadParser.createParser(cmd.inputFormat()); + + BulkLoadProcessor processor = new BulkLoadProcessor(inputParser, dataConverter, outputWriter); + + BulkLoadAckClientParameters params = new BulkLoadAckClientParameters(cmd.localFileName(), cmd.batchSize()); + + return new BulkLoadContextCursor(processor, params); + } + /** */ private final static class InsertEntryProcessor implements EntryProcessor<Object, Object, Boolean> { /** Value to set. */ @@ -1081,4 +1153,31 @@ public class DmlStatementsProcessor { } } + /** + * Converts a row of values to actual key+value using {@link UpdatePlan#processRow(List)}. + */ + private static class BulkLoadDataConverter extends IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> { + /** Update plan to convert incoming rows. */ + private final UpdatePlan plan; + + /** + * Creates the converter with the given update plan. + * + * @param plan The update plan to use. + */ + private BulkLoadDataConverter(UpdatePlan plan) { + this.plan = plan; + } + + /** + * Converts the record to a key+value. + * + * @param record The record to convert. + * @return The key+value. + * @throws IgniteCheckedException If conversion failed for some reason. + */ + @Override public IgniteBiTuple<?, ?> applyx(List<?> record) throws IgniteCheckedException { + return plan.processRow(record); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 96b8935..06c936b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -43,6 +43,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; import javax.cache.Cache; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; @@ -99,7 +100,6 @@ import org.apache.ignite.internal.processors.query.h2.database.io.H2InnerIO; import org.apache.ignite.internal.processors.query.h2.database.io.H2LeafIO; import org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor; import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils; -import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode; import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine; import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase; import org.apache.ignite.internal.processors.query.h2.opt.GridH2PlainRowFactory; @@ -120,6 +120,7 @@ import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisito import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.sql.SqlParseException; import org.apache.ignite.internal.sql.SqlParser; +import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand; import org.apache.ignite.internal.sql.command.SqlAlterTableCommand; import org.apache.ignite.internal.sql.command.SqlCommand; import org.apache.ignite.internal.sql.command.SqlCreateIndexCommand; @@ -190,6 +191,9 @@ import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType */ @SuppressWarnings({"UnnecessaryFullyQualifiedName", "NonFinalStaticVariableUsedInClassInitialization"}) public class IgniteH2Indexing implements GridQueryIndexing { + public static final Pattern INTERNAL_CMD_RE = Pattern.compile( + "^(create|drop)\\s+index|^alter\\s+table|^copy", Pattern.CASE_INSENSITIVE); + /* * Register IO for indexes. */ @@ -1437,9 +1441,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { */ private List<FieldsQueryCursor<List<?>>> tryQueryDistributedSqlFieldsNative(String schemaName, SqlFieldsQuery qry) { // Heuristic check for fast return. - String sqlUpper = qry.getSql().toUpperCase(); - - if (!(sqlUpper.contains("INDEX") || sqlUpper.contains("ALTER"))) + if (!INTERNAL_CMD_RE.matcher(qry.getSql().trim()).find()) return null; // Parse. @@ -1454,9 +1456,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (parser.nextCommand() != null) return null; - // Only CREATE/DROP INDEX and ALTER TABLE commands are supported for now. + // Currently supported commands are: CREATE/DROP INDEX/COPY/ALTER TABLE if (!(cmd instanceof SqlCreateIndexCommand || cmd instanceof SqlDropIndexCommand || - cmd instanceof SqlAlterTableCommand)) + cmd instanceof SqlBulkLoadCommand || cmd instanceof SqlAlterTableCommand)) return null; } catch (Exception e) { @@ -1472,17 +1474,26 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (e instanceof SqlParseException) code = ((SqlParseException)e).code(); - throw new IgniteSQLException("Failed to parse DDL statement: " + qry.getSql(), code, e); + throw new IgniteSQLException("Failed to parse DDL statement: " + qry.getSql() + ": " + e.getMessage(), + code, e); } // Execute. - try { - FieldsQueryCursor<List<?>> res = ddlProc.runDdlStatement(qry.getSql(), cmd); + if (cmd instanceof SqlBulkLoadCommand) { + FieldsQueryCursor<List<?>> cursor = dmlProc.runNativeDmlStatement(qry.getSql(), cmd); - return Collections.singletonList(res); + return Collections.singletonList(cursor); } - catch (IgniteCheckedException e) { - throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qry.getSql() + ']', e); + else { + try { + FieldsQueryCursor<List<?>> cursor = ddlProc.runDdlStatement(qry.getSql(), cmd); + + return Collections.singletonList(cursor); + } + catch (IgniteCheckedException e) { + throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qry.getSql() + "]: " + + e.getMessage(), e); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java index ca7680a..6f5b51f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java @@ -62,6 +62,7 @@ import org.apache.ignite.internal.sql.command.SqlDropIndexCommand; import org.apache.ignite.internal.sql.command.SqlIndexColumn; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; import org.h2.command.Prepared; import org.h2.command.ddl.AlterTableAlterColumn; import org.h2.command.ddl.CreateIndex; @@ -483,6 +484,7 @@ public class DdlStatementsProcessor { return resCur; } catch (SchemaOperationException e) { + U.error(null, "DDL operation failure", e); throw convert(e); } catch (IgniteSQLException e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdateMode.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdateMode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdateMode.java index 0440648..d9c627a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdateMode.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdateMode.java @@ -22,15 +22,18 @@ package org.apache.ignite.internal.processors.query.h2.dml; * or UPDATE/DELETE from subquery or literals/params based. */ public enum UpdateMode { - /** */ + /** MERGE command. */ MERGE, - /** */ + /** INSERT command. */ INSERT, - /** */ + /** UPDATE command. */ UPDATE, - /** */ + /** DELETE command. */ DELETE, + + /** COPY command. */ + BULK_LOAD } http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java index 17dc9d1..10d485a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java @@ -39,6 +39,7 @@ import org.apache.ignite.lang.IgniteBiTuple; import org.h2.table.Column; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.processors.query.h2.dml.UpdateMode.BULK_LOAD; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.DEFAULT_COLUMNS_COUNT; /** @@ -182,6 +183,10 @@ public final class UpdatePlan { * @throws IgniteCheckedException if failed. */ public IgniteBiTuple<?, ?> processRow(List<?> row) throws IgniteCheckedException { + if (mode != BULK_LOAD && row.size() != colNames.length) + throw new IgniteSQLException("Not enough values in a row: " + row.size() + " instead of " + colNames.length, + IgniteQueryErrorCode.ENTRY_PROCESSING); + GridH2RowDescriptor rowDesc = tbl.rowDescriptor(); GridQueryTypeDescriptor desc = rowDesc.type(); @@ -205,7 +210,8 @@ public final class UpdatePlan { if (key == null) { if (F.isEmpty(desc.keyFieldName())) - throw new IgniteSQLException("Key for INSERT or MERGE must not be null", IgniteQueryErrorCode.NULL_KEY); + throw new IgniteSQLException("Key for INSERT, COPY, or MERGE must not be null", + IgniteQueryErrorCode.NULL_KEY); else throw new IgniteSQLException("Null value is not allowed for column '" + desc.keyFieldName() + "'", IgniteQueryErrorCode.NULL_KEY); @@ -213,16 +219,18 @@ public final class UpdatePlan { if (val == null) { if (F.isEmpty(desc.valueFieldName())) - throw new IgniteSQLException("Value for INSERT, MERGE, or UPDATE must not be null", + throw new IgniteSQLException("Value for INSERT, COPY, MERGE, or UPDATE must not be null", IgniteQueryErrorCode.NULL_VALUE); else throw new IgniteSQLException("Null value is not allowed for column '" + desc.valueFieldName() + "'", IgniteQueryErrorCode.NULL_VALUE); } + int actualColCnt = Math.min(colNames.length, row.size()); + Map<String, Object> newColVals = new HashMap<>(); - for (int i = 0; i < colNames.length; i++) { + for (int i = 0; i < actualColCnt; i++) { if (i == keyColIdx || i == valColIdx) continue; @@ -241,14 +249,14 @@ public final class UpdatePlan { // We update columns in the order specified by the table for a reason - table's // column order preserves their precedence for correct update of nested properties. - Column[] cols = tbl.getColumns(); + Column[] tblCols = tbl.getColumns(); // First 3 columns are _key, _val and _ver. Skip 'em. - for (int i = DEFAULT_COLUMNS_COUNT; i < cols.length; i++) { + for (int i = DEFAULT_COLUMNS_COUNT; i < tblCols.length; i++) { if (tbl.rowDescriptor().isKeyValueOrVersionColumn(i)) continue; - String colName = cols[i].getName(); + String colName = tblCols[i].getName(); if (!newColVals.containsKey(colName)) continue; http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java index 3305b00..bced836 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlTable; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlUnion; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlUpdate; +import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -407,6 +408,91 @@ public final class UpdatePlanBuilder { } /** + * Prepare update plan for COPY command (AKA bulk load). + * + * @param cmd Bulk load command + * @return The update plan for this command. + * @throws IgniteCheckedException if failed. + */ + @SuppressWarnings("ConstantConditions") + public static UpdatePlan planForBulkLoad(SqlBulkLoadCommand cmd, GridH2Table tbl) throws IgniteCheckedException { + GridH2RowDescriptor desc = tbl.rowDescriptor(); + + if (desc == null) + throw new IgniteSQLException("Row descriptor undefined for table '" + tbl.getName() + "'", + IgniteQueryErrorCode.NULL_TABLE_DESCRIPTOR); + + GridCacheContext<?, ?> cctx = desc.context(); + + List<String> cols = cmd.columns(); + + if (cols == null) + throw new IgniteSQLException("Columns are not defined", IgniteQueryErrorCode.NULL_TABLE_DESCRIPTOR); + + String[] colNames = new String[cols.size()]; + + int[] colTypes = new int[cols.size()]; + + int keyColIdx = -1; + int valColIdx = -1; + + boolean hasKeyProps = false; + boolean hasValProps = false; + + for (int i = 0; i < cols.size(); i++) { + String colName = cols.get(i); + + colNames[i] = colName; + + Column h2Col = tbl.getColumn(colName); + + colTypes[i] = h2Col.getType(); + int colId = h2Col.getColumnId(); + + if (desc.isKeyColumn(colId)) { + keyColIdx = i; + continue; + } + + if (desc.isValueColumn(colId)) { + valColIdx = i; + continue; + } + + GridQueryProperty prop = desc.type().property(colName); + + assert prop != null : "Property '" + colName + "' not found."; + + if (prop.key()) + hasKeyProps = true; + else + hasValProps = true; + } + + KeyValueSupplier keySupplier = createSupplier(cctx, desc.type(), keyColIdx, hasKeyProps, + true, false); + KeyValueSupplier valSupplier = createSupplier(cctx, desc.type(), valColIdx, hasValProps, + false, false); + + return new UpdatePlan( + UpdateMode.BULK_LOAD, + tbl, + colNames, + colTypes, + keySupplier, + valSupplier, + keyColIdx, + valColIdx, + null, + true, + null, + 0, + null, + null + ); + } + + /** * Detect appropriate method of instantiating key or value (take from param, create binary builder, * invoke default ctor, or allocate). * http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 68610a1..6295d8d 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -165,6 +165,7 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridQueryParsingTest; import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryDistributedJoinsTest; import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryTest; import org.apache.ignite.internal.processors.sql.SqlConnectorConfigurationValidationSelfTest; +import org.apache.ignite.internal.sql.SqlParserBulkLoadSelfTest; import org.apache.ignite.internal.sql.SqlParserCreateIndexSelfTest; import org.apache.ignite.internal.sql.SqlParserDropIndexSelfTest; import org.apache.ignite.spi.communication.tcp.GridOrderedMessageCancelSelfTest; @@ -183,6 +184,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(SqlParserCreateIndexSelfTest.class); suite.addTestSuite(SqlParserDropIndexSelfTest.class); + suite.addTestSuite(SqlParserBulkLoadSelfTest.class); suite.addTestSuite(SqlConnectorConfigurationValidationSelfTest.class); suite.addTestSuite(ClientConnectorConfigurationValidationSelfTest.class);