Repository: ignite
Updated Branches:
  refs/heads/ignite-1753 aee462759 -> 8e26c84d1


IGNITE-1753 WIP on POJO store refactoring + fixed first POJO store test.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8e26c84d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8e26c84d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8e26c84d

Branch: refs/heads/ignite-1753
Commit: 8e26c84d1a5739ed97c90c28e65bd075fcab1ade
Parents: aee4627
Author: AKuznetsov <akuznet...@gridgain.com>
Authored: Fri Oct 23 23:24:32 2015 +0700
Committer: AKuznetsov <akuznet...@gridgain.com>
Committed: Fri Oct 23 23:24:32 2015 +0700

----------------------------------------------------------------------
 .../cache/store/jdbc/CacheJdbcPojoStore.java    |  30 +-
 .../jdbc/CacheJdbcPojoStoreConfiguration.java   |   2 +-
 .../store/jdbc/CacheJdbcPojoStoreFactory.java   |   1 +
 .../store/jdbc/CacheJdbcPojoStoreType.java      |  14 +-
 .../CacheJdbcPojoStoreCompatibilityTest.java    | 407 +++++++++++++++++++
 .../store/jdbc/CacheJdbcPojoStoreTest.java      | 162 +++++---
 6 files changed, 527 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8e26c84d/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
index 044f79d..a238f6e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
@@ -246,7 +246,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
      * @return Key type id.
      * @throws CacheException If failed to get type key id from object.
      */
