Repository: ignite Updated Branches: refs/heads/ignite-1753 aee462759 -> 8e26c84d1
IGNITE-1753 WIP on POJO store refactoring + fixed first POJO store test. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8e26c84d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8e26c84d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8e26c84d Branch: refs/heads/ignite-1753 Commit: 8e26c84d1a5739ed97c90c28e65bd075fcab1ade Parents: aee4627 Author: AKuznetsov <akuznet...@gridgain.com> Authored: Fri Oct 23 23:24:32 2015 +0700 Committer: AKuznetsov <akuznet...@gridgain.com> Committed: Fri Oct 23 23:24:32 2015 +0700 ---------------------------------------------------------------------- .../cache/store/jdbc/CacheJdbcPojoStore.java | 30 +- .../jdbc/CacheJdbcPojoStoreConfiguration.java | 2 +- .../store/jdbc/CacheJdbcPojoStoreFactory.java | 1 + .../store/jdbc/CacheJdbcPojoStoreType.java | 14 +- .../CacheJdbcPojoStoreCompatibilityTest.java | 407 +++++++++++++++++++ .../store/jdbc/CacheJdbcPojoStoreTest.java | 162 +++++--- 6 files changed, 527 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8e26c84d/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java index 044f79d..a238f6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java @@ -246,7 +246,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar * @return Key type id. * @throws CacheException If failed to get type key id from object. 
*/ - protected Object keyTypeId(String type) throws CacheException { + private Object keyTypeId(String type) throws CacheException { try { return Class.forName(type); } @@ -262,7 +262,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar * @param types Collection of types. * @throws CacheException If failed to prepare internal builders for types. */ - protected void prepareBuilders(@Nullable String cacheName, Collection<CacheJdbcPojoStoreType> types) + private void prepareBuilders(@Nullable String cacheName, Collection<CacheJdbcPojoStoreType> types) throws CacheException { Map<String, PojoMethodsCache> typeMethods = U.newHashMap(types.size() * 2); @@ -287,7 +287,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar * @return The resolved dialect. * @throws CacheException Indicates problems accessing the metadata. */ - protected JdbcDialect resolveDialect() throws CacheException { + private JdbcDialect resolveDialect() throws CacheException { Connection conn = null; String dbProductName = null; @@ -361,7 +361,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar * @return Connection. * @throws SQLException In case of error. */ - protected Connection connection() throws SQLException { + private Connection connection() throws SQLException { CacheStoreSession ses = session(); if (ses.transaction() != null) { @@ -388,7 +388,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar * * @param conn Connection to close. */ - protected void closeConnection(@Nullable Connection conn) { + private void closeConnection(@Nullable Connection conn) { CacheStoreSession ses = session(); // Close connection right away if there is no transaction. @@ -402,7 +402,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar * @param conn Allocated connection. 
* @param st Created statement, */ - protected void end(@Nullable Connection conn, @Nullable Statement st) { + private void end(@Nullable Connection conn, @Nullable Statement st) { U.closeQuiet(st); closeConnection(conn); @@ -448,7 +448,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar * @return Value in column. * @throws SQLException If a database access error occurs or this method is called. */ - protected Object getColumnValue(ResultSet rs, int colIdx, Class<?> type) throws SQLException { + private Object getColumnValue(ResultSet rs, int colIdx, Class<?> type) throws SQLException { Object val = rs.getObject(colIdx); if (val == null) @@ -589,7 +589,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar * @param cls Class. * @return {@code True} if object is a simple type. */ - protected static boolean simpleType(Class<?> cls) { + private static boolean simpleType(Class<?> cls) { return (Number.class.isAssignableFrom(cls) || String.class.isAssignableFrom(cls) || java.util.Date.class.isAssignableFrom(cls) || Boolean.class.isAssignableFrom(cls) || UUID.class.isAssignableFrom(cls)); @@ -1394,7 +1394,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar * @param fieldVal Field value. * @throws CacheException If failed to set statement parameter. */ - protected void fillParameter(PreparedStatement stmt, int i, CacheJdbcPojoStoreTypeField field, @Nullable Object fieldVal) + private void fillParameter(PreparedStatement stmt, int i, CacheJdbcPojoStoreTypeField field, @Nullable Object fieldVal) throws CacheException { try { if (fieldVal != null) { @@ -1430,7 +1430,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar * @return Next index for parameters. * @throws CacheException If failed to set statement parameters. 
*/ - protected int fillKeyParameters(PreparedStatement stmt, int idx, EntryMapping em, + private int fillKeyParameters(PreparedStatement stmt, int idx, EntryMapping em, Object key) throws CacheException { for (CacheJdbcPojoStoreTypeField field : em.keyColumns()) { Object fieldVal = extractParameter(em.cacheName, em.keyType(), field.getJavaName(), key); @@ -1448,7 +1448,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar * @return Next index for parameters. * @throws CacheException If failed to set statement parameters. */ - protected int fillKeyParameters(PreparedStatement stmt, EntryMapping m, Object key) throws CacheException { + private int fillKeyParameters(PreparedStatement stmt, EntryMapping m, Object key) throws CacheException { return fillKeyParameters(stmt, 1, m, key); } @@ -1460,7 +1460,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar * @return Next index for parameters. * @throws CacheException If failed to set statement parameters. */ - protected int fillValueParameters(PreparedStatement stmt, int idx, EntryMapping em, Object val) + private int fillValueParameters(PreparedStatement stmt, int idx, EntryMapping em, Object val) throws CacheWriterException { for (CacheJdbcPojoStoreTypeField field : em.uniqValFields) { Object fieldVal = extractParameter(em.cacheName, em.valueType(), field.getJavaName(), val); @@ -1618,7 +1618,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar */ private static class PojoMethodsCache { /** POJO class. */ - protected final Class<?> cls; + private final Class<?> cls; /** Constructor for POJO object. */ private Constructor ctor; @@ -1836,7 +1836,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar * @param keyCnt Key count. * @return Load query statement text. 
*/ - protected String loadQuery(int keyCnt) { + private String loadQuery(int keyCnt) { assert keyCnt <= maxKeysPerStmt; if (keyCnt == maxKeysPerStmt) @@ -1855,7 +1855,7 @@ public class CacheJdbcPojoStore<K, V> implements CacheStore<K, V>, LifecycleAwar * @param appendUpperBound Need add upper bound for range. * @return Query with range. */ - protected String loadCacheRangeQuery(boolean appendLowerBound, boolean appendUpperBound) { + private String loadCacheRangeQuery(boolean appendLowerBound, boolean appendUpperBound) { return dialect.loadCacheRangeQuery(fullTblName, keyCols, cols, appendLowerBound, appendUpperBound); } http://git-wip-us.apache.org/repos/asf/ignite/blob/8e26c84d/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java index 7322279..b333bc7 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java @@ -44,7 +44,7 @@ public class CacheJdbcPojoStoreConfiguration implements Serializable { private String dataSrcBean; /** Database dialect. */ - protected JdbcDialect dialect; + private JdbcDialect dialect; /** Max workers thread count. These threads are responsible for load cache. 
*/ private int maxPoolSz = Runtime.getRuntime().availableProcessors(); http://git-wip-us.apache.org/repos/asf/ignite/blob/8e26c84d/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java index 003cfdb..6d8f8af 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java @@ -145,6 +145,7 @@ public class CacheJdbcPojoStoreFactory<K, V> implements Factory<CacheJdbcPojoSto * @return {@code This} for chaining. * @see CacheJdbcPojoStore#setDataSource(DataSource) */ + @Deprecated public CacheJdbcPojoStoreFactory<K, V> setDataSourceBean(String dataSrcBean) { this.dataSrcBean = dataSrcBean; http://git-wip-us.apache.org/repos/asf/ignite/blob/8e26c84d/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java index 85d5ed5..e755165 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java @@ -81,21 +81,21 @@ public class CacheJdbcPojoStoreType implements Serializable { } /** - * Gets database schema name. + * Gets associated cache name. * - * @return Schema name. + * @return Cache name. */ public String getCacheName() { - return dbSchema; + return cacheName; } /** - * Sets database schema name.
+ * Sets associated cache name. * - * @param dbSchema Schema name. + * @param cacheName Cache name. */ - public CacheJdbcPojoStoreType setCacheName(String dbSchema) { - this.dbSchema = dbSchema; + public CacheJdbcPojoStoreType setCacheName(String cacheName) { + this.cacheName = cacheName; return this; } http://git-wip-us.apache.org/repos/asf/ignite/blob/8e26c84d/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreCompatibilityTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreCompatibilityTest.java new file mode 100644 index 0000000..021a89d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreCompatibilityTest.java @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.store.jdbc; + +import org.apache.ignite.*; +import org.apache.ignite.cache.store.jdbc.dialect.*; +import org.apache.ignite.cache.store.jdbc.model.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.cache.*; +import org.h2.jdbcx.*; +import org.springframework.beans.*; +import org.springframework.beans.factory.xml.*; +import org.springframework.context.support.*; +import org.springframework.core.io.*; + +import javax.cache.integration.*; +import java.net.*; +import java.sql.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Class for {@code PojoCacheStore} tests. + */ +public class CacheJdbcPojoStoreCompatibilityTest extends GridAbstractCacheStoreSelfTest<CacheJdbcPojoStore<Object, Object>> { + /** DB connection URL. */ + private static final String DFLT_CONN_URL = "jdbc:h2:mem:autoCacheStore;DB_CLOSE_DELAY=-1"; + + /** Default config with mapping. */ + private static final String DFLT_MAPPING_CONFIG = "modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml"; + + /** Organization count. */ + protected static final int ORGANIZATION_CNT = 1000; + + /** Person count. */ + protected static final int PERSON_CNT = 100000; + + /** + * @throws Exception If failed. + */ + public CacheJdbcPojoStoreCompatibilityTest() throws Exception { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override protected CacheJdbcPojoStore<Object, Object> store() { + CacheJdbcPojoStore<Object, Object> store = new CacheJdbcPojoStore<>(); + +// PGPoolingDataSource ds = new PGPoolingDataSource(); +// ds.setUser("postgres"); +// ds.setPassword("postgres"); +// ds.setServerName("ip"); +// ds.setDatabaseName("postgres"); +// store.setDataSource(ds); + +// MysqlDataSource ds = new MysqlDataSource(); +// ds.setURL("jdbc:mysql://ip:port/dbname"); +// ds.setUser("mysql"); +// ds.setPassword("mysql"); + + store.setDataSource(JdbcConnectionPool.create(DFLT_CONN_URL, "sa", "")); + + URL cfgUrl; + + try { + cfgUrl = new URL(DFLT_MAPPING_CONFIG); + } + catch (MalformedURLException ignore) { + cfgUrl = U.resolveIgniteUrl(DFLT_MAPPING_CONFIG); + } + + if (cfgUrl == null) + throw new IgniteException("Failed to resolve metadata path: " + DFLT_MAPPING_CONFIG); + + try { + GenericApplicationContext springCtx = new GenericApplicationContext(); + + new XmlBeanDefinitionReader(springCtx).loadBeanDefinitions(new UrlResource(cfgUrl)); + + springCtx.refresh(); + + Collection<CacheJdbcPojoStoreType> typeMeta = springCtx.getBeansOfType(CacheJdbcPojoStoreType.class).values(); + + Map<Integer, Map<Object, CacheJdbcPojoStore.EntryMapping>> cacheMappings = new HashMap<>(); +// TODO-1753 fix tests +// JdbcDialect dialect = store.resolveDialect(); +// +// GridTestUtils.setFieldValue(store, CacheJdbcPojoStore.class, "dialect", dialect); +// +// Map<Object, CacheJdbcPojoStore.EntryMapping> entryMappings = U.newHashMap(typeMeta.size()); +// +// for (CacheJdbcPojoStoreType type : typeMeta) +// entryMappings.put(store.keyTypeId(type.getKeyType()), +// new CacheJdbcPojoStore.EntryMapping(null, dialect, type)); +// +// store.prepareBuilders(null, typeMeta); +// +// cacheMappings.put(null, entryMappings); +// +// GridTestUtils.setFieldValue(store, CacheJdbcPojoStore.class, "cacheMappings", cacheMappings); + } + catch (BeansException e) { + if (X.hasCause(e, 
ClassNotFoundException.class)) + throw new IgniteException("Failed to instantiate Spring XML application context " + + "(make sure all classes used in Spring configuration are present at CLASSPATH) " + + "[springUrl=" + cfgUrl + ']', e); + else + throw new IgniteException("Failed to instantiate Spring XML application context [springUrl=" + + cfgUrl + ", err=" + e.getMessage() + ']', e); + } + + return store; + } + + /** + * @param store Store. + * @throws Exception If failed. + */ + @Override protected void inject(CacheJdbcPojoStore<Object, Object> store) throws Exception { + getTestResources().inject(store); + + GridTestUtils.setFieldValue(store, CacheJdbcPojoStore.class, "ses", ses); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + Connection conn = store.openConnection(false); + + Statement stmt = conn.createStatement(); + + try { + stmt.executeUpdate("delete from String_Entries"); + } + catch (SQLException ignore) { + // No-op. + } + + try { + stmt.executeUpdate("delete from UUID_Entries"); + } + catch (SQLException ignore) { + // No-op. + } + + try { + stmt.executeUpdate("delete from Organization"); + } + catch (SQLException ignore) { + // No-op. + } + + try { + stmt.executeUpdate("delete from Person"); + } + catch (SQLException ignore) { + // No-op. + } + + try { + stmt.executeUpdate("delete from Timestamp_Entries"); + } + catch (SQLException ignore) { + // No-op. 
+ } + + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + + "String_Entries (key varchar(100) not null, val varchar(100), PRIMARY KEY(key))"); + + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + + "UUID_Entries (key binary(16) not null, val binary(16), PRIMARY KEY(key))"); + + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + + "Timestamp_Entries (key timestamp not null, val integer, PRIMARY KEY(key))"); + + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + + "Organization (id integer not null, name varchar(50), city varchar(50), PRIMARY KEY(id))"); + + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + + "Person (id integer not null, org_id integer, name varchar(50), PRIMARY KEY(id))"); + + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + + "Person_Complex (id integer not null, org_id integer not null, city_id integer not null, " + + "name varchar(50), salary integer, PRIMARY KEY(id))"); + + conn.commit(); + + U.closeQuiet(stmt); + + U.closeQuiet(conn); + + super.beforeTest(); + } + + + /** + * @throws Exception If failed. 
+ */ + public void testLoadCache() throws Exception { + Connection conn = store.openConnection(false); + + PreparedStatement orgStmt = conn.prepareStatement("INSERT INTO Organization(id, name, city) VALUES (?, ?, ?)"); + + for (int i = 0; i < ORGANIZATION_CNT; i++) { + orgStmt.setInt(1, i); + orgStmt.setString(2, "name" + i); + orgStmt.setString(3, "city" + i % 10); + + orgStmt.addBatch(); + } + + orgStmt.executeBatch(); + + U.closeQuiet(orgStmt); + + conn.commit(); + + PreparedStatement prnStmt = conn.prepareStatement("INSERT INTO Person(id, org_id, name) VALUES (?, ?, ?)"); + + for (int i = 0; i < PERSON_CNT; i++) { + prnStmt.setInt(1, i); + prnStmt.setInt(2, i % 100); + prnStmt.setString(3, "name" + i); + + prnStmt.addBatch(); + } + + prnStmt.executeBatch(); + + conn.commit(); + + U.closeQuiet(prnStmt); + + PreparedStatement prnComplexStmt = conn.prepareStatement("INSERT INTO Person_Complex(id, org_id, city_id, name, salary) VALUES (?, ?, ?, ?, ?)"); + + for (int i = 0; i < PERSON_CNT; i++) { + prnComplexStmt.setInt(1, i); + prnComplexStmt.setInt(2, i % 500); + prnComplexStmt.setInt(3, i % 100); + prnComplexStmt.setString(4, "name" + i); + + if (i > 0) + prnComplexStmt.setInt(5, 1000 + i * 500); + else // Add person with null salary + prnComplexStmt.setNull(5, java.sql.Types.INTEGER); + + prnComplexStmt.addBatch(); + } + + prnComplexStmt.executeBatch(); + + U.closeQuiet(prnComplexStmt); + + conn.commit(); + + U.closeQuiet(conn); + + final Collection<OrganizationKey> orgKeys = new ConcurrentLinkedQueue<>(); + final Collection<PersonKey> prnKeys = new ConcurrentLinkedQueue<>(); + final Collection<PersonComplexKey> prnComplexKeys = new ConcurrentLinkedQueue<>(); + + IgniteBiInClosure<Object, Object> c = new CI2<Object, Object>() { + @Override public void apply(Object k, Object v) { + if (k instanceof OrganizationKey && v instanceof Organization) + orgKeys.add((OrganizationKey)k); + else if (k instanceof PersonKey && v instanceof Person) + prnKeys.add((PersonKey)k); 
+ else if (k instanceof PersonComplexKey && v instanceof Person) { + PersonComplexKey key = (PersonComplexKey)k; + + Person val = (Person)v; + + assert key.getId() == val.getId(); + assert key.getOrgId() == val.getOrgId(); + assertEquals("name" + key.getId(), val.getName()); + + prnComplexKeys.add((PersonComplexKey)k); + } + } + }; + + store.loadCache(c); + + assertEquals(ORGANIZATION_CNT, orgKeys.size()); + assertEquals(PERSON_CNT, prnKeys.size()); + assertEquals(PERSON_CNT, prnComplexKeys.size()); + + Collection<OrganizationKey> tmpOrgKeys = new ArrayList<>(orgKeys); + Collection<PersonKey> tmpPrnKeys = new ArrayList<>(prnKeys); + Collection<PersonComplexKey> tmpPrnComplexKeys = new ArrayList<>(prnComplexKeys); + + orgKeys.clear(); + prnKeys.clear(); + prnComplexKeys.clear(); + + store.loadCache(c, OrganizationKey.class.getName(), "SELECT name, city, id FROM ORGANIZATION", + PersonKey.class.getName(), "SELECT org_id, id, name FROM Person WHERE id < 1000"); + + assertEquals(ORGANIZATION_CNT, orgKeys.size()); + assertEquals(1000, prnKeys.size()); + assertEquals(0, prnComplexKeys.size()); + + store.deleteAll(tmpOrgKeys); + store.deleteAll(tmpPrnKeys); + store.deleteAll(tmpPrnComplexKeys); + + orgKeys.clear(); + prnKeys.clear(); + prnComplexKeys.clear(); + + store.loadCache(c); + + assertTrue(orgKeys.isEmpty()); + assertTrue(prnKeys.isEmpty()); + assertTrue(prnComplexKeys.isEmpty()); + } + + /** + * @throws Exception If failed. + */ + public void testWriteRetry() throws Exception { + // Special dialect that will skip updates, to test write retry. 
+ BasicJdbcDialect dialect = new BasicJdbcDialect() { + /** {@inheritDoc} */ + @Override public String updateQuery(String tblName, Collection<String> keyCols, Iterable<String> valCols) { + return super.updateQuery(tblName, keyCols, valCols) + " AND 1 = 0"; + } + }; + + store.setDialect(dialect); + + Map<String, Map<Object, CacheJdbcPojoStore.EntryMapping>> cacheMappings = + GridTestUtils.getFieldValue(store, CacheJdbcPojoStore.class, "cacheMappings"); + + CacheJdbcPojoStore.EntryMapping em = cacheMappings.get(null).get(OrganizationKey.class); + + CacheJdbcPojoStoreType typeMeta = GridTestUtils.getFieldValue(em, CacheJdbcPojoStore.EntryMapping.class, "typeMeta"); + + cacheMappings.get(null).put(OrganizationKey.class, + new CacheJdbcPojoStore.EntryMapping(null, dialect, typeMeta)); + + Connection conn = store.openConnection(false); + + PreparedStatement orgStmt = conn.prepareStatement("INSERT INTO Organization(id, name, city) VALUES (?, ?, ?)"); + + orgStmt.setInt(1, 1); + orgStmt.setString(2, "name" + 1); + orgStmt.setString(3, "city" + 1); + + orgStmt.executeUpdate(); + + U.closeQuiet(orgStmt); + + conn.commit(); + + OrganizationKey k1 = new OrganizationKey(1); + Organization v1 = new Organization(1, "Name1", "City1"); + + ses.newSession(null); + + try { + store.write(new CacheEntryImpl<>(k1, v1)); + } + catch (CacheWriterException e) { + if (!e.getMessage().startsWith("Failed insert entry in database, violate a unique index or primary key") || + e.getSuppressed().length != 2) + throw e; + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testTimestamp() throws Exception { + Timestamp k = new Timestamp(System.currentTimeMillis()); + + ses.newSession(null); + + Integer v = 5; + + store.write(new CacheEntryImpl<>(k, v)); + + assertEquals(v, store.load(k)); + + store.delete(k); + + assertNull(store.load(k)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8e26c84d/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java index 746059d..af6f541 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java @@ -17,22 +17,21 @@ package org.apache.ignite.cache.store.jdbc; -import java.net.MalformedURLException; -import java.net.URL; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; +import java.sql.Types; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import javax.cache.integration.CacheWriterException; -import org.apache.ignite.IgniteException; + import org.apache.ignite.cache.store.jdbc.dialect.BasicJdbcDialect; -import org.apache.ignite.cache.store.jdbc.dialect.JdbcDialect; +import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect; import org.apache.ignite.cache.store.jdbc.model.Organization; import org.apache.ignite.cache.store.jdbc.model.OrganizationKey; import org.apache.ignite.cache.store.jdbc.model.Person; @@ -40,17 +39,11 @@ import org.apache.ignite.cache.store.jdbc.model.PersonComplexKey; import 
org.apache.ignite.cache.store.jdbc.model.PersonKey; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.util.typedef.CI2; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.cache.GridAbstractCacheStoreSelfTest; import org.h2.jdbcx.JdbcConnectionPool; -import org.springframework.beans.BeansException; -import org.springframework.beans.factory.xml.XmlBeanDefinitionReader; -import org.springframework.context.support.GenericApplicationContext; -import org.springframework.core.io.UrlResource; /** * Class for {@code PojoCacheStore} tests. @@ -77,7 +70,93 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache /** {@inheritDoc} */ @Override protected CacheJdbcPojoStore<Object, Object> store() { - CacheJdbcPojoStore<Object, Object> store = new CacheJdbcPojoStore<>(); + CacheJdbcPojoStoreFactory<Object, Object> storeFactory = new CacheJdbcPojoStoreFactory<>(); + + CacheJdbcPojoStoreConfiguration storeCfg = new CacheJdbcPojoStoreConfiguration(); + storeCfg.setDialect(new H2Dialect()); + + CacheJdbcPojoStoreType[] storeTypes = new CacheJdbcPojoStoreType[6]; + + storeTypes[0] = new CacheJdbcPojoStoreType(); + storeTypes[0].setDatabaseSchema("PUBLIC"); + storeTypes[0].setDatabaseTable("ORGANIZATION"); + storeTypes[0].setKeyType("org.apache.ignite.cache.store.jdbc.model.OrganizationKey"); + storeTypes[0].setKeyFields(new CacheJdbcPojoStoreTypeField[1]); + storeTypes[0].getKeyFields()[0] = new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"); + + storeTypes[0].setValueType("org.apache.ignite.cache.store.jdbc.model.Organization"); + storeTypes[0].setValueFields(new CacheJdbcPojoStoreTypeField[3]); + 
storeTypes[0].getValueFields()[0] = new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"); + storeTypes[0].getValueFields()[1] = new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "NAME", String.class, "name"); + storeTypes[0].getValueFields()[2] = new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "CITY", String.class, "city"); + + storeTypes[1] = new CacheJdbcPojoStoreType(); + storeTypes[1].setDatabaseSchema("PUBLIC"); + storeTypes[1].setDatabaseTable("PERSON"); + storeTypes[1].setKeyType("org.apache.ignite.cache.store.jdbc.model.PersonKey"); + storeTypes[1].setKeyFields(new CacheJdbcPojoStoreTypeField[1]); + storeTypes[1].getKeyFields()[0] = new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"); + + storeTypes[1].setValueType("org.apache.ignite.cache.store.jdbc.model.Person"); + storeTypes[1].setValueFields(new CacheJdbcPojoStoreTypeField[3]); + storeTypes[1].getValueFields()[0] = new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"); + storeTypes[1].getValueFields()[1] = new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ORG_ID", Integer.class, "orgId"); + storeTypes[1].getValueFields()[2] = new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "NAME", String.class, "name"); + + storeTypes[2] = new CacheJdbcPojoStoreType(); + storeTypes[2].setDatabaseSchema("PUBLIC"); + storeTypes[2].setDatabaseTable("PERSON_COMPLEX"); + storeTypes[2].setKeyType("org.apache.ignite.cache.store.jdbc.model.PersonComplexKey"); + storeTypes[2].setKeyFields(new CacheJdbcPojoStoreTypeField[3]); + storeTypes[2].getKeyFields()[0] = new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", int.class, "id"); + storeTypes[2].getKeyFields()[1] = new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ORG_ID", int.class, "orgId"); + storeTypes[2].getKeyFields()[2] = new CacheJdbcPojoStoreTypeField(Types.INTEGER, "CITY_ID", int.class, "cityId"); + + storeTypes[2].setValueType("org.apache.ignite.cache.store.jdbc.model.Person"); + storeTypes[2].setValueFields(new 
CacheJdbcPojoStoreTypeField[4]); + storeTypes[2].getValueFields()[0] = new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id"); + storeTypes[2].getValueFields()[1] = new CacheJdbcPojoStoreTypeField(Types.INTEGER, "ORG_ID", Integer.class, "orgId"); + storeTypes[2].getValueFields()[2] = new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "NAME", String.class, "name"); + storeTypes[2].getValueFields()[3] = new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "SALARY", Integer.class, "salary"); + + storeTypes[3] = new CacheJdbcPojoStoreType(); + storeTypes[3].setDatabaseSchema("PUBLIC"); + storeTypes[3].setDatabaseTable("TIMESTAMP_ENTRIES"); + storeTypes[3].setKeyType("java.sql.Timestamp"); + storeTypes[3].setKeyFields(new CacheJdbcPojoStoreTypeField[1]); + storeTypes[3].getKeyFields()[0] = new CacheJdbcPojoStoreTypeField(Types.TIMESTAMP, "KEY", Timestamp.class, null); + + storeTypes[3].setValueType("java.lang.Integer"); + storeTypes[3].setValueFields(new CacheJdbcPojoStoreTypeField[1]); + storeTypes[3].getValueFields()[0] = new CacheJdbcPojoStoreTypeField(Types.INTEGER, "VAL", Integer.class, null); + + storeTypes[4] = new CacheJdbcPojoStoreType(); + storeTypes[4].setDatabaseSchema("PUBLIC"); + storeTypes[4].setDatabaseTable("STRING_ENTRIES"); + storeTypes[4].setKeyType("java.lang.String"); + storeTypes[4].setKeyFields(new CacheJdbcPojoStoreTypeField[1]); + storeTypes[4].getKeyFields()[0] = new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "KEY", String.class, null); + + storeTypes[4].setValueType("java.lang.String"); + storeTypes[4].setValueFields(new CacheJdbcPojoStoreTypeField[1]); + storeTypes[4].getValueFields()[0] = new CacheJdbcPojoStoreTypeField(Types.VARCHAR, "VAL", Integer.class, null); + + storeTypes[5] = new CacheJdbcPojoStoreType(); + storeTypes[5].setDatabaseSchema("PUBLIC"); + storeTypes[5].setDatabaseTable("UUID_ENTRIES"); + storeTypes[5].setKeyType("java.util.UUID"); + storeTypes[5].setKeyFields(new CacheJdbcPojoStoreTypeField[1]); + 
storeTypes[5].getKeyFields()[0] = new CacheJdbcPojoStoreTypeField(Types.BINARY, "KEY", UUID.class, null); + + storeTypes[5].setValueType("java.util.UUID"); + storeTypes[5].setValueFields(new CacheJdbcPojoStoreTypeField[1]); + storeTypes[5].getValueFields()[0] = new CacheJdbcPojoStoreTypeField(Types.BINARY, "VAL", UUID.class, null); + + storeCfg.setTypes(storeTypes); + + storeFactory.setConfiguration(storeCfg); + + CacheJdbcPojoStore<Object, Object> store = storeFactory.create(); // PGPoolingDataSource ds = new PGPoolingDataSource(); // ds.setUser("postgres"); @@ -91,57 +170,9 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache // ds.setUser("mysql"); // ds.setPassword("mysql"); + // H2 DataSource store.setDataSource(JdbcConnectionPool.create(DFLT_CONN_URL, "sa", "")); - URL cfgUrl; - - try { - cfgUrl = new URL(DFLT_MAPPING_CONFIG); - } - catch (MalformedURLException ignore) { - cfgUrl = U.resolveIgniteUrl(DFLT_MAPPING_CONFIG); - } - - if (cfgUrl == null) - throw new IgniteException("Failed to resolve metadata path: " + DFLT_MAPPING_CONFIG); - - try { - GenericApplicationContext springCtx = new GenericApplicationContext(); - - new XmlBeanDefinitionReader(springCtx).loadBeanDefinitions(new UrlResource(cfgUrl)); - - springCtx.refresh(); - - Collection<CacheJdbcPojoStoreType> typeMeta = springCtx.getBeansOfType(CacheJdbcPojoStoreType.class).values(); - - Map<Integer, Map<Object, CacheJdbcPojoStore.EntryMapping>> cacheMappings = new HashMap<>(); - - JdbcDialect dialect = store.resolveDialect(); - - GridTestUtils.setFieldValue(store, CacheJdbcPojoStore.class, "dialect", dialect); - - Map<Object, CacheJdbcPojoStore.EntryMapping> entryMappings = U.newHashMap(typeMeta.size()); - - for (CacheJdbcPojoStoreType type : typeMeta) - entryMappings.put(store.keyTypeId(type.getKeyType()), - new CacheJdbcPojoStore.EntryMapping(null, dialect, type)); - - store.prepareBuilders(null, typeMeta); - - cacheMappings.put(null, entryMappings); - - 
GridTestUtils.setFieldValue(store, CacheJdbcPojoStore.class, "cacheMappings", cacheMappings); - } - catch (BeansException e) { - if (X.hasCause(e, ClassNotFoundException.class)) - throw new IgniteException("Failed to instantiate Spring XML application context " + - "(make sure all classes used in Spring configuration are present at CLASSPATH) " + - "[springUrl=" + cfgUrl + ']', e); - else - throw new IgniteException("Failed to instantiate Spring XML application context [springUrl=" + - cfgUrl + ", err=" + e.getMessage() + ']', e); - } - return store; } @@ -224,7 +255,6 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache super.beforeTest(); } - /** * @throws Exception If failed. */ @@ -274,7 +304,7 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache if (i > 0) prnComplexStmt.setInt(5, 1000 + i * 500); else // Add person with null salary - prnComplexStmt.setNull(5, java.sql.Types.INTEGER); + prnComplexStmt.setNull(5, Types.INTEGER); prnComplexStmt.addBatch(); } @@ -302,9 +332,9 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache Person val = (Person)v; - assert key.getId() == val.getId(); - assert key.getOrgId() == val.getOrgId(); - assertEquals("name" + key.getId(), val.getName()); + assertTrue("Key ID should be the same as value ID", key.getId() == val.getId()); + assertTrue("Key orgID should be the same as value orgID", key.getOrgId() == val.getOrgId()); + assertEquals("name" + key.getId(), val.getName()); prnComplexKeys.add((PersonComplexKey)k); }