Repository: ignite
Updated Branches:
  refs/heads/master d9c482137 -> 5f568790a


minor refactoring - added javadoc - Fixes #3940.

Signed-off-by: Igor Rudyak <igor.rud...@rallyhealth.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5f568790
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5f568790
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5f568790

Branch: refs/heads/master
Commit: 5f568790a5cee1d908b845ceef57c02305041c16
Parents: d9c4821
Author: Igor Rudyak <igor.rud...@rallyhealth.com>
Authored: Tue May 15 10:24:29 2018 -0700
Committer: Igor Rudyak <igor.rud...@rallyhealth.com>
Committed: Tue May 15 10:24:29 2018 -0700

----------------------------------------------------------------------
 .../cassandra/session/CassandraSessionImpl.java | 284 +++++++++++--------
 .../session/WrappedPreparedStatement.java       | 181 ++++++++++++
 .../store/cassandra/session/WrappedSession.java |  91 ++++++
 .../cassandra/session/pool/IdleSession.java     |  72 +++++
 .../cassandra/session/pool/SessionPool.java     |  22 +-
 .../cassandra/session/pool/SessionWrapper.java  |  74 -----
 .../ignite/tests/CassandraConfigTest.java       | 141 +++++++++
 .../ignite/tests/CassandraSessionImplTest.java  |   8 +-
 8 files changed, 671 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5f568790/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
index 4fb0cb2..6b14f41 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
@@ -22,7 +22,9 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.cache.Cache;
 import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.BoundStatement;
