IGNITE-6917: Implemented SQL COPY command This closes #3419
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/25d38cc9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/25d38cc9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/25d38cc9 Branch: refs/heads/ignite-7485-2 Commit: 25d38cc98c4ca098679d69ad90fc8bed66e6916d Parents: faf50f1 Author: gg-shq <kshiro...@gridgain.com> Authored: Wed Feb 7 14:28:04 2018 +0300 Committer: Igor Sapego <isap...@gridgain.com> Committed: Wed Feb 7 14:30:39 2018 +0300 ---------------------------------------------------------------------- .../internal/jdbc2/JdbcBulkLoadSelfTest.java | 185 ++++++ .../ignite/jdbc/JdbcErrorsAbstractSelfTest.java | 2 +- .../jdbc/suite/IgniteJdbcDriverTestSuite.java | 14 + .../thin/JdbcThinBulkLoadAbstractSelfTest.java | 601 +++++++++++++++++++ ...inBulkLoadAtomicPartitionedNearSelfTest.java | 39 ++ ...bcThinBulkLoadAtomicPartitionedSelfTest.java | 39 ++ ...dbcThinBulkLoadAtomicReplicatedSelfTest.java | 39 ++ ...oadTransactionalPartitionedNearSelfTest.java | 39 ++ ...ulkLoadTransactionalPartitionedSelfTest.java | 39 ++ ...BulkLoadTransactionalReplicatedSelfTest.java | 39 ++ .../JdbcThinDynamicIndexAbstractSelfTest.java | 1 - .../clients/src/test/resources/bulkload0.csv | 0 .../clients/src/test/resources/bulkload1.csv | 1 + .../clients/src/test/resources/bulkload2.csv | 2 + .../src/test/resources/bulkload2_utf.csv | 2 + .../cache/query/BulkLoadContextCursor.java | 97 +++ .../internal/jdbc/thin/JdbcThinStatement.java | 68 ++- .../ignite/internal/jdbc2/JdbcQueryTask.java | 12 +- .../bulkload/BulkLoadAckClientParameters.java | 92 +++ .../bulkload/BulkLoadCacheWriter.java | 31 + .../processors/bulkload/BulkLoadCsvFormat.java | 159 +++++ .../processors/bulkload/BulkLoadCsvParser.java | 65 ++ .../processors/bulkload/BulkLoadFormat.java | 33 + .../processors/bulkload/BulkLoadParser.java | 61 ++ .../processors/bulkload/BulkLoadProcessor.java | 104 ++++ .../bulkload/BulkLoadStreamerWriter.java 
| 65 ++ .../bulkload/pipeline/CharsetDecoderBlock.java | 132 ++++ .../pipeline/CsvLineProcessorBlock.java | 70 +++ .../bulkload/pipeline/LineSplitterBlock.java | 72 +++ .../bulkload/pipeline/PipelineBlock.java | 66 ++ .../bulkload/pipeline/StrListAppenderBlock.java | 52 ++ .../odbc/jdbc/JdbcBulkLoadAckResult.java | 111 ++++ .../odbc/jdbc/JdbcBulkLoadBatchRequest.java | 183 ++++++ .../odbc/jdbc/JdbcBulkLoadProcessor.java | 144 +++++ .../processors/odbc/jdbc/JdbcRequest.java | 7 + .../odbc/jdbc/JdbcRequestHandler.java | 90 ++- .../processors/odbc/jdbc/JdbcResult.java | 8 + .../apache/ignite/internal/sql/SqlKeyword.java | 15 + .../apache/ignite/internal/sql/SqlParser.java | 18 +- .../sql/command/SqlBulkLoadCommand.java | 273 +++++++++ .../internal/sql/SqlParserBulkLoadSelfTest.java | 70 +++ .../query/h2/DmlStatementsProcessor.java | 99 +++ .../processors/query/h2/IgniteH2Indexing.java | 35 +- .../query/h2/ddl/DdlStatementsProcessor.java | 2 + .../processors/query/h2/dml/UpdateMode.java | 11 +- .../processors/query/h2/dml/UpdatePlan.java | 20 +- .../query/h2/dml/UpdatePlanBuilder.java | 86 +++ .../IgniteCacheQuerySelfTestSuite.java | 2 + parent/pom.xml | 3 +- 49 files changed, 3361 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBulkLoadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBulkLoadSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBulkLoadSelfTest.java new file mode 100644 index 0000000..d9506cf --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBulkLoadSelfTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+
+import static org.apache.ignite.IgniteJdbcDriver.CFG_URL_PREFIX;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * COPY command test for the regular (non-thin) JDBC driver, which is expected to reject COPY.
+ */
+public class JdbcBulkLoadSelfTest extends GridCommonAbstractTest {
+    /** JDBC URL. */
+    private static final String BASE_URL = CFG_URL_PREFIX +
+        "cache=default@modules/clients/src/test/config/jdbc-config.xml";
+
+    /** Connection. */
+    protected Connection conn;
+
+    // No 'log' field is declared here on purpose: redeclaring it would shadow the logger
+    // inherited from the test base class with an uninitialized (null) field.
+
+    /**
+     * Builds a two-node-friendly configuration: one PARTITIONED cache (1 backup, FULL_SYNC)
+     * indexed by Integer -> Person, local static-IP discovery and a REST connector.
+     *
+     * @param gridName Grid name.
+     * @return Grid configuration used for starting the grid.
+     * @throws Exception If failed.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<?,?> cache = defaultCacheConfiguration();
+
+        cache.setCacheMode(PARTITIONED);
+        cache.setBackups(1);
+        cache.setWriteSynchronizationMode(FULL_SYNC);
+        cache.setIndexedTypes(
+            Integer.class, Person.class
+        );
+
+        cfg.setCacheConfiguration(cache);
+        cfg.setLocalHost("127.0.0.1");
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+        ipFinder.setAddresses(Collections.singleton("127.0.0.1:47500..47501"));
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+
+        cfg.setConnectorConfiguration(new ConnectorConfiguration());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Establishes the JDBC connection.
+     *
+     * @return Connection to use for the test.
+     * @throws Exception if failed.
+     */
+    private Connection createConnection() throws Exception {
+        Properties props = new Properties();
+
+        return DriverManager.getConnection(BASE_URL, props);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        U.closeQuiet(conn);
+
+        conn = null;
+
+        // Always run the base-class cleanup, even if clearing the cache fails.
+        try {
+            ignite(0).cache(DEFAULT_CACHE_NAME).clear();
+        }
+        finally {
+            super.afterTest();
+        }
+    }
+
+    /**
+     * Checks that the regular JDBC driver rejects COPY.
+     * This is more a placeholder for implementation of IGNITE-7553.
+     *
+     * @throws Exception if failed.
+     */
+    public void testBulkLoadThrows() throws Exception {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                conn = createConnection();
+
+                try (Statement stmt = conn.createStatement()) {
+                    stmt.executeUpdate("copy from \"dummy.csv\" into Person" +
+                        " (_key, id, firstName, lastName) format csv");
+
+                    return null;
+                }
+            }
+        }, SQLException.class, "COPY command is currently supported only in thin JDBC driver.");
+    }
+
+    /**
+     * A test class for creating a query entity.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    private static class Person implements Serializable {
+        /** ID. */
+        @QuerySqlField
+        private final int id;
+
+        /** First name. */
+        @QuerySqlField(index = false)
+        private final String firstName;
+
+        /** Last name. */
+        @QuerySqlField(index = false)
+        private final String lastName;
+
+        /** Age. */
+        @QuerySqlField
+        private final int age;
+
+        /**
+         * @param id ID.
+         * @param firstName First name (must be non-empty).
+         * @param lastName Last name (must be non-empty).
+         * @param age Age (must be positive).
+         */
+        private Person(int id, String firstName, String lastName, int age) {
+            assert !F.isEmpty(firstName);
+            assert !F.isEmpty(lastName);
+            assert age > 0;
+
+            this.id = id;
+            this.firstName = firstName;
+            this.lastName = lastName;
+            this.age = age;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcErrorsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcErrorsAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcErrorsAbstractSelfTest.java index 49746b6..2059408 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcErrorsAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcErrorsAbstractSelfTest.java @@ -107,7 +107,7 @@ public abstract class JdbcErrorsAbstractSelfTest extends GridCommonAbstractTest */ public void testDmlErrors() throws SQLException { checkErrorState("INSERT INTO \"test\".INTEGER(_key, _val) values(1, null)", "22004", - "Value for INSERT, MERGE, or UPDATE must not be null"); + "Value for INSERT, COPY, MERGE, or UPDATE must not be null"); checkErrorState("INSERT INTO \"test\".INTEGER(_key, _val) values(1, 'zzz')", "0700B", "Value conversion failed [from=java.lang.String, to=java.lang.Integer]"); http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java index ff4d69f..656e218 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java +++
b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java @@ -35,6 +35,12 @@ import org.apache.ignite.jdbc.JdbcResultSetSelfTest; import org.apache.ignite.jdbc.JdbcStatementSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinAutoCloseServerCursorTest; import org.apache.ignite.jdbc.thin.JdbcThinBatchSelfTest; +import org.apache.ignite.jdbc.thin.JdbcThinBulkLoadAtomicPartitionedNearSelfTest; +import org.apache.ignite.jdbc.thin.JdbcThinBulkLoadAtomicPartitionedSelfTest; +import org.apache.ignite.jdbc.thin.JdbcThinBulkLoadAtomicReplicatedSelfTest; +import org.apache.ignite.jdbc.thin.JdbcThinBulkLoadTransactionalPartitionedNearSelfTest; +import org.apache.ignite.jdbc.thin.JdbcThinBulkLoadTransactionalPartitionedSelfTest; +import org.apache.ignite.jdbc.thin.JdbcThinBulkLoadTransactionalReplicatedSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinComplexDmlDdlSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinComplexQuerySelfTest; import org.apache.ignite.jdbc.thin.JdbcThinConnectionSelfTest; @@ -154,6 +160,14 @@ public class IgniteJdbcDriverTestSuite extends TestSuite { suite.addTest(new TestSuite(JdbcThinDynamicIndexTransactionalPartitionedSelfTest.class)); suite.addTest(new TestSuite(JdbcThinDynamicIndexTransactionalReplicatedSelfTest.class)); + // New thin JDBC driver, DML tests + suite.addTest(new TestSuite(JdbcThinBulkLoadAtomicPartitionedNearSelfTest.class)); + suite.addTest(new TestSuite(JdbcThinBulkLoadAtomicPartitionedSelfTest.class)); + suite.addTest(new TestSuite(JdbcThinBulkLoadAtomicReplicatedSelfTest.class)); + suite.addTest(new TestSuite(JdbcThinBulkLoadTransactionalPartitionedNearSelfTest.class)); + suite.addTest(new TestSuite(JdbcThinBulkLoadTransactionalPartitionedSelfTest.class)); + suite.addTest(new TestSuite(JdbcThinBulkLoadTransactionalReplicatedSelfTest.class)); + // New thin JDBC driver, full SQL tests suite.addTest(new TestSuite(JdbcThinComplexDmlDdlSelfTest.class)); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAbstractSelfTest.java new file mode 100644 index 0000000..761f700 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAbstractSelfTest.java @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.testframework.GridTestUtils;
+
+import java.sql.BatchUpdateException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath;
+
+/**
+ * COPY statement tests for the thin JDBC driver.
+ * <p>
+ * Concrete subclasses select the cache mode, atomicity mode and near-cache usage
+ * through {@link #cacheMode()}, {@link #atomicityMode()} and {@link #nearCache()}.
+ */
+public abstract class JdbcThinBulkLoadAbstractSelfTest extends JdbcThinAbstractDmlStatementSelfTest {
+    /** Default table name. */
+    private static final String TBL_NAME = "Person";
+
+    /** JDBC statement; created in {@link #beforeTest()} and closed in {@link #afterTest()}. */
+    private Statement stmt;
+
+    /** A CSV file with zero records. */
+    private static final String BULKLOAD_EMPTY_CSV_FILE =
+        Objects.requireNonNull(resolveIgnitePath("/modules/clients/src/test/resources/bulkload0.csv"))
+            .getAbsolutePath();
+
+    /** A CSV file with one record. */
+    private static final String BULKLOAD_ONE_LINE_CSV_FILE =
+        Objects.requireNonNull(resolveIgnitePath("/modules/clients/src/test/resources/bulkload1.csv"))
+            .getAbsolutePath();
+
+    /** A CSV file with two records. */
+    private static final String BULKLOAD_TWO_LINES_CSV_FILE =
+        Objects.requireNonNull(resolveIgnitePath("/modules/clients/src/test/resources/bulkload2.csv"))
+            .getAbsolutePath();
+
+    /** A CSV file with two records containing non-ASCII (UTF-8 encoded) text. */
+    private static final String BULKLOAD_UTF_CSV_FILE =
+        Objects.requireNonNull(resolveIgnitePath("/modules/clients/src/test/resources/bulkload2_utf.csv"))
+            .getAbsolutePath();
+
+    /** Basic COPY statement used in majority of the tests. */
+    public static final String BASIC_SQL_COPY_STMT =
+        "copy from \"" + BULKLOAD_TWO_LINES_CSV_FILE + "\"" +
+            " into " + TBL_NAME +
+            " (_key, age, firstName, lastName)" +
+            " format csv";
+
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfig() {
+        return cacheConfigWithIndexedTypes();
+    }
+
+    /**
+     * Creates cache configuration with {@link QueryEntity} created
+     * using {@link CacheConfiguration#setIndexedTypes(Class[])} call.
+     * Cache mode, atomicity mode and near cache are taken from the subclass.
+     *
+     * @return The cache configuration.
+     */
+    @SuppressWarnings("unchecked")
+    private CacheConfiguration cacheConfigWithIndexedTypes() {
+        CacheConfiguration<?,?> cache = defaultCacheConfiguration();
+
+        cache.setCacheMode(cacheMode());
+        cache.setAtomicityMode(atomicityMode());
+        cache.setWriteSynchronizationMode(FULL_SYNC);
+
+        if (cacheMode() == PARTITIONED)
+            cache.setBackups(1);
+
+        if (nearCache())
+            cache.setNearConfiguration(new NearCacheConfiguration());
+
+        cache.setIndexedTypes(
+            String.class, Person.class
+        );
+
+        return cache;
+    }
+
+    /**
+     * Returns true if we are testing near cache.
+     *
+     * @return true if we are testing near cache.
+     */
+    protected abstract boolean nearCache();
+
+    /**
+     * Returns cache atomicity mode we are testing.
+     *
+     * @return The cache atomicity mode we are testing.
+     */
+    protected abstract CacheAtomicityMode atomicityMode();
+
+    /**
+     * Returns cache mode we are testing.
+     *
+     * @return The cache mode we are testing.
+     */
+    protected abstract CacheMode cacheMode();
+
+    /**
+     * Creates cache configuration with {@link QueryEntity} created
+     * using {@link CacheConfiguration#setQueryEntities(Collection)} call.
+     * Unlike {@link #cacheConfigWithIndexedTypes()}, this always uses a PARTITIONED cache
+     * with one backup, regardless of {@link #cacheMode()}.
+     *
+     * @return The cache configuration.
+     */
+    private CacheConfiguration cacheConfigWithQueryEntity() {
+        CacheConfiguration<?,?> cache = defaultCacheConfiguration();
+
+        cache.setCacheMode(PARTITIONED);
+        cache.setBackups(1);
+        cache.setWriteSynchronizationMode(FULL_SYNC);
+
+        QueryEntity e = new QueryEntity();
+
+        e.setKeyType(String.class.getName());
+        e.setValueType("Person");
+
+        e.addQueryField("id", Integer.class.getName(), null);
+        e.addQueryField("age", Integer.class.getName(), null);
+        e.addQueryField("firstName", String.class.getName(), null);
+        e.addQueryField("lastName", String.class.getName(), null);
+
+        cache.setQueryEntities(Collections.singletonList(e));
+
+        return cache;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        System.setProperty(IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK, "TRUE");
+
+        stmt = conn.createStatement();
+
+        assertNotNull(stmt);
+        assertFalse(stmt.isClosed());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // The system property and base-class cleanup must be undone even if closing the
+        // statement fails, or if beforeTest() failed before the statement was created.
+        try {
+            if (stmt != null) {
+                if (!stmt.isClosed())
+                    stmt.close();
+
+                assertTrue(stmt.isClosed());
+            }
+        }
+        finally {
+            System.clearProperty(IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK);
+
+            super.afterTest();
+        }
+    }
+
+    /**
+     * Smoke test. Imports two-entry CSV file into a table and checks
+     * the created entries using SELECT statement.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testBasicStatement() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(BASIC_SQL_COPY_STMT);
+
+        assertEquals(2, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Imports two-entry CSV file with UTF-8 characters into a table and checks
+     * the created entries using SELECT statement.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testUtf() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+            "copy from \"" + BULKLOAD_UTF_CSV_FILE + "\" into " + TBL_NAME +
+                " (_key, age, firstName, lastName)" +
+                " format csv");
+
+        assertEquals(2, updatesCnt);
+
+        checkUtfCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Imports two-entry CSV file with UTF-8 characters into a table using batch size of one byte
+     * (thus splitting each two-byte UTF-8 character into two batches)
+     * and checks the created entries using SELECT statement.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testUtfBatchSize_1() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+            "copy from \"" + BULKLOAD_UTF_CSV_FILE + "\" into " + TBL_NAME +
+                " (_key, age, firstName, lastName)" +
+                " format csv batch_size 1");
+
+        assertEquals(2, updatesCnt);
+
+        checkUtfCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Imports one-entry CSV file into a table and checks the entry created using SELECT statement.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testOneLineFile() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+            "copy from \"" + BULKLOAD_ONE_LINE_CSV_FILE + "\" into " + TBL_NAME +
+                " (_key, age, firstName, lastName)" +
+                " format csv");
+
+        assertEquals(1, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 1);
+    }
+
+    /**
+     * Imports zero-entry CSV file into a table and checks that no entries are created
+     * using SELECT statement.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testEmptyFile() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+            "copy from \"" + BULKLOAD_EMPTY_CSV_FILE + "\" into " + TBL_NAME +
+                " (_key, age, firstName, lastName)" +
+                " format csv");
+
+        assertEquals(0, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 0);
+    }
+
+    /**
+     * Checks that error is reported for a non-existent file.
+     */
+    public void testWrongFileName() {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeUpdate(
+                    "copy from \"nonexistent\" into Person" +
+                        " (_key, age, firstName, lastName)" +
+                        " format csv");
+
+                return null;
+            }
+        }, SQLException.class, "Failed to read file: 'nonexistent'");
+    }
+
+    /**
+     * Checks that error is reported if the destination table is missing.
+     */
+    public void testMissingTable() {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeUpdate(
+                    "copy from \"" + BULKLOAD_TWO_LINES_CSV_FILE + "\" into Peterson" +
+                        " (_key, age, firstName, lastName)" +
+                        " format csv");
+
+                return null;
+            }
+        }, SQLException.class, "Table does not exist: PETERSON");
+    }
+
+    /**
+     * Checks that error is reported when a non-existing column is specified in the SQL command.
+     */
+    public void testWrongColumnName() {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeUpdate(
+                    "copy from \"" + BULKLOAD_TWO_LINES_CSV_FILE + "\" into Person" +
+                        " (_key, age, firstName, lostName)" +
+                        " format csv");
+
+                return null;
+            }
+        }, SQLException.class, "Column \"LOSTNAME\" not found");
+    }
+
+    /**
+     * Checks that error is reported if field read from CSV file cannot be converted to the type of the column.
+     */
+    public void testWrongColumnType() {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeUpdate(
+                    "copy from \"" + BULKLOAD_TWO_LINES_CSV_FILE + "\" into Person" +
+                        " (_key, firstName, age, lastName)" +
+                        " format csv");
+
+                return null;
+            }
+        }, SQLException.class, "Value conversion failed [from=java.lang.String, to=java.lang.Integer]");
+    }
+
+    /**
+     * Checks that if even a subset of fields is imported, the imported fields are set correctly.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testFieldsSubset() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+            "copy from \"" + BULKLOAD_TWO_LINES_CSV_FILE + "\" into " + TBL_NAME +
+                " (_key, age, firstName)" +
+                " format csv");
+
+        assertEquals(2, updatesCnt);
+
+        checkCacheContents(TBL_NAME, false, 2);
+    }
+
+    /**
+     * Checks that bulk load works when we create table using 'CREATE TABLE' command.
+     *
+     * The majority of the tests in this class use {@link CacheConfiguration#setIndexedTypes(Class[])}
+     * to create the table.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testCreateAndBulkLoadTable() throws SQLException {
+        String tblName = QueryUtils.DFLT_SCHEMA + ".\"PersonTbl\"";
+
+        execute(conn, "create table " + tblName +
+            " (id int primary key, age int, firstName varchar(30), lastName varchar(30))");
+
+        int updatesCnt = stmt.executeUpdate(
+            "copy from \"" + BULKLOAD_TWO_LINES_CSV_FILE + "\" into " + tblName +
+                "(_key, age, firstName, lastName)" +
+                " format csv");
+
+        assertEquals(2, updatesCnt);
+
+        checkCacheContents(tblName, true, 2);
+    }
+
+    /**
+     * Checks that bulk load works when we create table with {@link CacheConfiguration#setQueryEntities(Collection)}.
+     *
+     * The majority of the tests in this class use {@link CacheConfiguration#setIndexedTypes(Class[])}
+     * to create a table.
+     *
+     * @throws SQLException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public void testConfigureQueryEntityAndBulkLoad() throws SQLException {
+        // NOTE(review): cacheConfigWithQueryEntity() and cacheConfig() both start from
+        // defaultCacheConfiguration(), i.e. the same cache name. If that cache is already running
+        // (started from cacheConfig()), getOrCreateCache() returns it as-is and this test does not
+        // actually exercise the QueryEntity-based table definition. Confirm against the base class setup.
+        ignite(0).getOrCreateCache(cacheConfigWithQueryEntity());
+
+        int updatesCnt = stmt.executeUpdate(BASIC_SQL_COPY_STMT);
+
+        assertEquals(2, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Checks that bulk load works when we use batch size of 1 byte and thus
+     * create multiple batches per COPY.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testBatchSize_1() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(BASIC_SQL_COPY_STMT + " batch_size 1");
+
+        assertEquals(2, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Verifies exception thrown if COPY is added into a batch.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testMultipleStatement() throws SQLException {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.addBatch(BASIC_SQL_COPY_STMT);
+
+                stmt.addBatch("copy from \"" + BULKLOAD_ONE_LINE_CSV_FILE + "\" into " + TBL_NAME +
+                    " (_key, age, firstName, lastName)" +
+                    " format csv");
+
+                stmt.addBatch("copy from \"" + BULKLOAD_UTF_CSV_FILE + "\" into " + TBL_NAME +
+                    " (_key, age, firstName, lastName)" +
+                    " format csv");
+
+                stmt.executeBatch();
+
+                return null;
+            }
+        }, BatchUpdateException.class, "COPY command cannot be executed in batch mode.");
+    }
+
+    /**
+     * Verifies that COPY command is rejected by Statement.executeQuery().
+     *
+     * @throws SQLException If failed.
+     */
+    public void testExecuteQuery() throws SQLException {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeQuery(BASIC_SQL_COPY_STMT);
+
+                return null;
+            }
+        }, SQLException.class, "The query isn't SELECT query");
+    }
+
+    /**
+     * Verifies that COPY command works in Statement.execute().
+     *
+     * @throws SQLException If failed.
+     */
+    public void testExecute() throws SQLException {
+        boolean isRowSet = stmt.execute(BASIC_SQL_COPY_STMT);
+
+        assertFalse(isRowSet);
+
+        checkCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Verifies that COPY command can be called with PreparedStatement.executeUpdate().
+     *
+     * @throws SQLException If failed.
+     */
+    public void testPreparedStatementWithExecuteUpdate() throws SQLException {
+        try (PreparedStatement pstmt = conn.prepareStatement(BASIC_SQL_COPY_STMT)) {
+            int updatesCnt = pstmt.executeUpdate();
+
+            assertEquals(2, updatesCnt);
+        }
+
+        checkCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Verifies that COPY command reports an error when used with PreparedStatement parameter.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testPreparedStatementWithParameter() throws SQLException {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try (PreparedStatement pstmt = conn.prepareStatement(
+                    "copy from \"" + BULKLOAD_TWO_LINES_CSV_FILE + "\" into " + TBL_NAME +
+                        " (_key, age, firstName, lastName)" +
+                        " format ?")) {
+                    pstmt.setString(1, "csv");
+
+                    pstmt.executeUpdate();
+                }
+
+                return null;
+            }
+        }, SQLException.class, "Unexpected token: \"?\" (expected: \"[identifier]\"");
+    }
+
+    /**
+     * Verifies that COPY command can be called with PreparedStatement.execute().
+     *
+     * @throws SQLException If failed.
+     */
+    public void testPreparedStatementWithExecute() throws SQLException {
+        try (PreparedStatement pstmt = conn.prepareStatement(BASIC_SQL_COPY_STMT)) {
+            boolean isRowSet = pstmt.execute();
+
+            assertFalse(isRowSet);
+        }
+
+        checkCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Verifies that COPY command is rejected by PreparedStatement.executeQuery().
+     */
+    public void testPreparedStatementWithExecuteQuery() {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try (PreparedStatement pstmt = conn.prepareStatement(BASIC_SQL_COPY_STMT)) {
+                    pstmt.executeQuery();
+                }
+
+                return null;
+            }
+        }, SQLException.class, "The query isn't SELECT query");
+    }
+
+    /**
+     * Checks cache contents for a typical test using SQL SELECT command.
+     *
+     * @param tblName Table name to query.
+     * @param checkLastName Check 'lastName' column (not imported in some tests).
+     * @param recCnt Number of records to expect.
+     * @throws SQLException When one of checks has failed.
+     */
+    private void checkCacheContents(String tblName, boolean checkLastName, int recCnt) throws SQLException {
+        checkContents(tblName, checkLastName, recCnt,
+            "FirstName123 MiddleName123", "LastName123",
+            "FirstName456", "LastName456");
+    }
+
+    /**
+     * Checks cache contents for a UTF-8 bulk load tests using SQL SELECT command.
+     *
+     * @param tblName Table name to query.
+     * @param checkLastName Check 'lastName' column (not imported in some tests).
+     * @param recCnt Number of records to expect.
+     * @throws SQLException When one of checks has failed.
+     */
+    private void checkUtfCacheContents(String tblName, boolean checkLastName, int recCnt) throws SQLException {
+        checkContents(tblName, checkLastName, recCnt,
+            "Имя123 Отчество123", "Фамилия123",
+            "Имя456", "Фамилия456");
+    }
+
+    /**
+     * Selects all rows of a table and checks them against the two known records
+     * (key 123 with age 12, key 456 with age 45) and the expected row count.
+     *
+     * @param tblName Table name to query.
+     * @param checkLastName Check 'lastName' column (not imported in some tests).
+     * @param recCnt Number of records to expect.
+     * @param firstName123 Expected first name of record 123.
+     * @param lastName123 Expected last name of record 123.
+     * @param firstName456 Expected first name of record 456.
+     * @param lastName456 Expected last name of record 456.
+     * @throws SQLException When one of checks has failed.
+     */
+    private void checkContents(String tblName, boolean checkLastName, int recCnt,
+        String firstName123, String lastName123, String firstName456, String lastName456) throws SQLException {
+        try (ResultSet rs = stmt.executeQuery("select _key, age, firstName, lastName from " + tblName)) {
+            assert rs != null;
+
+            int cnt = 0;
+
+            while (rs.next()) {
+                int id = rs.getInt("_key");
+
+                if (id == 123) {
+                    assertEquals(12, rs.getInt("age"));
+                    assertEquals(firstName123, rs.getString("firstName"));
+
+                    if (checkLastName)
+                        assertEquals(lastName123, rs.getString("lastName"));
+                }
+                else if (id == 456) {
+                    assertEquals(45, rs.getInt("age"));
+                    assertEquals(firstName456, rs.getString("firstName"));
+
+                    if (checkLastName)
+                        assertEquals(lastName456, rs.getString("lastName"));
+                }
+                else
+                    fail("Wrong ID: " + id);
+
+                cnt++;
+            }
+
+            assertEquals(recCnt, cnt);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedNearSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedNearSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedNearSelfTest.java new file mode 100644 index 0000000..887b1d9 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedNearSelfTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.jdbc.thin; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** A {@link JdbcThinBulkLoadAbstractSelfTest} for partitioned atomic near-cache mode (PARTITIONED cache, ATOMIC atomicity, near cache enabled). */ +public class JdbcThinBulkLoadAtomicPartitionedNearSelfTest extends JdbcThinBulkLoadAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected boolean nearCache() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedSelfTest.java new file mode 100644 index 0000000..5581333 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedSelfTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.jdbc.thin; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** A {@link JdbcThinBulkLoadAbstractSelfTest} for partitioned atomic mode (PARTITIONED cache, ATOMIC atomicity, no near cache). */ +public class JdbcThinBulkLoadAtomicPartitionedSelfTest extends JdbcThinBulkLoadAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected boolean nearCache() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicReplicatedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicReplicatedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicReplicatedSelfTest.java new file mode 100644 index 0000000..c3d69af --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicReplicatedSelfTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.jdbc.thin; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** A {@link JdbcThinBulkLoadAbstractSelfTest} for replicated atomic mode (REPLICATED cache, ATOMIC atomicity, no near cache). */ +public class JdbcThinBulkLoadAtomicReplicatedSelfTest extends JdbcThinBulkLoadAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected boolean nearCache() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedNearSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedNearSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedNearSelfTest.java new file mode 100644 index 0000000..9336dd1 --- /dev/null +++
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedNearSelfTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.jdbc.thin; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** A {@link JdbcThinBulkLoadAbstractSelfTest} for partitioned transactional near-cache mode (PARTITIONED cache, TRANSACTIONAL atomicity, near cache enabled).
*/ +public class JdbcThinBulkLoadTransactionalPartitionedNearSelfTest extends JdbcThinBulkLoadAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected boolean nearCache() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedSelfTest.java new file mode 100644 index 0000000..d1dea2a --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedSelfTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.jdbc.thin; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** A {@link JdbcThinBulkLoadAbstractSelfTest} for partitioned transactional mode (PARTITIONED cache, TRANSACTIONAL atomicity, no near cache). */ +public class JdbcThinBulkLoadTransactionalPartitionedSelfTest extends JdbcThinBulkLoadAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected boolean nearCache() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalReplicatedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalReplicatedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalReplicatedSelfTest.java new file mode 100644 index 0000000..1c377fa --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalReplicatedSelfTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.jdbc.thin; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** A {@link JdbcThinBulkLoadAbstractSelfTest} for replicated transactional mode (REPLICATED cache, TRANSACTIONAL atomicity, no near cache). */ +public class JdbcThinBulkLoadTransactionalReplicatedSelfTest extends JdbcThinBulkLoadAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected boolean nearCache() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDynamicIndexAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDynamicIndexAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDynamicIndexAbstractSelfTest.java index dbe93a4..539713a 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDynamicIndexAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDynamicIndexAbstractSelfTest.java @@ -25,7 +25,6 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import org.apache.ignite.IgniteCache; 
-import org.apache.ignite.IgniteCheckedException; import
org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/resources/bulkload0.csv ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/resources/bulkload0.csv b/modules/clients/src/test/resources/bulkload0.csv new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/resources/bulkload1.csv ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/resources/bulkload1.csv b/modules/clients/src/test/resources/bulkload1.csv new file mode 100644 index 0000000..596ac32 --- /dev/null +++ b/modules/clients/src/test/resources/bulkload1.csv @@ -0,0 +1 @@ +123,12,"FirstName123 MiddleName123",LastName123 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/resources/bulkload2.csv ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/resources/bulkload2.csv b/modules/clients/src/test/resources/bulkload2.csv new file mode 100644 index 0000000..d398c19 --- /dev/null +++ b/modules/clients/src/test/resources/bulkload2.csv @@ -0,0 +1,2 @@ +123,12,"FirstName123 MiddleName123",LastName123 +456,45,"FirstName456","LastName456" \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/resources/bulkload2_utf.csv ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/resources/bulkload2_utf.csv b/modules/clients/src/test/resources/bulkload2_utf.csv new file mode 100644 index 0000000..bdb6489 --- /dev/null +++ b/modules/clients/src/test/resources/bulkload2_utf.csv @@ -0,0 +1,2 @@ +123,12,"ÐмÑ123 ÐÑÑеÑÑво123",ФамилиÑ123 
+456,45,"ÐмÑ456","ФамилиÑ456" \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/cache/query/BulkLoadContextCursor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/BulkLoadContextCursor.java b/modules/core/src/main/java/org/apache/ignite/cache/query/BulkLoadContextCursor.java new file mode 100644 index 0000000..b7fdec3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/BulkLoadContextCursor.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.query; + +import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters; +import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor; +import org.jetbrains.annotations.NotNull; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +/** + * A special {@link FieldsQueryCursor} implementation that is used as a sentinel to transfer data from bulk load + * (COPY) command to the JDBC or other client-facing driver: the bulk load batch processor + * and parameters to send to the client. + */ +public class BulkLoadContextCursor implements FieldsQueryCursor<List<?>> { + /** Bulk load processor (context) from the SQL command. */ + private final BulkLoadProcessor processor; + + /** Bulk load parameters to send to the client. */ + private final BulkLoadAckClientParameters clientParams; + + /** + * Creates a cursor. + * + * @param processor Bulk load context object to store. + * @param clientParams Parameters to send to client. + */ + public BulkLoadContextCursor(BulkLoadProcessor processor, BulkLoadAckClientParameters clientParams) { + this.processor = processor; + this.clientParams = clientParams; + } + + /** + * Returns the bulk load processor (context). + * + * @return The bulk load processor passed to the constructor. + */ + public BulkLoadProcessor bulkLoadProcessor() { + return processor; + } + + /** + * Returns the bulk load parameters to send to the client. + * + * @return The bulk load parameters to send to the client.
+ */ + public BulkLoadAckClientParameters clientParams() { + return clientParams; + } + + /** {@inheritDoc} */ + @Override public List<List<?>> getAll() { + return Collections.singletonList(Arrays.asList(processor, clientParams)); // Exactly one row: [processor, clientParams]. + } + + /** {@inheritDoc} */ + @NotNull @Override public Iterator<List<?>> iterator() { + return getAll().iterator(); + } + + /** {@inheritDoc} */ + @Override public void close() { + // no-op. NOTE(review): the processor is not closed here; presumably its owner closes it — confirm. + } + + /** {@inheritDoc} */ + @Override public String getFieldName(int idx) { + if (idx < 0 || idx > 1) // Only two columns: 0 = processor, 1 = clientParams. + throw new IndexOutOfBoundsException(); + + return idx == 0 ? "processor" : "clientParams"; + } + + /** {@inheritDoc} */ + @Override public int getColumnsCount() { + return 2; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java index d29df93..2020011 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.jdbc.thin; +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.InputStream; import java.sql.BatchUpdateException; import java.sql.Connection; import java.sql.ResultSet; @@ -25,21 +28,24 @@ import java.sql.SQLFeatureNotSupportedException; import java.sql.SQLWarning; import java.sql.Statement; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import 
org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; -import
org.apache.ignite.internal.processors.odbc.SqlStateCode; import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; +import org.apache.ignite.internal.processors.odbc.SqlStateCode; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteResult; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadAckResult; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteMultipleStatementsResult; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteResult; -import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResult; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResultInfo; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType; import static java.sql.ResultSet.CONCUR_READ_ONLY; import static java.sql.ResultSet.FETCH_FORWARD; @@ -132,6 +138,9 @@ public class JdbcThinStatement implements Statement { assert res0 != null; + if (res0 instanceof JdbcBulkLoadAckResult) + res0 = sendFile((JdbcBulkLoadAckResult)res0); // COPY acknowledged by the server: upload the local file and continue with the server's final response. 
+ if (res0 instanceof JdbcQueryExecuteResult) { JdbcQueryExecuteResult res = (JdbcQueryExecuteResult)res0; @@ -176,6 +185,61 @@ public class JdbcThinStatement implements Statement { assert resultSets.size() > 0 : "At least one results set is expected"; } + /** + * Sends a file to server in batches via multiple {@link JdbcBulkLoadBatchRequest}s; if reading or sending fails, a {@code CMD_FINISHED_ERROR} request is attempted before an {@link SQLException} is thrown. + * + * @param cmdRes Result of invoking COPY command: contains server-parsed bulk load parameters, such as file name and batch size. + * @return The server's response to the final {@code CMD_FINISHED_EOF} request.
+ */ + private JdbcResult sendFile(JdbcBulkLoadAckResult cmdRes) throws SQLException { + String fileName = cmdRes.params().localFileName(); + int batchSize = cmdRes.params().batchSize(); + + int batchNum = 0; + + try { + try (InputStream input = new BufferedInputStream(new FileInputStream(fileName))) { + byte[] buf = new byte[batchSize]; + + int readBytes; + while ((readBytes = input.read(buf)) != -1) { + if (readBytes == 0) + continue; // Defensive: never send an empty batch. + + JdbcResult res = conn.sendRequest(new JdbcBulkLoadBatchRequest( + cmdRes.queryId(), + batchNum++, + JdbcBulkLoadBatchRequest.CMD_CONTINUE, + readBytes == buf.length ? buf : Arrays.copyOf(buf, readBytes))); // NOTE(review): a full buf is sent uncopied and reused by the next read(); assumes sendRequest has serialized it before returning — confirm. + + if (!(res instanceof JdbcQueryExecuteResult)) + throw new SQLException("Unknown response sent by the server: " + res); + } + + return conn.sendRequest(new JdbcBulkLoadBatchRequest( + cmdRes.queryId(), + batchNum++, + JdbcBulkLoadBatchRequest.CMD_FINISHED_EOF)); + } + } + catch (Exception e) { + try { // Best effort: notify the server that the upload failed before rethrowing. + conn.sendRequest(new JdbcBulkLoadBatchRequest( + cmdRes.queryId(), + batchNum, + JdbcBulkLoadBatchRequest.CMD_FINISHED_ERROR)); + } + catch (SQLException e1) { + throw new SQLException("Cannot send finalization request: " + e1.getMessage(), e); // NOTE(review): only e1's message survives; its stack trace is dropped. 
+ } + + if (e instanceof SQLException) + throw (SQLException) e; + else + throw new SQLException("Failed to read file: '" + fileName + "'", SqlStateCode.INTERNAL_ERROR, e); + } + } + /** {@inheritDoc} */ @Override public int executeUpdate(String sql) throws SQLException { execute0(JdbcStatementType.UPDATE_STMT_TYPE, sql, null); http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java index aa9f009..07034f4 100644 ---
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java @@ -32,6 +32,8 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteJdbcDriver; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.query.BulkLoadContextCursor; +import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.internal.IgniteKernal; @@ -168,7 +170,15 @@ class JdbcQueryTask implements IgniteCallable<JdbcQueryTaskResult> { qry.setLazy(lazy()); qry.setSchema(schemaName); - QueryCursorImpl<List<?>> qryCursor = (QueryCursorImpl<List<?>>)cache.withKeepBinary().query(qry); + FieldsQueryCursor<List<?>> fldQryCursor = cache.withKeepBinary().query(qry); + + if (fldQryCursor instanceof BulkLoadContextCursor) { + fldQryCursor.close(); + + throw new SQLException("COPY command is currently supported only in thin JDBC driver."); + } + + QueryCursorImpl<List<?>> qryCursor = (QueryCursorImpl<List<?>>)fldQryCursor; if (isQry == null) isQry = qryCursor.isQuery(); http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadAckClientParameters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadAckClientParameters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadAckClientParameters.java new file mode 100644 index 0000000..119d9f9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadAckClientParameters.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.bulkload; + +import org.jetbrains.annotations.NotNull; + +/** + * Bulk load parameters, which are parsed from SQL command and sent from server to client. + */ +public class BulkLoadAckClientParameters { + /** Minimum batch size. */ + public static final int MIN_BATCH_SIZE = 1; + + /** + * Maximum batch size. Note that the batch is wrapped to transport objects and the overall packet should fit + * into a Java array. 512 has been chosen arbitrarily. + */ + public static final int MAX_BATCH_SIZE = Integer.MAX_VALUE - 512; + + /** Default size of a file batch for COPY command, in bytes (4 MiB). */ + public static final int DEFAULT_BATCH_SIZE = 4 * 1024 * 1024; + + /** Local name of the file to send to server. */ + @NotNull private final String locFileName; + + /** File batch size in bytes. */ + private final int batchSize; + + /** + * Creates bulk load parameters. + * + * @param locFileName File name to send from client to server. + * @param batchSize Batch size (Number of bytes in a portion of a file to send in one JDBC request/response). + */ + public BulkLoadAckClientParameters(@NotNull String locFileName, int batchSize) { + this.locFileName = locFileName; + this.batchSize = batchSize; + } + + /** + * Returns the local name of file to send.
+ * + * @return The local name of file to send. + */ + @NotNull public String localFileName() { + return locFileName; + } + + /** + * Returns the batch size. + * + * @return The batch size. + */ + public int batchSize() { + return batchSize; + } + + /** + * Checks if batch size value is valid. + * + * @param sz The batch size to check. + * @return {@code true} if {@code sz} is within [{@link #MIN_BATCH_SIZE}, {@link #MAX_BATCH_SIZE}] inclusive, {@code false} otherwise. + */ + public static boolean isValidBatchSize(int sz) { + return sz >= MIN_BATCH_SIZE && sz <= MAX_BATCH_SIZE; + } + + /** + * Creates proper batch size error message if {@link #isValidBatchSize(int)} check has failed. + * + * @param sz The batch size. + * @return The string with the error message. + */ + public static String batchSizeErrorMsg(int sz) { + return "Batch size should be within [" + MIN_BATCH_SIZE + ".." + MAX_BATCH_SIZE + "]: " + sz; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCacheWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCacheWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCacheWriter.java new file mode 100644 index 0000000..90714c8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCacheWriter.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.bulkload; + +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteInClosure; + +/** A proxy that receives key-value pairs (as {@link IgniteBiTuple}s) through {@link IgniteInClosure} and stores them to a cache; it is {@link AutoCloseable}. */ +public abstract class BulkLoadCacheWriter implements IgniteInClosure<IgniteBiTuple<?, ?>>, AutoCloseable { + /** + * Returns number of entry updates made by the writer. + * + * @return The number of cache entry updates. + */ + public abstract long updateCnt(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvFormat.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvFormat.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvFormat.java new file mode 100644 index 0000000..6f5e91e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvFormat.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.bulkload; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.regex.Pattern; + +/** A placeholder for bulk load CSV format parser options; option fields start as {@code null} (the DEFAULT_* constants are not applied by this class). */ +public class BulkLoadCsvFormat extends BulkLoadFormat { + + /** Default line separator pattern: one or more CR/LF characters. */ + @NotNull public static final Pattern DEFAULT_LINE_SEPARATOR = Pattern.compile("[\r\n]+"); + + /** Default field separator pattern: a comma. */ + @NotNull public static final Pattern DEFAULT_FIELD_SEPARATOR = Pattern.compile(","); + + /** Default quote characters: a double quote. */ + @NotNull public static final String DEFAULT_QUOTE_CHARS = "\""; + + /** Default escape sequence start characters. */ + @Nullable public static final String DEFAULT_ESCAPE_CHARS = null; + + /** Default line comment start pattern. */ + @Nullable public static final Pattern DEFAULT_COMMENT_CHARS = null; + + /** Format name. */ + public static final String NAME = "CSV"; + + /** Line separator pattern. */ + @Nullable private Pattern lineSeparator; + + /** Field separator pattern. */ + @Nullable private Pattern fieldSeparator; + + /** Set of quote characters. */ + @Nullable private String quoteChars; + + /** Line comment start pattern. */ + @Nullable private Pattern commentChars; + + /** Set of escape start characters. */ + @Nullable private String escapeChars; + + /** + * Returns the name of the format. 
+ * + * @return The name of the format. + */ + @Override public String name() { + return NAME; + } + + /** + * Returns the line separator pattern.
+ * + * @return The line separator pattern. + */ + @Nullable public Pattern lineSeparator() { + return lineSeparator; + } + + /** + * Sets the line separator pattern. + * + * @param lineSeparator The line separator pattern. + */ + public void lineSeparator(@Nullable Pattern lineSeparator) { + this.lineSeparator = lineSeparator; + } + + /** + * Returns the field separator pattern. + * + * @return The field separator pattern. + */ + @Nullable public Pattern fieldSeparator() { + return fieldSeparator; + } + + /** + * Sets the field separator pattern. + * + * @param fieldSeparator The field separator pattern. + */ + public void fieldSeparator(@Nullable Pattern fieldSeparator) { + this.fieldSeparator = fieldSeparator; + } + + /** + * Returns the quote characters. + * + * @return The quote characters. + */ + @Nullable public String quoteChars() { + return quoteChars; + } + + /** + * Sets the quote characters. + * + * @param quoteChars The quote characters. + */ + public void quoteChars(@Nullable String quoteChars) { + this.quoteChars = quoteChars; + } + + /** + * Returns the line comment start pattern. + * + * @return The line comment start pattern. + */ + @Nullable public Pattern commentChars() { + return commentChars; + } + + /** + * Sets the line comment start pattern. + * + * @param commentChars The line comment start pattern. + */ + public void commentChars(@Nullable Pattern commentChars) { + this.commentChars = commentChars; + } + + /** + * Returns the escape characters. + * + * @return The escape characters. + */ + @Nullable public String escapeChars() { + return escapeChars; + } + + /** + * Sets the escape characters. + * + * @param escapeChars The escape characters.
+ */ + public void escapeChars(@Nullable String escapeChars) { + this.escapeChars = escapeChars; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvParser.java new file mode 100644 index 0000000..0511596 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvParser.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.bulkload; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.bulkload.pipeline.CharsetDecoderBlock; +import org.apache.ignite.internal.processors.bulkload.pipeline.CsvLineProcessorBlock; +import org.apache.ignite.internal.processors.bulkload.pipeline.PipelineBlock; +import org.apache.ignite.internal.processors.bulkload.pipeline.StrListAppenderBlock; +import org.apache.ignite.internal.processors.bulkload.pipeline.LineSplitterBlock; + +import java.util.LinkedList; +import java.util.List; + +/** CSV parser for COPY command. */ +public class BulkLoadCsvParser extends BulkLoadParser { + /** Processing pipeline input block: a decoder for the input stream of bytes */ + private final PipelineBlock<byte[], char[]> inputBlock; + + /** A record collecting block that appends its input to {@code List<String>}. */ + private final StrListAppenderBlock collectorBlock; + + /** + * Creates bulk load CSV parser. + * + * @param format Format options (parsed from COPY command). + */ + public BulkLoadCsvParser(BulkLoadCsvFormat format) { + inputBlock = new CharsetDecoderBlock(BulkLoadFormat.DEFAULT_INPUT_CHARSET); + + collectorBlock = new StrListAppenderBlock(); + + // Handling of the other options is to be implemented in IGNITE-7537. 
+ inputBlock.append(new LineSplitterBlock(format.lineSeparator())) + .append(new CsvLineProcessorBlock(format.fieldSeparator(), format.quoteChars())) + .append(collectorBlock); + } + + /** {@inheritDoc} */ + @Override protected Iterable<List<Object>> parseBatch(byte[] batchData, boolean isLastBatch) + throws IgniteCheckedException { + List<List<Object>> res = new LinkedList<>(); + + collectorBlock.output(res); + + inputBlock.accept(batchData, isLastBatch); + + return res; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadFormat.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadFormat.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadFormat.java new file mode 100644 index 0000000..cff93c5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadFormat.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.bulkload; + +import java.nio.charset.Charset; + +/** A superclass and a factory for bulk load format options. */ +public abstract class BulkLoadFormat { + /** The default input charset. */ + public static final Charset DEFAULT_INPUT_CHARSET = Charset.forName("UTF-8"); + + /** + * Returns the format name. + * + * @return The format name. + */ + public abstract String name(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadParser.java new file mode 100644 index 0000000..252e87b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadParser.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.bulkload; + +import org.apache.ignite.IgniteCheckedException; + +import java.util.List; + +/** + * Bulk load file format parser superclass + factory of known formats. + * + * <p>The parser processes a batch of input data and return a list of records. + * + * <p>The parser uses corresponding options from {@link BulkLoadFormat} subclass. + */ +public abstract class BulkLoadParser { + /** + * Parses a batch of input data and returns a list of records parsed + * (in most cases this is a list of strings). + * + * <p>Note that conversion between parsed and database table type is done by the other + * object (see {@link BulkLoadProcessor#dataConverter}) by the request processing code. + * This method is not obliged to do this conversion. + * + * @param batchData Data from the current batch. + * @param isLastBatch true if this is the last batch. + * @return The list of records. + * @throws IgniteCheckedException If any processing error occurs. + */ + protected abstract Iterable<List<Object>> parseBatch(byte[] batchData, boolean isLastBatch) + throws IgniteCheckedException; + + /** + * Creates a parser for a given format options. + * + * @param format The input format object. + * @return The parser. + * @throws IllegalArgumentException if the format is not known to the factory. 
+ */ + public static BulkLoadParser createParser(BulkLoadFormat format) { + if (format instanceof BulkLoadCsvFormat) + return new BulkLoadCsvParser((BulkLoadCsvFormat)format); + + throw new IllegalArgumentException("Internal error: format is not defined"); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java new file mode 100644 index 0000000..ccf3e25 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.bulkload; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteIllegalStateException; +import org.apache.ignite.internal.util.lang.IgniteClosureX; +import org.apache.ignite.lang.IgniteBiTuple; + +import java.util.List; + +/** + * Bulk load (COPY) command processor used on server to keep various context data and process portions of input + * received from the client side. + */ +public class BulkLoadProcessor implements AutoCloseable { + /** Parser of the input bytes. */ + private final BulkLoadParser inputParser; + + /** + * Converter, which transforms the list of strings parsed from the input stream to the key+value entry to add to + * the cache. + */ + private final IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter; + + /** Streamer that puts actual key/value into the cache. */ + private final BulkLoadCacheWriter outputStreamer; + + /** Becomes true after {@link #close()} method is called. */ + private boolean isClosed; + + /** + * Creates bulk load processor. + * + * @param inputParser Parser of the input bytes. + * @param dataConverter Converter, which transforms the list of strings parsed from the input stream to the + * key+value entry to add to the cache. + * @param outputStreamer Streamer that puts actual key/value into the cache. + */ + public BulkLoadProcessor(BulkLoadParser inputParser, IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter, + BulkLoadCacheWriter outputStreamer) { + this.inputParser = inputParser; + this.dataConverter = dataConverter; + this.outputStreamer = outputStreamer; + isClosed = false; + } + + /** + * Returns the streamer that puts actual key/value into the cache. + * + * @return Streamer that puts actual key/value into the cache. 
+ */ + public BulkLoadCacheWriter outputStreamer() { + return outputStreamer; + } + + /** + * Processes the incoming batch and writes data to the cache by calling the data converter and output streamer. + * + * @param batchData Data from the current batch. + * @param isLastBatch true if this is the last batch. + * @throws IgniteIllegalStateException when called after {@link #close()}. + */ + public void processBatch(byte[] batchData, boolean isLastBatch) throws IgniteCheckedException { + if (isClosed) + throw new IgniteIllegalStateException("Attempt to process a batch on a closed BulkLoadProcessor"); + + Iterable<List<Object>> inputRecords = inputParser.parseBatch(batchData, isLastBatch); + + for (List<Object> record : inputRecords) { + IgniteBiTuple<?, ?> kv = dataConverter.apply(record); + + outputStreamer.apply(kv); + } + } + + /** + * Aborts processing and closes the underlying objects ({@link IgniteDataStreamer}). + */ + @Override public void close() throws Exception { + if (isClosed) + return; + + isClosed = true; + + outputStreamer.close(); + } +}