-    protected Object keyTypeId(String type) throws CacheException {
+    private Object keyTypeId(String type) throws CacheException {
         try {
             return Class.forName(type);
         }
@@ -262,7 +262,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
      * @param types Collection of types.
      * @throws CacheException If failed to prepare internal builders for types.
      */
-    protected void prepareBuilders(@Nullable String cacheName, 
Collection<CacheJdbcPojoStoreType> types)
+    private void prepareBuilders(@Nullable String cacheName, 
Collection<CacheJdbcPojoStoreType> types)
         throws CacheException {
         Map<String, PojoMethodsCache> typeMethods = U.newHashMap(types.size() 
* 2);
 
@@ -287,7 +287,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
      * @return The resolved dialect.
      * @throws CacheException Indicates problems accessing the metadata.
      */
-    protected JdbcDialect resolveDialect() throws CacheException {
+    private JdbcDialect resolveDialect() throws CacheException {
         Connection conn = null;
 
         String dbProductName = null;
@@ -361,7 +361,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
      * @return Connection.
      * @throws SQLException In case of error.
      */
-    protected Connection connection() throws SQLException {
+    private Connection connection() throws SQLException {
         CacheStoreSession ses = session();
 
         if (ses.transaction() != null) {
@@ -388,7 +388,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
      *
      * @param conn Connection to close.
      */
-    protected void closeConnection(@Nullable Connection conn) {
+    private void closeConnection(@Nullable Connection conn) {
         CacheStoreSession ses = session();
 
         // Close connection right away if there is no transaction.
@@ -402,7 +402,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
      * @param conn Allocated connection.
      * @param st Created statement,
      */
-    protected void end(@Nullable Connection conn, @Nullable Statement st) {
+    private void end(@Nullable Connection conn, @Nullable Statement st) {
         U.closeQuiet(st);
 
         closeConnection(conn);
@@ -448,7 +448,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
      * @return Value in column.
      * @throws SQLException If a database access error occurs or this method 
is called.
      */
-    protected Object getColumnValue(ResultSet rs, int colIdx, Class<?> type) 
throws SQLException {
+    private Object getColumnValue(ResultSet rs, int colIdx, Class<?> type) 
throws SQLException {
         Object val = rs.getObject(colIdx);
 
         if (val == null)
@@ -589,7 +589,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
      * @param cls Class.
      * @return {@code True} if object is a simple type.
      */
-    protected static boolean simpleType(Class<?> cls) {
+    private static boolean simpleType(Class<?> cls) {
         return (Number.class.isAssignableFrom(cls) || 
String.class.isAssignableFrom(cls) ||
             java.util.Date.class.isAssignableFrom(cls) || 
Boolean.class.isAssignableFrom(cls) ||
             UUID.class.isAssignableFrom(cls));
@@ -1394,7 +1394,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
      * @param fieldVal Field value.
      * @throws CacheException If failed to set statement parameter.
      */
-    protected void fillParameter(PreparedStatement stmt, int i, 
CacheJdbcPojoStoreTypeField field, @Nullable Object fieldVal)
+    private void fillParameter(PreparedStatement stmt, int i, 
CacheJdbcPojoStoreTypeField field, @Nullable Object fieldVal)
         throws CacheException {
         try {
             if (fieldVal != null) {
@@ -1430,7 +1430,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
      * @return Next index for parameters.
      * @throws CacheException If failed to set statement parameters.
      */
-    protected int fillKeyParameters(PreparedStatement stmt, int idx, 
EntryMapping em,
+    private int fillKeyParameters(PreparedStatement stmt, int idx, 
EntryMapping em,
         Object key) throws CacheException {
         for (CacheJdbcPojoStoreTypeField field : em.keyColumns()) {
             Object fieldVal = extractParameter(em.cacheName, em.keyType(), 
field.getJavaName(), key);
@@ -1448,7 +1448,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
      * @return Next index for parameters.
      * @throws CacheException If failed to set statement parameters.
      */
-    protected int fillKeyParameters(PreparedStatement stmt, EntryMapping m, 
Object key) throws CacheException {
+    private int fillKeyParameters(PreparedStatement stmt, EntryMapping m, 
Object key) throws CacheException {
         return fillKeyParameters(stmt, 1, m, key);
     }
 
@@ -1460,7 +1460,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
      * @return Next index for parameters.
      * @throws CacheException If failed to set statement parameters.
      */
-    protected int fillValueParameters(PreparedStatement stmt, int idx, 
EntryMapping em, Object val)
+    private int fillValueParameters(PreparedStatement stmt, int idx, 
EntryMapping em, Object val)
         throws CacheWriterException {
         for (CacheJdbcPojoStoreTypeField field : em.uniqValFields) {
             Object fieldVal = extractParameter(em.cacheName, em.valueType(), 
field.getJavaName(), val);
@@ -1618,7 +1618,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
      */
     private static class PojoMethodsCache {
         /** POJO class. */
-        protected final Class<?> cls;
+        private final Class<?> cls;
 
         /** Constructor for POJO object. */
         private Constructor ctor;
@@ -1836,7 +1836,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
          * @param keyCnt Key count.
          * @return Load query statement text.
          */
-        protected String loadQuery(int keyCnt) {
+        private String loadQuery(int keyCnt) {
             assert keyCnt <= maxKeysPerStmt;
 
             if (keyCnt == maxKeysPerStmt)
@@ -1855,7 +1855,7 @@ public class CacheJdbcPojoStore<K, V> implements 
CacheStore<K, V>, LifecycleAwar
          * @param appendUpperBound Need add upper bound for range.
          * @return Query with range.
          */
-        protected String loadCacheRangeQuery(boolean appendLowerBound, boolean 
appendUpperBound) {
+        private String loadCacheRangeQuery(boolean appendLowerBound, boolean 
appendUpperBound) {
             return dialect.loadCacheRangeQuery(fullTblName, keyCols, cols, 
appendLowerBound, appendUpperBound);
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e26c84d/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java
index 7322279..b333bc7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreConfiguration.java
@@ -44,7 +44,7 @@ public class CacheJdbcPojoStoreConfiguration implements 
Serializable {
     private String dataSrcBean;
 
     /** Database dialect. */
-    protected JdbcDialect dialect;
+    private JdbcDialect dialect;
 
     /** Max workers thread count. These threads are responsible for load 
cache. */
     private int maxPoolSz = Runtime.getRuntime().availableProcessors();

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e26c84d/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java
index 003cfdb..6d8f8af 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreFactory.java
@@ -145,6 +145,7 @@ public class CacheJdbcPojoStoreFactory<K, V> implements 
Factory<CacheJdbcPojoSto
      * @return {@code This} for chaining.
      * @see CacheJdbcPojoStore#setDataSource(DataSource)
      */
+    @Deprecated
     public CacheJdbcPojoStoreFactory<K, V> setDataSourceBean(String 
dataSrcBean) {
         this.dataSrcBean = dataSrcBean;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e26c84d/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java
index 85d5ed5..e755165 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreType.java
@@ -81,21 +81,21 @@ public class CacheJdbcPojoStoreType implements Serializable 
{
     }
 
     /**
-     * Gets database schema name.
+     * Gets associated cache name.
      *
-     * @return Schema name.
+     * @return Сache name.
      */
     public String getCacheName() {
-        return dbSchema;
+        return cacheName;
     }
 
     /**
-     * Sets database schema name.
+     * Sets associated cache name.
      *
-     * @param dbSchema Schema name.
+     * @param cacheName Cache name.
      */
-    public CacheJdbcPojoStoreType setCacheName(String dbSchema) {
-        this.dbSchema = dbSchema;
+    public CacheJdbcPojoStoreType setCacheName(String cacheName) {
+        this.cacheName = cacheName;
 
         return this;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e26c84d/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreCompatibilityTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreCompatibilityTest.java
 
b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreCompatibilityTest.java
new file mode 100644
index 0000000..021a89d
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreCompatibilityTest.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.store.jdbc.dialect.*;
+import org.apache.ignite.cache.store.jdbc.model.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.cache.*;
+import org.h2.jdbcx.*;
+import org.springframework.beans.*;
+import org.springframework.beans.factory.xml.*;
+import org.springframework.context.support.*;
+import org.springframework.core.io.*;
+
+import javax.cache.integration.*;
+import java.net.*;
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Class for {@code PojoCacheStore} tests.
+ */
+public class CacheJdbcPojoStoreCompatibilityTest extends 
GridAbstractCacheStoreSelfTest<CacheJdbcPojoStore<Object, Object>> {
+    /** DB connection URL. */
+    private static final String DFLT_CONN_URL = 
"jdbc:h2:mem:autoCacheStore;DB_CLOSE_DELAY=-1";
+
+    /** Default config with mapping. */
+    private static final String DFLT_MAPPING_CONFIG = 
"modules/core/src/test/config/store/jdbc/ignite-type-metadata.xml";
+
+    /** Organization count. */
+    protected static final int ORGANIZATION_CNT = 1000;
+
+    /** Person count. */
+    protected static final int PERSON_CNT = 100000;
+
+    /**
+     * @throws Exception If failed.
+     */
+    public CacheJdbcPojoStoreCompatibilityTest() throws Exception {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheJdbcPojoStore<Object, Object> store() {
+        CacheJdbcPojoStore<Object, Object> store = new CacheJdbcPojoStore<>();
+
+//        PGPoolingDataSource ds = new PGPoolingDataSource();
+//        ds.setUser("postgres");
+//        ds.setPassword("postgres");
+//        ds.setServerName("ip");
+//        ds.setDatabaseName("postgres");
+//        store.setDataSource(ds);
+
+//        MysqlDataSource ds = new MysqlDataSource();
+//        ds.setURL("jdbc:mysql://ip:port/dbname");
+//        ds.setUser("mysql");
+//        ds.setPassword("mysql");
+
+        store.setDataSource(JdbcConnectionPool.create(DFLT_CONN_URL, "sa", 
""));
+
+        URL cfgUrl;
+
+        try {
+            cfgUrl = new URL(DFLT_MAPPING_CONFIG);
+        }
+        catch (MalformedURLException ignore) {
+            cfgUrl = U.resolveIgniteUrl(DFLT_MAPPING_CONFIG);
+        }
+
+        if (cfgUrl == null)
+            throw new IgniteException("Failed to resolve metadata path: " + 
DFLT_MAPPING_CONFIG);
+
+        try {
+            GenericApplicationContext springCtx = new 
GenericApplicationContext();
+
+            new XmlBeanDefinitionReader(springCtx).loadBeanDefinitions(new 
UrlResource(cfgUrl));
+
+            springCtx.refresh();
+
+            Collection<CacheJdbcPojoStoreType> typeMeta = 
springCtx.getBeansOfType(CacheJdbcPojoStoreType.class).values();
+
+            Map<Integer, Map<Object, CacheJdbcPojoStore.EntryMapping>> 
cacheMappings = new HashMap<>();
+// TODO-1753 fix tests
+//            JdbcDialect dialect = store.resolveDialect();
+//
+//            GridTestUtils.setFieldValue(store, CacheJdbcPojoStore.class, 
"dialect", dialect);
+//
+//            Map<Object, CacheJdbcPojoStore.EntryMapping> entryMappings = 
U.newHashMap(typeMeta.size());
+//
+//            for (CacheJdbcPojoStoreType type : typeMeta)
+//                entryMappings.put(store.keyTypeId(type.getKeyType()),
+//                    new CacheJdbcPojoStore.EntryMapping(null, dialect, 
type));
+//
+//            store.prepareBuilders(null, typeMeta);
+//
+//            cacheMappings.put(null, entryMappings);
+//
+//            GridTestUtils.setFieldValue(store, CacheJdbcPojoStore.class, 
"cacheMappings", cacheMappings);
+        }
+        catch (BeansException e) {
+            if (X.hasCause(e, ClassNotFoundException.class))
+                throw new IgniteException("Failed to instantiate Spring XML 
application context " +
+                    "(make sure all classes used in Spring configuration are 
present at CLASSPATH) " +
+                    "[springUrl=" + cfgUrl + ']', e);
+            else
+                throw new IgniteException("Failed to instantiate Spring XML 
application context [springUrl=" +
+                    cfgUrl + ", err=" + e.getMessage() + ']', e);
+        }
+
+        return store;
+    }
+
+    /**
+     * @param store Store.
+     * @throws Exception If failed.
+     */
+    @Override protected void inject(CacheJdbcPojoStore<Object, Object> store) 
throws Exception {
+        getTestResources().inject(store);
+
+        GridTestUtils.setFieldValue(store, CacheJdbcPojoStore.class, "ses", 
ses);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        Connection conn = store.openConnection(false);
+
+        Statement stmt = conn.createStatement();
+
+        try {
+            stmt.executeUpdate("delete from String_Entries");
+        }
+        catch (SQLException ignore) {
+            // No-op.
+        }
+
+        try {
+            stmt.executeUpdate("delete from UUID_Entries");
+        }
+        catch (SQLException ignore) {
+            // No-op.
+        }
+
+        try {
+            stmt.executeUpdate("delete from Organization");
+        }
+        catch (SQLException ignore) {
+            // No-op.
+        }
+
+        try {
+            stmt.executeUpdate("delete from Person");
+        }
+        catch (SQLException ignore) {
+            // No-op.
+        }
+
+        try {
+            stmt.executeUpdate("delete from Timestamp_Entries");
+        }
+        catch (SQLException ignore) {
+            // No-op.
+        }
+
+        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " +
+            "String_Entries (key varchar(100) not null, val varchar(100), 
PRIMARY KEY(key))");
+
+        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " +
+            "UUID_Entries (key binary(16) not null, val binary(16), PRIMARY 
KEY(key))");
+
+        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " +
+            "Timestamp_Entries (key timestamp not null, val integer, PRIMARY 
KEY(key))");
+
+        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " +
+            "Organization (id integer not null, name varchar(50), city 
varchar(50), PRIMARY KEY(id))");
+
+        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " +
+            "Person (id integer not null, org_id integer, name varchar(50), 
PRIMARY KEY(id))");
+
+        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " +
+            "Person_Complex (id integer not null, org_id integer not null, 
city_id integer not null, " +
+            "name varchar(50), salary integer, PRIMARY KEY(id))");
+
+        conn.commit();
+
+        U.closeQuiet(stmt);
+
+        U.closeQuiet(conn);
+
+        super.beforeTest();
+    }
+
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLoadCache() throws Exception {
+        Connection conn = store.openConnection(false);
+
+        PreparedStatement orgStmt = conn.prepareStatement("INSERT INTO 
Organization(id, name, city) VALUES (?, ?, ?)");
+
+        for (int i = 0; i < ORGANIZATION_CNT; i++) {
+            orgStmt.setInt(1, i);
+            orgStmt.setString(2, "name" + i);
+            orgStmt.setString(3, "city" + i % 10);
+
+            orgStmt.addBatch();
+        }
+
+        orgStmt.executeBatch();
+
+        U.closeQuiet(orgStmt);
+
+        conn.commit();
+
+        PreparedStatement prnStmt = conn.prepareStatement("INSERT INTO 
Person(id, org_id, name) VALUES (?, ?, ?)");
+
+        for (int i = 0; i < PERSON_CNT; i++) {
+            prnStmt.setInt(1, i);
+            prnStmt.setInt(2, i % 100);
+            prnStmt.setString(3, "name" + i);
+
+            prnStmt.addBatch();
+        }
+
+        prnStmt.executeBatch();
+
+        conn.commit();
+
+        U.closeQuiet(prnStmt);
+
+        PreparedStatement prnComplexStmt = conn.prepareStatement("INSERT INTO 
Person_Complex(id, org_id, city_id, name, salary) VALUES (?, ?, ?, ?, ?)");
+
+        for (int i = 0; i < PERSON_CNT; i++) {
+            prnComplexStmt.setInt(1, i);
+            prnComplexStmt.setInt(2, i % 500);
+            prnComplexStmt.setInt(3, i % 100);
+            prnComplexStmt.setString(4, "name" + i);
+
+            if (i > 0)
+                prnComplexStmt.setInt(5, 1000 + i * 500);
+            else // Add person with null salary
+                prnComplexStmt.setNull(5, java.sql.Types.INTEGER);
+
+            prnComplexStmt.addBatch();
+        }
+
+        prnComplexStmt.executeBatch();
+
+        U.closeQuiet(prnComplexStmt);
+
+        conn.commit();
+
+        U.closeQuiet(conn);
+
+        final Collection<OrganizationKey> orgKeys = new 
ConcurrentLinkedQueue<>();
+        final Collection<PersonKey> prnKeys = new ConcurrentLinkedQueue<>();
+        final Collection<PersonComplexKey> prnComplexKeys = new 
ConcurrentLinkedQueue<>();
+
+        IgniteBiInClosure<Object, Object> c = new CI2<Object, Object>() {
+            @Override public void apply(Object k, Object v) {
+                if (k instanceof OrganizationKey && v instanceof Organization)
+                    orgKeys.add((OrganizationKey)k);
+                else if (k instanceof PersonKey && v instanceof Person)
+                    prnKeys.add((PersonKey)k);
+                else if (k instanceof PersonComplexKey && v instanceof Person) 
{
+                    PersonComplexKey key = (PersonComplexKey)k;
+
+                    Person val = (Person)v;
+
+                    assert key.getId() == val.getId();
+                    assert key.getOrgId() == val.getOrgId();
+                    assertEquals("name"  + key.getId(), val.getName());
+
+                    prnComplexKeys.add((PersonComplexKey)k);
+                }
+            }
+        };
+
+        store.loadCache(c);
+
+        assertEquals(ORGANIZATION_CNT, orgKeys.size());
+        assertEquals(PERSON_CNT, prnKeys.size());
+        assertEquals(PERSON_CNT, prnComplexKeys.size());
+
+        Collection<OrganizationKey> tmpOrgKeys = new ArrayList<>(orgKeys);
+        Collection<PersonKey> tmpPrnKeys = new ArrayList<>(prnKeys);
+        Collection<PersonComplexKey> tmpPrnComplexKeys = new 
ArrayList<>(prnComplexKeys);
+
+        orgKeys.clear();
+        prnKeys.clear();
+        prnComplexKeys.clear();
+
+        store.loadCache(c, OrganizationKey.class.getName(), "SELECT name, 
city, id FROM ORGANIZATION",
+            PersonKey.class.getName(), "SELECT org_id, id, name FROM Person 
WHERE id < 1000");
+
+        assertEquals(ORGANIZATION_CNT, orgKeys.size());
+        assertEquals(1000, prnKeys.size());
+        assertEquals(0, prnComplexKeys.size());
+
+        store.deleteAll(tmpOrgKeys);
+        store.deleteAll(tmpPrnKeys);
+        store.deleteAll(tmpPrnComplexKeys);
+
+        orgKeys.clear();
+        prnKeys.clear();
+        prnComplexKeys.clear();
+
+        store.loadCache(c);
+
+        assertTrue(orgKeys.isEmpty());
+        assertTrue(prnKeys.isEmpty());
+        assertTrue(prnComplexKeys.isEmpty());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testWriteRetry() throws Exception {
+        // Special dialect that will skip updates, to test write retry.
+        BasicJdbcDialect dialect = new BasicJdbcDialect() {
+            /** {@inheritDoc} */
+            @Override public String updateQuery(String tblName, 
Collection<String> keyCols, Iterable<String> valCols) {
+                return super.updateQuery(tblName, keyCols, valCols) + " AND 1 
= 0";
+            }
+        };
+
+        store.setDialect(dialect);
+
+        Map<String, Map<Object, CacheJdbcPojoStore.EntryMapping>> 
cacheMappings =
+            GridTestUtils.getFieldValue(store, CacheJdbcPojoStore.class, 
"cacheMappings");
+
+        CacheJdbcPojoStore.EntryMapping em = 
cacheMappings.get(null).get(OrganizationKey.class);
+
+        CacheJdbcPojoStoreType typeMeta = GridTestUtils.getFieldValue(em, 
CacheJdbcPojoStore.EntryMapping.class, "typeMeta");
+
+        cacheMappings.get(null).put(OrganizationKey.class,
+            new CacheJdbcPojoStore.EntryMapping(null, dialect, typeMeta));
+
+        Connection conn = store.openConnection(false);
+
+        PreparedStatement orgStmt = conn.prepareStatement("INSERT INTO 
Organization(id, name, city) VALUES (?, ?, ?)");
+
+        orgStmt.setInt(1, 1);
+        orgStmt.setString(2, "name" + 1);
+        orgStmt.setString(3, "city" + 1);
+
+        orgStmt.executeUpdate();
+
+        U.closeQuiet(orgStmt);
+
+        conn.commit();
+
+        OrganizationKey k1 = new OrganizationKey(1);
+        Organization v1 = new Organization(1, "Name1", "City1");
+
+        ses.newSession(null);
+
+        try {
+            store.write(new CacheEntryImpl<>(k1, v1));
+        }
+        catch (CacheWriterException e) {
+            if (!e.getMessage().startsWith("Failed insert entry in database, 
violate a unique index or primary key") ||
+                e.getSuppressed().length != 2)
+                throw e;
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTimestamp() throws Exception {
+        Timestamp k = new Timestamp(System.currentTimeMillis());
+
+        ses.newSession(null);
+
+        Integer v = 5;
+
+        store.write(new CacheEntryImpl<>(k, v));
+
+        assertEquals(v, store.load(k));
+
+        store.delete(k);
+
+        assertNull(store.load(k));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e26c84d/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
 
b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
index 746059d..af6f541 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
@@ -17,22 +17,21 @@
 
 package org.apache.ignite.cache.store.jdbc;
 
-import java.net.MalformedURLException;
-import java.net.URL;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Timestamp;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import javax.cache.integration.CacheWriterException;
-import org.apache.ignite.IgniteException;
+
 import org.apache.ignite.cache.store.jdbc.dialect.BasicJdbcDialect;
-import org.apache.ignite.cache.store.jdbc.dialect.JdbcDialect;
+import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect;
 import org.apache.ignite.cache.store.jdbc.model.Organization;
 import org.apache.ignite.cache.store.jdbc.model.OrganizationKey;
 import org.apache.ignite.cache.store.jdbc.model.Person;
@@ -40,17 +39,11 @@ import 
org.apache.ignite.cache.store.jdbc.model.PersonComplexKey;
 import org.apache.ignite.cache.store.jdbc.model.PersonKey;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.testframework.GridTestUtils;
 import 
org.apache.ignite.testframework.junits.cache.GridAbstractCacheStoreSelfTest;
 import org.h2.jdbcx.JdbcConnectionPool;
-import org.springframework.beans.BeansException;
-import org.springframework.beans.factory.xml.XmlBeanDefinitionReader;
-import org.springframework.context.support.GenericApplicationContext;
-import org.springframework.core.io.UrlResource;
 
 /**
  * Class for {@code PojoCacheStore} tests.
@@ -77,7 +70,93 @@ public class CacheJdbcPojoStoreTest extends 
GridAbstractCacheStoreSelfTest<Cache
 
     /** {@inheritDoc} */
     @Override protected CacheJdbcPojoStore<Object, Object> store() {
-        CacheJdbcPojoStore<Object, Object> store = new CacheJdbcPojoStore<>();
+        CacheJdbcPojoStoreFactory<Object, Object> storeFactory = new 
CacheJdbcPojoStoreFactory<>();
+
+        CacheJdbcPojoStoreConfiguration storeCfg = new 
CacheJdbcPojoStoreConfiguration();
+        storeCfg.setDialect(new H2Dialect());
+
+        CacheJdbcPojoStoreType[] storeTypes = new CacheJdbcPojoStoreType[6];
+
+        storeTypes[0] = new CacheJdbcPojoStoreType();
+        storeTypes[0].setDatabaseSchema("PUBLIC");
+        storeTypes[0].setDatabaseTable("ORGANIZATION");
+        
storeTypes[0].setKeyType("org.apache.ignite.cache.store.jdbc.model.OrganizationKey");
+        storeTypes[0].setKeyFields(new CacheJdbcPojoStoreTypeField[1]);
+        storeTypes[0].getKeyFields()[0] = new 
CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id");
+
+        
storeTypes[0].setValueType("org.apache.ignite.cache.store.jdbc.model.Organization");
+        storeTypes[0].setValueFields(new CacheJdbcPojoStoreTypeField[3]);
+        storeTypes[0].getValueFields()[0] = new 
CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id");
+        storeTypes[0].getValueFields()[1] = new 
CacheJdbcPojoStoreTypeField(Types.VARCHAR, "NAME", String.class, "name");
+        storeTypes[0].getValueFields()[2] = new 
CacheJdbcPojoStoreTypeField(Types.VARCHAR, "CITY", String.class, "city");
+
+        storeTypes[1] = new CacheJdbcPojoStoreType();
+        storeTypes[1].setDatabaseSchema("PUBLIC");
+        storeTypes[1].setDatabaseTable("PERSON");
+        
storeTypes[1].setKeyType("org.apache.ignite.cache.store.jdbc.model.PersonKey");
+        storeTypes[1].setKeyFields(new CacheJdbcPojoStoreTypeField[1]);
+        storeTypes[1].getKeyFields()[0] = new 
CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id");
+
+        
storeTypes[1].setValueType("org.apache.ignite.cache.store.jdbc.model.Person");
+        storeTypes[1].setValueFields(new CacheJdbcPojoStoreTypeField[3]);
+        storeTypes[1].getValueFields()[0] = new 
CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id");
+        storeTypes[1].getValueFields()[1] = new 
CacheJdbcPojoStoreTypeField(Types.INTEGER, "ORG_ID", Integer.class, "orgId");
+        storeTypes[1].getValueFields()[2] = new 
CacheJdbcPojoStoreTypeField(Types.VARCHAR, "NAME", String.class, "name");
+
+        storeTypes[2] = new CacheJdbcPojoStoreType();
+        storeTypes[2].setDatabaseSchema("PUBLIC");
+        storeTypes[2].setDatabaseTable("PERSON_COMPLEX");
+        
storeTypes[2].setKeyType("org.apache.ignite.cache.store.jdbc.model.PersonComplexKey");
+        storeTypes[2].setKeyFields(new CacheJdbcPojoStoreTypeField[3]);
+        storeTypes[2].getKeyFields()[0] = new 
CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", int.class, "id");
+        storeTypes[2].getKeyFields()[1] = new 
CacheJdbcPojoStoreTypeField(Types.INTEGER, "ORG_ID", int.class, "orgId");
+        storeTypes[2].getKeyFields()[2] = new 
CacheJdbcPojoStoreTypeField(Types.INTEGER, "CITY_ID", int.class, "cityId");
+
+        
storeTypes[2].setValueType("org.apache.ignite.cache.store.jdbc.model.Person");
+        storeTypes[2].setValueFields(new CacheJdbcPojoStoreTypeField[4]);
+        storeTypes[2].getValueFields()[0] = new 
CacheJdbcPojoStoreTypeField(Types.INTEGER, "ID", Integer.class, "id");
+        storeTypes[2].getValueFields()[1] = new 
CacheJdbcPojoStoreTypeField(Types.INTEGER, "ORG_ID", Integer.class, "orgId");
+        storeTypes[2].getValueFields()[2] = new 
CacheJdbcPojoStoreTypeField(Types.VARCHAR, "NAME", String.class, "name");
+        storeTypes[2].getValueFields()[3] = new 
CacheJdbcPojoStoreTypeField(Types.VARCHAR, "SALARY", Integer.class, "salary");
+
+        storeTypes[3] = new CacheJdbcPojoStoreType();
+        storeTypes[3].setDatabaseSchema("PUBLIC");
+        storeTypes[3].setDatabaseTable("TIMESTAMP_ENTRIES");
+        storeTypes[3].setKeyType("java.sql.Timestamp");
+        storeTypes[3].setKeyFields(new CacheJdbcPojoStoreTypeField[1]);
+        storeTypes[3].getKeyFields()[0] = new 
CacheJdbcPojoStoreTypeField(Types.TIMESTAMP, "KEY", Timestamp.class, null);
+
+        storeTypes[3].setValueType("java.lang.Integer");
+        storeTypes[3].setValueFields(new CacheJdbcPojoStoreTypeField[1]);
+        storeTypes[3].getValueFields()[0] = new 
CacheJdbcPojoStoreTypeField(Types.INTEGER, "VAL", Integer.class, null);
+
+        storeTypes[4] = new CacheJdbcPojoStoreType();
+        storeTypes[4].setDatabaseSchema("PUBLIC");
+        storeTypes[4].setDatabaseTable("STRING_ENTRIES");
+        storeTypes[4].setKeyType("java.lang.String");
+        storeTypes[4].setKeyFields(new CacheJdbcPojoStoreTypeField[1]);
+        storeTypes[4].getKeyFields()[0] = new 
CacheJdbcPojoStoreTypeField(Types.VARCHAR, "KEY", String.class, null);
+
+        storeTypes[4].setValueType("java.lang.String");
+        storeTypes[4].setValueFields(new CacheJdbcPojoStoreTypeField[1]);
+        storeTypes[4].getValueFields()[0] = new 
CacheJdbcPojoStoreTypeField(Types.VARCHAR, "VAL", Integer.class, null);
+
+        storeTypes[5] = new CacheJdbcPojoStoreType();
+        storeTypes[5].setDatabaseSchema("PUBLIC");
+        storeTypes[5].setDatabaseTable("UUID_ENTRIES");
+        storeTypes[5].setKeyType("java.util.UUID");
+        storeTypes[5].setKeyFields(new CacheJdbcPojoStoreTypeField[1]);
+        storeTypes[5].getKeyFields()[0] = new 
CacheJdbcPojoStoreTypeField(Types.BINARY, "KEY", UUID.class, null);
+
+        storeTypes[5].setValueType("java.util.UUID");
+        storeTypes[5].setValueFields(new CacheJdbcPojoStoreTypeField[1]);
+        storeTypes[5].getValueFields()[0] = new 
CacheJdbcPojoStoreTypeField(Types.BINARY, "VAL", UUID.class, null);
+
+        storeCfg.setTypes(storeTypes);
+
+        storeFactory.setConfiguration(storeCfg);
+
+        CacheJdbcPojoStore<Object, Object> store = storeFactory.create();
 
 //        PGPoolingDataSource ds = new PGPoolingDataSource();
 //        ds.setUser("postgres");
@@ -91,57 +170,9 @@ public class CacheJdbcPojoStoreTest extends 
GridAbstractCacheStoreSelfTest<Cache
 //        ds.setUser("mysql");
 //        ds.setPassword("mysql");
 
+        // H2 DataSource
         store.setDataSource(JdbcConnectionPool.create(DFLT_CONN_URL, "sa", 
""));
 
-        URL cfgUrl;
-
-        try {
-            cfgUrl = new URL(DFLT_MAPPING_CONFIG);
-        }
-        catch (MalformedURLException ignore) {
-            cfgUrl = U.resolveIgniteUrl(DFLT_MAPPING_CONFIG);
-        }
-
-        if (cfgUrl == null)
-            throw new IgniteException("Failed to resolve metadata path: " + 
DFLT_MAPPING_CONFIG);
-
-        try {
-            GenericApplicationContext springCtx = new 
GenericApplicationContext();
-
-            new XmlBeanDefinitionReader(springCtx).loadBeanDefinitions(new 
UrlResource(cfgUrl));
-
-            springCtx.refresh();
-
-            Collection<CacheJdbcPojoStoreType> typeMeta = 
springCtx.getBeansOfType(CacheJdbcPojoStoreType.class).values();
-
-            Map<Integer, Map<Object, CacheJdbcPojoStore.EntryMapping>> 
cacheMappings = new HashMap<>();
-
-            JdbcDialect dialect = store.resolveDialect();
-
-            GridTestUtils.setFieldValue(store, CacheJdbcPojoStore.class, 
"dialect", dialect);
-
-            Map<Object, CacheJdbcPojoStore.EntryMapping> entryMappings = 
U.newHashMap(typeMeta.size());
-
-            for (CacheJdbcPojoStoreType type : typeMeta)
-                entryMappings.put(store.keyTypeId(type.getKeyType()),
-                    new CacheJdbcPojoStore.EntryMapping(null, dialect, type));
-
-            store.prepareBuilders(null, typeMeta);
-
-            cacheMappings.put(null, entryMappings);
-
-            GridTestUtils.setFieldValue(store, CacheJdbcPojoStore.class, 
"cacheMappings", cacheMappings);
-        }
-        catch (BeansException e) {
-            if (X.hasCause(e, ClassNotFoundException.class))
-                throw new IgniteException("Failed to instantiate Spring XML 
application context " +
-                    "(make sure all classes used in Spring configuration are 
present at CLASSPATH) " +
-                    "[springUrl=" + cfgUrl + ']', e);
-            else
-                throw new IgniteException("Failed to instantiate Spring XML 
application context [springUrl=" +
-                    cfgUrl + ", err=" + e.getMessage() + ']', e);
-        }
-
         return store;
     }
 
@@ -224,7 +255,6 @@ public class CacheJdbcPojoStoreTest extends 
GridAbstractCacheStoreSelfTest<Cache
         super.beforeTest();
     }
 
-
     /**
      * @throws Exception If failed.
      */
@@ -274,7 +304,7 @@ public class CacheJdbcPojoStoreTest extends 
GridAbstractCacheStoreSelfTest<Cache
             if (i > 0)
                 prnComplexStmt.setInt(5, 1000 + i * 500);
             else // Add person with null salary
-                prnComplexStmt.setNull(5, java.sql.Types.INTEGER);
+                prnComplexStmt.setNull(5, Types.INTEGER);
 
             prnComplexStmt.addBatch();
         }
@@ -302,9 +332,9 @@ public class CacheJdbcPojoStoreTest extends 
GridAbstractCacheStoreSelfTest<Cache
 
                     Person val = (Person)v;
 
-                    assert key.getId() == val.getId();
-                    assert key.getOrgId() == val.getOrgId();
-                    assertEquals("name"  + key.getId(), val.getName());
+                    assertTrue("Key ID should be the same as value ID", 
key.getId() == val.getId());
+                    assertTrue("Key orgID should be the same as value orgID", 
key.getOrgId() == val.getOrgId());
+                    assertEquals("name" + key.getId(), val.getName());
 
                     prnComplexKeys.add((PersonComplexKey)k);
                 }

Reply via email to