@@ -36,6 +38,7 @@ import com.datastax.driver.core.Session;
 import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.exceptions.AlreadyExistsException;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.datastax.driver.core.querybuilder.Batch;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -65,14 +68,21 @@ public class CassandraSessionImpl implements CassandraSession {
     /** Cassandra cluster builder. */
     private volatile Cluster.Builder builder;
 
-    /** Cassandra driver session. */
-    private volatile Session ses;
+    /**
+     * Current generation number of Cassandra session. Each time session 
recreated its generation will be incremented.
+     * The main idea behind session generation is to track prepared statements 
created with old Cassandra
+     * session (which is not valid anymore) and avoid extra refresh of 
Cassandra session by multiple threads.
+     **/
+    private volatile Long generation = 0L;
+
+    /** Wrapped Cassandra session. **/
+    private volatile WrappedSession wrapperSes;
 
     /** Number of references to Cassandra driver session (for multithreaded 
environment). */
     private volatile int refCnt;
 
     /** Storage for the session prepared statements */
-    private static final Map<String, PreparedStatement> sesStatements = new 
HashMap<>();
+    private static final Map<String, WrappedPreparedStatement> sesStatements = 
new HashMap<>();
 
     /** Number of records to immediately fetch in CQL statement execution. */
     private Integer fetchSize;
@@ -90,10 +100,10 @@ public class CassandraSessionImpl implements CassandraSession {
     private IgniteLogger log;
 
     /** Table absence error handlers counter. */
-    private final AtomicInteger tblAbsenceHandlersCnt = new AtomicInteger(-1);
+    private final Map<String, AtomicInteger> tblAbsenceHandlersCnt = new 
ConcurrentHashMap<>();
 
-    /** Prepared statement cluster disconnection error handlers counter. */
-    private final AtomicInteger prepStatementHandlersCnt = new 
AtomicInteger(-1);
+    /** Lock used to synchronize multiple threads trying to do session 
refresh. **/
+    private final ReentrantLock refreshLock = new ReentrantLock();
 
     /**
      * Creates instance of Cassandra driver session wrapper.
@@ -126,22 +136,26 @@ public class CassandraSessionImpl implements CassandraSession {
 
         try {
             while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
-                error = null;
-
                 if (attempt != 0) {
                     log.warning("Trying " + (attempt + 1) + " attempt to 
execute Cassandra CQL statement: " +
                             assistant.getStatement());
                 }
 
+                WrappedPreparedStatement preparedSt = null;
+                WrappedSession ses = null;
+
                 try {
-                    PreparedStatement preparedSt = 
prepareStatement(assistant.getTable(), assistant.getStatement(),
+                    preparedSt = prepareStatement(assistant.getTable(), 
assistant.getStatement(),
                         assistant.getPersistenceSettings(), 
assistant.tableExistenceRequired());
 
                     if (preparedSt == null)
                         return null;
 
                     Statement statement = 
tuneStatementExecutionOptions(assistant.bindStatement(preparedSt));
-                    ResultSet res = session().execute(statement);
+
+                    ses = session();
+
+                    ResultSet res = ses.execute(statement);
 
                     Row row = res == null || !res.iterator().hasNext() ? null 
: res.iterator().next();
 
@@ -159,9 +173,9 @@ public class CassandraSessionImpl implements CassandraSession {
                         handleTableAbsenceError(assistant.getTable(), 
assistant.getPersistenceSettings());
                     }
                     else if (CassandraHelper.isHostsAvailabilityError(e))
-                        handleHostsAvailabilityError(e, attempt, errorMsg);
+                        handleHostsAvailabilityError(ses == null ? -1 : 
ses.generation, e, attempt, errorMsg);
                     else if 
(CassandraHelper.isPreparedStatementClusterError(e))
-                        handlePreparedStatementClusterError(e);
+                        handlePreparedStatementClusterError(preparedSt == null 
? -1 : preparedSt.generation , e);
                     else
                         // For an error which we don't know how to handle, we 
will not try next attempts and terminate.
                         throw new IgniteException(errorMsg, e);
@@ -216,19 +230,22 @@ public class CassandraSessionImpl implements CassandraSession {
 
                 List<Cache.Entry<Integer, ResultSetFuture>> futResults = new 
LinkedList<>();
 
-                PreparedStatement preparedSt = 
prepareStatement(assistant.getTable(), assistant.getStatement(),
+                WrappedPreparedStatement preparedSt = 
prepareStatement(assistant.getTable(), assistant.getStatement(),
                     assistant.getPersistenceSettings(), 
assistant.tableExistenceRequired());
 
                 if (preparedSt == null)
                     return null;
 
+                WrappedSession ses = null;
+
                 int seqNum = 0;
 
                 for (V obj : data) {
                     if (!assistant.alreadyProcessed(seqNum)) {
                         try {
+                            ses = session();
                             Statement statement = 
tuneStatementExecutionOptions(assistant.bindStatement(preparedSt, obj));
-                            ResultSetFuture fut = 
session().executeAsync(statement);
+                            ResultSetFuture fut = ses.executeAsync(statement);
                             futResults.add(new CacheEntryImpl<>(seqNum, fut));
                         }
                         catch (Throwable e) {
@@ -245,13 +262,18 @@ public class CassandraSessionImpl implements CassandraSession {
 
                                 // Handle host availability only once.
                                 if (hostsAvailEx == null)
-                                    handleHostsAvailabilityError(e, attempt, 
errorMsg);
+                                    handleHostsAvailabilityError(ses == null ? 
0 : ses.generation, e, attempt, errorMsg);
                             }
                             else if 
(CassandraHelper.isPreparedStatementClusterError(e)) {
                                 prepStatEx = e;
-                                handlePreparedStatementClusterError(e);
+
+                                
handlePreparedStatementClusterError(preparedSt.generation, e);
+
                                 preparedSt = 
prepareStatement(assistant.getTable(), assistant.getStatement(),
                                         assistant.getPersistenceSettings(), 
assistant.tableExistenceRequired());
+
+                                if (preparedSt == null)
+                                    return null;
                             }
                             else
                                 unknownEx = e;
@@ -319,12 +341,12 @@ public class CassandraSessionImpl implements CassandraSession {
 
                 if (hostsAvailEx != null) {
                     error = hostsAvailEx;
-                    handleHostsAvailabilityError(hostsAvailEx, attempt, 
errorMsg);
+                    handleHostsAvailabilityError(ses.generation, hostsAvailEx, 
attempt, errorMsg);
                 }
 
                 if (prepStatEx != null) {
                     error = prepStatEx;
-                    handlePreparedStatementClusterError(prepStatEx);
+                    handlePreparedStatementClusterError(preparedSt.generation, 
prepStatEx);
                 }
 
                 if (!CassandraHelper.isTableAbsenceError(error))
@@ -366,8 +388,12 @@ public class CassandraSessionImpl implements CassandraSession {
 
                 Statement statement = 
tuneStatementExecutionOptions(assistant.getStatement());
 
+                WrappedSession ses = null;
+
                 try {
-                    ResultSetFuture fut = session().executeAsync(statement);
+                    ses = session();
+
+                    ResultSetFuture fut = ses.executeAsync(statement);
                     ResultSet resSet = fut.getUninterruptibly();
 
                     if (resSet == null || !resSet.iterator().hasNext())
@@ -384,9 +410,7 @@ public class CassandraSessionImpl implements CassandraSession {
                     if (CassandraHelper.isTableAbsenceError(e))
                         return;
                     else if (CassandraHelper.isHostsAvailabilityError(e))
-                        handleHostsAvailabilityError(e, attempt, errorMsg);
-                    else if 
(CassandraHelper.isPreparedStatementClusterError(e))
-                        handlePreparedStatementClusterError(e);
+                        handleHostsAvailabilityError(ses == null ? 0 : 
ses.generation, e, attempt, errorMsg);
                     else
                         // For an error which we don't know how to handle, we 
will not try next attempts and terminate.
                         throw new IgniteException(errorMsg, e);
@@ -420,7 +444,7 @@ public class CassandraSessionImpl implements CassandraSession {
 
         int attempt = 0;
         boolean tableExistenceRequired = false;
-        Map<String, PreparedStatement> statements = new HashMap<>();
+        Map<String, WrappedPreparedStatement> statements = new HashMap<>();
         Map<String, KeyValuePersistenceSettings> tableSettings = new 
HashMap<>();
         RandomSleeper sleeper = newSleeper();
 
@@ -428,31 +452,32 @@ public class CassandraSessionImpl implements CassandraSession {
 
         try {
             while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
-                error = null;
-
                 if (attempt != 0) {
                     log.warning("Trying " + (attempt + 1) + " attempt to apply 
" + mutations.size() + " mutations " +
                             "performed withing Ignite transaction into 
Cassandra");
                 }
 
+                WrappedPreparedStatement prepStatement = null;
+                WrappedSession ses = null;
+
                 try {
                     BatchStatement batch = new BatchStatement();
 
                     // accumulating all the mutations into one Cassandra 
logged batch
                     for (Mutation mutation : mutations) {
                         String key = mutation.getTable() + 
mutation.getClass().getName();
-                        PreparedStatement st = statements.get(key);
+                        prepStatement = statements.get(key);
 
-                        if (st == null) {
-                            st = prepareStatement(mutation.getTable(), 
mutation.getStatement(),
+                        if (prepStatement == null) {
+                            prepStatement = 
prepareStatement(mutation.getTable(), mutation.getStatement(),
                                     mutation.getPersistenceSettings(), 
mutation.tableExistenceRequired());
 
-                            if (st != null)
-                                statements.put(key, st);
+                            if (prepStatement != null)
+                                statements.put(key, prepStatement);
                         }
 
-                        if (st != null)
-                            batch.add(mutation.bindStatement(st));
+                        if (prepStatement != null)
+                            batch.add(mutation.bindStatement(prepStatement));
 
                         if (attempt == 0) {
                             if (mutation.tableExistenceRequired()) {
@@ -465,8 +490,10 @@ public class CassandraSessionImpl implements CassandraSession {
                     }
 
                     // committing logged batch into Cassandra
-                    if (batch.size() > 0)
-                        
session().execute(tuneStatementExecutionOptions(batch));
+                    if (batch.size() > 0) {
+                        ses = session();
+                        ses.execute(tuneStatementExecutionOptions(batch));
+                    }
 
                     return;
                 } catch (Throwable e) {
@@ -480,10 +507,10 @@ public class CassandraSessionImpl implements CassandraSession {
                         else
                             return;
                     } else if (CassandraHelper.isHostsAvailabilityError(e)) {
-                        if (handleHostsAvailabilityError(e, attempt, errorMsg))
+                        if (handleHostsAvailabilityError(ses == null ? 0 : 
ses.generation, e, attempt, errorMsg))
                             statements.clear();
                     } else if 
(CassandraHelper.isPreparedStatementClusterError(e)) {
-                        handlePreparedStatementClusterError(e);
+                        handlePreparedStatementClusterError(prepStatement == 
null ? 0 : prepStatement.generation, e);
                         statements.clear();
                     } else {
                         // For an error which we don't know how to handle, we 
will not try next attempts and terminate.
@@ -508,10 +535,9 @@ public class CassandraSessionImpl implements CassandraSession {
 
     /** {@inheritDoc} */
     @Override public synchronized void close() throws IOException {
-        if (decrementSessionRefs() == 0 && ses != null) {
-            SessionPool.put(this, ses, expirationTimeout);
-
-            ses = null;
+        if (decrementSessionRefs() == 0 && wrapperSes != null) {
+            SessionPool.put(this, wrapperSes.ses, expirationTimeout);
+            wrapperSes = null;
         }
     }
 
@@ -523,37 +549,44 @@ public class CassandraSessionImpl implements CassandraSession {
         SessionPool.get(this);
 
         //closing and reopening session
-        CassandraHelper.closeSession(ses);
-        ses = null;
-        session();
+        if (wrapperSes != null)
+            CassandraHelper.closeSession(wrapperSes.ses);
 
-        synchronized (sesStatements) {
-            sesStatements.clear();
-        }
+        wrapperSes = null;
+
+        session();
     }
 
     /**
-     * @return Cassandra driver session.
+     * Returns Cassandra session and its generation number.
+     *
+     * @return Wrapper object providing Cassandra session and its generation 
number.
      */
-    private synchronized Session session() {
-        if (ses != null)
-            return ses;
+    private synchronized WrappedSession session() {
+        if (wrapperSes != null)
+            return wrapperSes;
 
-        ses = SessionPool.get(this);
+        Session ses = SessionPool.get(this);
 
-        if (ses != null)
-            return ses;
+        if (ses != null) {
+            this.wrapperSes = new WrappedSession(ses, generation);
+            return this.wrapperSes;
+        }
 
         synchronized (sesStatements) {
             sesStatements.clear();
         }
 
         try {
-            return ses = builder.build().connect();
+            ses = builder.build().connect();
+            generation++;
+            this.wrapperSes = new WrappedSession(ses, generation);
         }
         catch (Throwable e) {
             throw new IgniteException("Failed to establish session with 
Cassandra database", e);
         }
+
+        return this.wrapperSes;
     }
 
     /**
@@ -581,8 +614,8 @@ public class CassandraSessionImpl implements CassandraSession {
      * @param tblExistenceRequired Flag indicating if table existence is 
required for the statement.
      * @return Prepared statement.
      */
-    private PreparedStatement prepareStatement(String table, String statement, 
KeyValuePersistenceSettings settings,
-        boolean tblExistenceRequired) {
+    private WrappedPreparedStatement prepareStatement(String table, String 
statement, KeyValuePersistenceSettings settings,
+                                                      boolean 
tblExistenceRequired) {
 
         int attempt = 0;
         Throwable error = null;
@@ -594,13 +627,25 @@ public class CassandraSessionImpl implements CassandraSession {
 
         try {
             synchronized (sesStatements) {
-                if (sesStatements.containsKey(statement))
-                    return sesStatements.get(statement);
+                WrappedPreparedStatement wrapper = 
sesStatements.get(statement);
+
+                if (wrapper != null) {
+                    // Prepared statement is still actual, cause it was 
created with the current Cassandra session.
+                    if (generation == wrapper.generation)
+                        return wrapper;
+                    // Prepared statement is not actual anymore, cause it was 
created with the previous Cassandra session.
+                    else
+                        sesStatements.remove(statement);
+                }
             }
 
             while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
+                WrappedSession ses = null;
+
                 try {
-                    PreparedStatement prepStatement = 
session().prepare(statement);
+                    ses = session();
+
+                    WrappedPreparedStatement prepStatement = 
ses.prepare(statement);
 
                     synchronized (sesStatements) {
                         sesStatements.put(statement, prepStatement);
@@ -616,7 +661,7 @@ public class CassandraSessionImpl implements CassandraSession {
                         handleTableAbsenceError(table, settings);
                     }
                     else if (CassandraHelper.isHostsAvailabilityError(e))
-                        handleHostsAvailabilityError(e, attempt, errorMsg);
+                        handleHostsAvailabilityError(ses == null ? 0 : 
ses.generation, e, attempt, errorMsg);
                     else
                         throw new IgniteException(errorMsg, e);
 
@@ -647,13 +692,17 @@ public class CassandraSessionImpl implements CassandraSession {
         String errorMsg = "Failed to create Cassandra keyspace '" + 
settings.getKeyspace() + "'";
 
         while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
+            WrappedSession ses = null;
+
             try {
+                ses = session();
+
                 
log.info("-----------------------------------------------------------------------");
                 log.info("Creating Cassandra keyspace '" + 
settings.getKeyspace() + "'");
                 
log.info("-----------------------------------------------------------------------\n\n"
 +
                     settings.getKeyspaceDDLStatement() + "\n");
                 
log.info("-----------------------------------------------------------------------");
-                session().execute(settings.getKeyspaceDDLStatement());
+                ses.execute(settings.getKeyspaceDDLStatement());
                 log.info("Cassandra keyspace '" + settings.getKeyspace() + "' 
was successfully created");
                 return;
             }
@@ -665,7 +714,7 @@ public class CassandraSessionImpl implements CassandraSession {
                 if (!CassandraHelper.isHostsAvailabilityError(e))
                     throw new IgniteException(errorMsg, e);
 
-                handleHostsAvailabilityError(e, attempt, errorMsg);
+                handleHostsAvailabilityError(ses == null ? 0 : ses.generation, 
e, attempt, errorMsg);
 
                 error = e;
             }
@@ -688,13 +737,17 @@ public class CassandraSessionImpl implements CassandraSession {
         String errorMsg = "Failed to create Cassandra table '" + tableFullName 
+ "'";
 
         while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
+            WrappedSession ses = null;
+
             try {
+                ses = session();
+
                 
log.info("-----------------------------------------------------------------------");
                 log.info("Creating Cassandra table '" + tableFullName + "'");
                 
log.info("-----------------------------------------------------------------------\n\n"
 +
                         settings.getTableDDLStatement(table) + "\n");
                 
log.info("-----------------------------------------------------------------------");
-                session().execute(settings.getTableDDLStatement(table));
+                ses.execute(settings.getTableDDLStatement(table));
                 log.info("Cassandra table '" + tableFullName + "' was 
successfully created");
                 return;
             }
@@ -712,7 +765,7 @@ public class CassandraSessionImpl implements CassandraSession {
                     createKeyspace(settings);
                 }
                 else if (CassandraHelper.isHostsAvailabilityError(e))
-                    handleHostsAvailabilityError(e, attempt, errorMsg);
+                    handleHostsAvailabilityError(ses == null ? 0 : 
ses.generation, e, attempt, errorMsg);
 
                 error = e;
             }
@@ -740,7 +793,11 @@ public class CassandraSessionImpl implements CassandraSession {
         String errorMsg = "Failed to create indexes for Cassandra table " + 
tableFullName;
 
         while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
+            WrappedSession ses = null;
+
             try {
+                ses = session();
+
                 
log.info("-----------------------------------------------------------------------");
                 log.info("Creating indexes for Cassandra table '" + 
tableFullName + "'");
                 
log.info("-----------------------------------------------------------------------");
@@ -749,7 +806,7 @@ public class CassandraSessionImpl implements CassandraSession {
                     try {
                         log.info(statement);
                         
log.info("-----------------------------------------------------------------------");
-                        session().execute(statement);
+                        ses.execute(statement);
                     }
                     catch (AlreadyExistsException ignored) {
                     }
@@ -765,7 +822,7 @@ public class CassandraSessionImpl implements CassandraSession {
             }
             catch (Throwable e) {
                 if (CassandraHelper.isHostsAvailabilityError(e))
-                    handleHostsAvailabilityError(e, attempt, errorMsg);
+                    handleHostsAvailabilityError(ses == null ? 0 : 
ses.generation , e, attempt, errorMsg);
                 else if (CassandraHelper.isTableAbsenceError(e))
                     createTable(table, settings);
                 else
@@ -816,12 +873,14 @@ public class CassandraSessionImpl implements CassandraSession {
      * @param settings Persistence settings.
      */
     private void handleTableAbsenceError(String table, 
KeyValuePersistenceSettings settings) {
-        int hndNum = tblAbsenceHandlersCnt.incrementAndGet();
-
         String tableFullName = settings.getKeyspace() + "." + table;
 
+        AtomicInteger counter = 
tblAbsenceHandlersCnt.computeIfAbsent(tableFullName, k -> new 
AtomicInteger(-1));
+
+        int hndNum = counter.incrementAndGet();
+
         try {
-            synchronized (tblAbsenceHandlersCnt) {
+            synchronized (counter) {
                 // Oooops... I am not the first thread who tried to handle 
table absence problem.
                 if (hndNum != 0) {
                     log.warning("Table " + tableFullName + " absence problem 
detected. " +
@@ -832,77 +891,58 @@ public class CassandraSessionImpl implements CassandraSession {
                 log.warning("Table " + tableFullName + " absence problem 
detected. " +
                         "Trying to create table.");
 
-                IgniteException error = new IgniteException("Failed to create 
Cassandra table " + tableFullName);
-
-                int attempt = 0;
-
-                while (error != null && attempt < 
CQL_EXECUTION_ATTEMPTS_COUNT) {
-                    error = null;
-
-                    try {
-                        createKeyspace(settings);
-                        createTable(table, settings);
-                        createTableIndexes(table, settings);
-                    }
-                    catch (Throwable e) {
-                        if (CassandraHelper.isHostsAvailabilityError(e))
-                            handleHostsAvailabilityError(e, attempt, null);
-                        else
-                            throw new IgniteException("Failed to create 
Cassandra table " + tableFullName, e);
-
-                        error = (e instanceof IgniteException) ? 
(IgniteException)e : new IgniteException(e);
-                    }
-
-                    attempt++;
-                }
-
-                if (error != null)
-                    throw error;
+                createKeyspace(settings);
+                createTable(table, settings);
+                createTableIndexes(table, settings);
             }
         }
         finally {
             if (hndNum == 0)
-                tblAbsenceHandlersCnt.set(-1);
+                counter.set(-1);
         }
     }
 
     /**
      * Handles situation when prepared statement execution failed cause 
session to the cluster was released.
      *
+     * @param sesGeneration Generation of Cassandra session used to create 
prepared statement.
+     * @param e Exception thrown during statement execution.
      */
-    private void handlePreparedStatementClusterError(Throwable e) {
-        int hndNum = prepStatementHandlersCnt.incrementAndGet();
+    private void handlePreparedStatementClusterError(long sesGeneration, 
Throwable e) {
+        if (sesGeneration < generation) {
+            log.warning("Prepared statement cluster error detected, another 
thread already fixed the problem", e);
+            return;
+        }
+
+        refreshLock.lock();
 
         try {
-            synchronized (prepStatementHandlersCnt) {
-                // Oooops... I am not the first thread who tried to handle 
prepared statement problem.
-                if (hndNum != 0) {
-                    log.warning("Prepared statement cluster error detected, 
another thread already fixed the problem", e);
-                    return;
-                }
+            if (sesGeneration < generation) {
+                log.warning("Prepared statement cluster error detected, 
another thread already fixed the problem", e);
+                return;
+            }
 
-                log.warning("Prepared statement cluster error detected, 
refreshing Cassandra session", e);
+            log.warning("Prepared statement cluster error detected, refreshing 
Cassandra session", e);
 
-                refresh();
+            refresh();
 
-                log.warning("Cassandra session refreshed");
-            }
+            log.warning("Cassandra session refreshed");
         }
         finally {
-            if (hndNum == 0)
-                prepStatementHandlersCnt.set(-1);
+            refreshLock.unlock();
         }
     }
 
     /**
      * Handles situation when Cassandra host which is responsible for CQL 
query execution became unavailable.
      *
+     * @param sesGeneration Generation of Cassandra session used to run CQL 
statement.
      * @param e Exception to handle.
      * @param attempt Number of attempts.
      * @param msg Error message.
      * @return {@code true} if host unavailability was successfully handled.
      */
-    private boolean handleHostsAvailabilityError(Throwable e, int attempt, 
String msg) {
+    private boolean handleHostsAvailabilityError(long sesGeneration, Throwable 
e, int attempt, String msg) {
         if (attempt >= CQL_EXECUTION_ATTEMPTS_COUNT) {
             log.error("Host availability problem detected. " +
                     "Number of CQL execution attempts reached maximum " + 
CQL_EXECUTION_ATTEMPTS_COUNT +
@@ -914,14 +954,26 @@ public class CassandraSessionImpl implements CassandraSession {
             attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 2  ||
             attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 2 + 
CQL_EXECUTION_ATTEMPTS_COUNT / 4  ||
             attempt == CQL_EXECUTION_ATTEMPTS_COUNT - 1) {
-            log.warning("Host availability problem detected, CQL execution 
attempt  " + (attempt + 1) + ", " +
-                    "refreshing Cassandra session", e);
 
-            refresh();
+            refreshLock.lock();
 
-            log.warning("Cassandra session refreshed");
+            try {
+                if (sesGeneration < generation)
+                    log.warning("Host availability problem detected, but 
already handled by another thread");
+                else {
+                    log.warning("Host availability problem detected, CQL 
execution attempt  " + (attempt + 1) + ", " +
+                            "refreshing Cassandra session", e);
+
+                    refresh();
 
-            return true;
+                    log.warning("Cassandra session refreshed");
+
+                    return true;
+                }
+            }
+            finally {
+                refreshLock.unlock();
+            }
         }
 
         log.warning("Host availability problem detected, CQL execution attempt 
" + (attempt + 1) + ", " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f568790/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/WrappedPreparedStatement.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/WrappedPreparedStatement.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/WrappedPreparedStatement.java
new file mode 100644
index 0000000..be86516
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/WrappedPreparedStatement.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.CodecRegistry;
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.PreparedId;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.policies.RetryPolicy;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+/**
+ * Simple wrapper providing access to Cassandra prepared statement and generation of Cassandra
+ * session which was used to create this statement.
+ */
+public class WrappedPreparedStatement implements PreparedStatement {
+    /** Prepared statement. **/
+    private final PreparedStatement st;
+
+    /** Generation of Cassandra session which was used to prepare this 
statement. **/
+    final long generation;
+
+    /**
+     * Constructor.
+     *
+     * @param st Prepared statement.
+     * @param generation Generation of Cassandra session used to prepare this 
statement.
+     */
+    WrappedPreparedStatement(PreparedStatement st, long generation) {
+        this.st = st;
+        this.generation = generation;
+    }
+
+    /**
+     * Getter for wrapped statement.
+     *
+     * @return Wrapped original statement.
+     */
+    public PreparedStatement getWrappedStatement() {
+        return st;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ColumnDefinitions getVariables() {
+        return st.getVariables();
+    }
+
+    /** {@inheritDoc} */
+    @Override public BoundStatement bind(Object... values) {
+        return st.bind(values);
+    }
+
+    /** {@inheritDoc} */
+    @Override public BoundStatement bind() {
+        return st.bind();
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedStatement setRoutingKey(ByteBuffer routingKey) {
+        return st.setRoutingKey(routingKey);
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedStatement setRoutingKey(ByteBuffer... 
routingKeyComponents) {
+        return st.setRoutingKey(routingKeyComponents);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ByteBuffer getRoutingKey() {
+        return st.getRoutingKey();
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedStatement setConsistencyLevel(ConsistencyLevel 
consistency) {
+        return st.setConsistencyLevel(consistency);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ConsistencyLevel getConsistencyLevel() {
+        return st.getConsistencyLevel();
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedStatement 
setSerialConsistencyLevel(ConsistencyLevel serialConsistency) {
+        return st.setSerialConsistencyLevel(serialConsistency);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ConsistencyLevel getSerialConsistencyLevel() {
+        return st.getSerialConsistencyLevel();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getQueryString() {
+        return st.getQueryString();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getQueryKeyspace() {
+        return st.getQueryKeyspace();
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedStatement enableTracing() {
+        return st.enableTracing();
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedStatement disableTracing() {
+        return st.disableTracing();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isTracing() {
+        return st.isTracing();
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedStatement setRetryPolicy(RetryPolicy policy) {
+        return st.setRetryPolicy(policy);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RetryPolicy getRetryPolicy() {
+        return st.getRetryPolicy();
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedId getPreparedId() {
+        return st.getPreparedId();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, ByteBuffer> getIncomingPayload() {
+        return st.getIncomingPayload();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, ByteBuffer> getOutgoingPayload() {
+        return st.getOutgoingPayload();
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedStatement setOutgoingPayload(Map<String, 
ByteBuffer> payload) {
+        return st.setOutgoingPayload(payload);
+    }
+
+    /** {@inheritDoc} */
+    @Override public CodecRegistry getCodecRegistry() {
+        return st.getCodecRegistry();
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedStatement setIdempotent(Boolean idempotent) {
+        return st.setIdempotent(idempotent);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean isIdempotent() {
+        return st.isIdempotent();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f568790/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/WrappedSession.java
----------------------------------------------------------------------
diff --git 
a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/WrappedSession.java
 
b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/WrappedSession.java
new file mode 100644
index 0000000..d9b7224
--- /dev/null
+++ 
b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/WrappedSession.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+
+/**
+ * Simple container for Cassandra session and its generation number.
+ */
+public class WrappedSession {
+    /** Cassandra driver session. **/
+    final Session ses;
+
+    /** Cassandra session generation number. **/
+    final long generation;
+
+    /**
+     * Constructor.
+     *
+     * @param ses Cassandra session.
+     * @param generation Cassandra session generation number.
+     */
+    WrappedSession(Session ses, long generation) {
+        this.ses = ses;
+        this.generation = generation;
+    }
+
+    /**
+     * Prepares the provided query string.
+     *
+     * @param query the CQL query string to prepare
+     * @return the prepared statement corresponding to {@code query}.
+     * @throws NoHostAvailableException if no host in the cluster can be
+     *                                  contacted successfully to prepare this 
query.
+     */
+    WrappedPreparedStatement prepare(String query) {
+        return new WrappedPreparedStatement(ses.prepare(query), generation);
+    }
+
+    /**
+     * Executes the provided query.
+     *
+     * @param statement The CQL query to execute (that can be any {@link 
Statement}).
+     *
+     * @return The result of the query. That result will never be null but can be empty.
+     */
+    ResultSet execute(Statement statement) {
+        return ses.execute(statement);
+    }
+
+    /**
+     * Executes the provided query.
+     *
+     * @param query The CQL query string to execute.
+     *
+     * @return The result of the query. That result will never be null but can be empty.
+     */
+    ResultSet execute(String query) {
+        return ses.execute(query);
+    }
+
+    /**
+     * Executes the provided query asynchronously.
+     *
+     * @param statement the CQL query to execute (that can be any {@code 
Statement}).
+     *
+     * @return a future on the result of the query.
+     */
+    ResultSetFuture executeAsync(Statement statement) {
+        return ses.executeAsync(statement);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f568790/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/IdleSession.java
----------------------------------------------------------------------
diff --git 
a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/IdleSession.java
 
b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/IdleSession.java
new file mode 100644
index 0000000..57eb5d1
--- /dev/null
+++ 
b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/IdleSession.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session.pool;
+
+import com.datastax.driver.core.Session;
+import org.apache.ignite.cache.store.cassandra.common.CassandraHelper;
+
+/**
+ * Simple wrapper for idle Cassandra session returned to pool, responsible for 
monitoring session expiration and its closing.
+ */
+public class IdleSession {
+    /** Cassandra driver session. */
+    private Session ses;
+
+    /** Expiration timeout. */
+    private long expirationTimeout;
+
+    /** Wrapper creation time.  */
+    private long time;
+
+    /**
+     * Creates instance of Cassandra driver session wrapper.
+     *
+     * @param ses Cassandra driver session.
+     */
+    public IdleSession(Session ses, long expirationTimeout) {
+        this.ses = ses;
+        this.expirationTimeout = expirationTimeout;
+        this.time = System.currentTimeMillis();
+    }
+
+    /**
+     * Checks if Cassandra driver session expired.
+     *
+     * @return true if session expired.
+     */
+    public boolean expired() {
+        return expirationTimeout > 0 && System.currentTimeMillis() - time > 
expirationTimeout;
+    }
+
+    /**
+     * Returns wrapped Cassandra driver session.
+     *
+     * @return Cassandra driver session.
+     */
+    public Session driverSession() {
+        return ses;
+    }
+
+    /**
+     * Closes wrapped Cassandra driver session
+     */
+    public void release() {
+        CassandraHelper.closeSession(ses);
+        ses = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f568790/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
----------------------------------------------------------------------
diff --git 
a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
 
b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
index 4de8516..cf72f8a 100644
--- 
a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
+++ 
b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
@@ -45,23 +45,23 @@ public class SessionPool {
                         return;
                     }
 
-                    List<Map.Entry<CassandraSessionImpl, SessionWrapper>> 
expiredSessions = new LinkedList<>();
+                    List<Map.Entry<CassandraSessionImpl, IdleSession>> 
expiredSessions = new LinkedList<>();
 
                     int sessionsCnt;
 
                     synchronized (sessions) {
                         sessionsCnt = sessions.size();
 
-                        for (Map.Entry<CassandraSessionImpl, SessionWrapper> 
entry : sessions.entrySet()) {
+                        for (Map.Entry<CassandraSessionImpl, IdleSession> 
entry : sessions.entrySet()) {
                             if (entry.getValue().expired())
                                 expiredSessions.add(entry);
                         }
 
-                        for (Map.Entry<CassandraSessionImpl, SessionWrapper> 
entry : expiredSessions)
+                        for (Map.Entry<CassandraSessionImpl, IdleSession> 
entry : expiredSessions)
                             sessions.remove(entry.getKey());
                     }
 
-                    for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry 
: expiredSessions)
+                    for (Map.Entry<CassandraSessionImpl, IdleSession> entry : 
expiredSessions)
                         entry.getValue().release();
 
                     // all sessions in the pool expired, thus we don't need 
additional thread to manage sessions in the pool
@@ -79,7 +79,7 @@ public class SessionPool {
     private static final long SLEEP_TIMEOUT = 60000; // 1 minute.
 
     /** Sessions which were returned to pool. */
-    private static final Map<CassandraSessionImpl, SessionWrapper> sessions = 
new HashMap<>();
+    private static final Map<CassandraSessionImpl, IdleSession> sessions = new 
HashMap<>();
 
     /** Singleton instance. */
     private static SessionMonitor monitorSingleton;
@@ -102,10 +102,10 @@ public class SessionPool {
         if (cassandraSes == null || driverSes == null)
             return;
 
-        SessionWrapper old;
+        IdleSession old;
 
         synchronized (sessions) {
-            old = sessions.put(cassandraSes, new SessionWrapper(driverSes, 
expirationTimeout));
+            old = sessions.put(cassandraSes, new IdleSession(driverSes, 
expirationTimeout));
 
             if (monitorSingleton == null || 
State.TERMINATED.equals(monitorSingleton.getState())) {
                 monitorSingleton = new SessionMonitor();
@@ -129,7 +129,7 @@ public class SessionPool {
         if (cassandraSes == null)
             return null;
 
-        SessionWrapper wrapper;
+        IdleSession wrapper;
 
         synchronized (sessions) {
             wrapper = sessions.remove(cassandraSes);
@@ -142,7 +142,7 @@ public class SessionPool {
      * Releases all session from pool and closes all their connections to 
Cassandra database.
      */
     public static void release() {
-        Collection<SessionWrapper> wrappers;
+        Collection<IdleSession> wrappers;
 
         synchronized (sessions) {
             try {
@@ -151,7 +151,7 @@ public class SessionPool {
 
                 wrappers = new LinkedList<>();
 
-                for (SessionWrapper wrapper : sessions.values())
+                for (IdleSession wrapper : sessions.values())
                     wrappers.add(wrapper);
 
                 sessions.clear();
@@ -167,7 +167,7 @@ public class SessionPool {
             }
         }
 
-        for (SessionWrapper wrapper : wrappers)
+        for (IdleSession wrapper : wrappers)
             wrapper.release();
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f568790/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
----------------------------------------------------------------------
diff --git 
a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
 
b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
deleted file mode 100644
index 68b9dd4..0000000
--- 
a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.session.pool;
-
-import com.datastax.driver.core.Session;
-import org.apache.ignite.cache.store.cassandra.common.CassandraHelper;
-
-/**
- * Wrapper for Cassandra driver session, responsible for monitoring session 
expiration and its closing.
- */
-public class SessionWrapper {
-    /** Cassandra driver session. */
-    private Session ses;
-
-    /** Expiration timeout. */
-    private long expirationTimeout;
-
-    /** Wrapper creation time.  */
-    private long time;
-
-    /**
-     * Creates instance of Cassandra driver session wrapper.
-     *
-     * @param ses Cassandra driver session.
-     */
-    public SessionWrapper(Session ses, long expirationTimeout) {
-        this.ses = ses;
-        this.expirationTimeout = expirationTimeout;
-
-        time = System.currentTimeMillis();
-    }
-
-    /**
-     * Checks if Cassandra driver session expired.
-     *
-     * @return true if session expired.
-     */
-    public boolean expired() {
-        return expirationTimeout > 0 && System.currentTimeMillis() - time > 
expirationTimeout;
-    }
-
-    /**
-     * Returns wrapped Cassandra driver session.
-     *
-     * @return Cassandra driver session.
-     */
-    public Session driverSession() {
-        return ses;
-    }
-
-    /**
-     * Closes wrapped Cassandra driver session
-     */
-    public void release() {
-        CassandraHelper.closeSession(ses);
-
-        ses = null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f568790/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraConfigTest.java
----------------------------------------------------------------------
diff --git 
a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraConfigTest.java
 
b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraConfigTest.java
new file mode 100644
index 0000000..98d7ef1
--- /dev/null
+++ 
b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraConfigTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests;
+
+import junit.framework.TestCase;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import 
org.apache.ignite.cache.store.cassandra.persistence.KeyPersistenceSettings;
+import 
org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+
+/**
+ * Simple test for DDL generator.
+ */
+public class CassandraConfigTest extends TestCase {
+    /**
+     * Check if same DDL generated for similar keys and same 
KeyPersistenceConfiguration.
+     *
+     * @throws Exception
+     */
+    public void testDDLGeneration() throws Exception {
+        KeyPersistenceSettings keyPersistenceSettingsA = 
getKeyPersistenceSettings(KeyA.class);
+        KeyPersistenceSettings keyPersistenceSettingsB = 
getKeyPersistenceSettings(KeyB.class);
+
+        assertEquals(keyPersistenceSettingsB.getPrimaryKeyDDL(),
+            keyPersistenceSettingsA.getPrimaryKeyDDL());
+
+        assertEquals(keyPersistenceSettingsB.getClusteringDDL(),
+            keyPersistenceSettingsA.getClusteringDDL());
+    }
+
+    /**
+     * @return KeyPersistenceSetting
+     */
+    private KeyPersistenceSettings getKeyPersistenceSettings(Class keyClass) {
+        String cfg = "<persistence keyspace=\"public\">" +
+            " <keyPersistence class=\"" + keyClass.getName() + "\"  
strategy=\"POJO\"> \n" +
+            "        <partitionKey>\n" +
+            "            <field name=\"name\" column=\"name\"/>\n" +
+            "            <field name=\"contextId\" column=\"context_id\"/>\n" +
+            "            <field name=\"creationDate\" 
column=\"creation_date\"/>\n" +
+            "        </partitionKey>\n" +
+            "        <clusterKey>\n" +
+            "            <field name=\"timestamp\" column=\"timestamp\"/>\n" +
+            "        </clusterKey>\n" +
+            "    </keyPersistence>" +
+            " <valuePersistence class=\"java.lang.Object\"  
strategy=\"BLOB\">" +
+            " </valuePersistence>" +
+            "</persistence>";
+
+        return new 
KeyValuePersistenceSettings(cfg).getKeyPersistenceSettings();
+    }
+
+    /**
+     *
+     */
+    public static class BaseKey {
+        /** */
+        @QuerySqlField
+        // Looks like next annotation is ignored when generating DDL,
+        // but Ignite supports this annotation in parent classes.
+//        @AffinityKeyMapped
+        private Integer contextId;
+
+        /** */
+        public Integer getContextId() {
+            return contextId;
+        }
+
+        /** */
+        public void setContextId(Integer contextId) {
+            this.contextId = contextId;
+        }
+    }
+
+    /**
+     *
+     */
+    public static class KeyA extends BaseKey {
+        /** */
+        @QuerySqlField(index = true)
+        private String timestamp;
+
+        /** */
+        @QuerySqlField(index = true)
+        private String name;
+
+        /** */
+        @QuerySqlField
+        private String creationDate;
+
+        /**
+         * Constructor.
+         */
+        public KeyA() {
+        }
+    }
+
+    /**
+     *
+     */
+    public static class KeyB {
+
+        /** */
+        @QuerySqlField(index = true)
+        private String timestamp;
+
+        /** */
+        @QuerySqlField(index = true)
+        private String name;
+
+        /** */
+        @QuerySqlField
+        private String creationDate;
+
+        /** */
+        @QuerySqlField
+//        @AffinityKeyMapped
+        private Integer contextId;
+
+        /**
+         * Constructor.
+         */
+        public KeyB() {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5f568790/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraSessionImplTest.java
----------------------------------------------------------------------
diff --git 
a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraSessionImplTest.java
 
b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraSessionImplTest.java
index 27fd741..ca54794 100644
--- 
a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraSessionImplTest.java
+++ 
b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraSessionImplTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.IgniteLogger;
 import 
org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
 import org.apache.ignite.cache.store.cassandra.session.BatchExecutionAssistant;
 import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl;
+import 
org.apache.ignite.cache.store.cassandra.session.WrappedPreparedStatement;
 import org.junit.Test;
 
 import com.datastax.driver.core.BoundStatement;
@@ -151,11 +152,16 @@ public class CassandraSessionImplTest {
 
         @Override
         public BoundStatement bindStatement(PreparedStatement statement, 
Object obj) {
+            if (statement instanceof WrappedPreparedStatement)
+                statement = 
((WrappedPreparedStatement)statement).getWrappedStatement();
+
             if (statement == preparedStatement1) {
                 return boundStatement1;
-            } else if (statement == preparedStatement2) {
+            }
+            else if (statement == preparedStatement2) {
                 return boundStatement2;
             }
+
             throw new RuntimeException("unexpected");
         }
 

Reply via email to