Repository: ignite Updated Branches: refs/heads/master d9c482137 -> 5f568790a
minor refactoring - added javadoc - Fixes #3940. Signed-off-by: Igor Rudyak <igor.rud...@rallyhealth.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5f568790 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5f568790 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5f568790 Branch: refs/heads/master Commit: 5f568790a5cee1d908b845ceef57c02305041c16 Parents: d9c4821 Author: Igor Rudyak <igor.rud...@rallyhealth.com> Authored: Tue May 15 10:24:29 2018 -0700 Committer: Igor Rudyak <igor.rud...@rallyhealth.com> Committed: Tue May 15 10:24:29 2018 -0700 ---------------------------------------------------------------------- .../cassandra/session/CassandraSessionImpl.java | 284 +++++++++++-------- .../session/WrappedPreparedStatement.java | 181 ++++++++++++ .../store/cassandra/session/WrappedSession.java | 91 ++++++ .../cassandra/session/pool/IdleSession.java | 72 +++++ .../cassandra/session/pool/SessionPool.java | 22 +- .../cassandra/session/pool/SessionWrapper.java | 74 ----- .../ignite/tests/CassandraConfigTest.java | 141 +++++++++ .../ignite/tests/CassandraSessionImplTest.java | 8 +- 8 files changed, 671 insertions(+), 202 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5f568790/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java index 4fb0cb2..6b14f41 100644 --- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java @@ -22,7 +22,9 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import javax.cache.Cache; import com.datastax.driver.core.BatchStatement; import com.datastax.driver.core.BoundStatement; @@ -36,6 +38,7 @@ import com.datastax.driver.core.Session; import com.datastax.driver.core.Statement; import com.datastax.driver.core.exceptions.AlreadyExistsException; import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.datastax.driver.core.querybuilder.Batch; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -65,14 +68,21 @@ public class CassandraSessionImpl implements CassandraSession { /** Cassandra cluster builder. */ private volatile Cluster.Builder builder; - /** Cassandra driver session. */ - private volatile Session ses; + /** + * Current generation number of Cassandra session. Each time session recreated its generation will be incremented. + * The main idea behind session generation is to track prepared statements created with old Cassandra + * session (which is not valid anymore) and avoid extra refresh of Cassandra session by multiple threads. + **/ + private volatile Long generation = 0L; + + /** Wrapped Cassandra session. **/ + private volatile WrappedSession wrapperSes; /** Number of references to Cassandra driver session (for multithreaded environment). */ private volatile int refCnt; /** Storage for the session prepared statements */ - private static final Map<String, PreparedStatement> sesStatements = new HashMap<>(); + private static final Map<String, WrappedPreparedStatement> sesStatements = new HashMap<>(); /** Number of records to immediately fetch in CQL statement execution. */ private Integer fetchSize; @@ -90,10 +100,10 @@ public class CassandraSessionImpl implements CassandraSession { private IgniteLogger log; /** Table absence error handlers counter. */ - private final AtomicInteger tblAbsenceHandlersCnt = new AtomicInteger(-1); + private final Map<String, AtomicInteger> tblAbsenceHandlersCnt = new ConcurrentHashMap<>(); - /** Prepared statement cluster disconnection error handlers counter. */ - private final AtomicInteger prepStatementHandlersCnt = new AtomicInteger(-1); + /** Lock used to synchronize multiple threads trying to do session refresh. **/ + private final ReentrantLock refreshLock = new ReentrantLock(); /** * Creates instance of Cassandra driver session wrapper. @@ -126,22 +136,26 @@ public class CassandraSessionImpl implements CassandraSession { try { while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) { - error = null; - if (attempt != 0) { log.warning("Trying " + (attempt + 1) + " attempt to execute Cassandra CQL statement: " + assistant.getStatement()); } + WrappedPreparedStatement preparedSt = null; + WrappedSession ses = null; + try { - PreparedStatement preparedSt = prepareStatement(assistant.getTable(), assistant.getStatement(), + preparedSt = prepareStatement(assistant.getTable(), assistant.getStatement(), assistant.getPersistenceSettings(), assistant.tableExistenceRequired()); if (preparedSt == null) return null; Statement statement = tuneStatementExecutionOptions(assistant.bindStatement(preparedSt)); - ResultSet res = session().execute(statement); + + ses = session(); + + ResultSet res = ses.execute(statement); Row row = res == null || !res.iterator().hasNext() ? null : res.iterator().next(); @@ -159,9 +173,9 @@ public class CassandraSessionImpl implements CassandraSession { handleTableAbsenceError(assistant.getTable(), assistant.getPersistenceSettings()); } else if (CassandraHelper.isHostsAvailabilityError(e)) - handleHostsAvailabilityError(e, attempt, errorMsg); + handleHostsAvailabilityError(ses == null ? -1 : ses.generation, e, attempt, errorMsg); else if (CassandraHelper.isPreparedStatementClusterError(e)) - handlePreparedStatementClusterError(e); + handlePreparedStatementClusterError(preparedSt == null ? -1 : preparedSt.generation , e); else // For an error which we don't know how to handle, we will not try next attempts and terminate. throw new IgniteException(errorMsg, e); @@ -216,19 +230,22 @@ public class CassandraSessionImpl implements CassandraSession { List<Cache.Entry<Integer, ResultSetFuture>> futResults = new LinkedList<>(); - PreparedStatement preparedSt = prepareStatement(assistant.getTable(), assistant.getStatement(), + WrappedPreparedStatement preparedSt = prepareStatement(assistant.getTable(), assistant.getStatement(), assistant.getPersistenceSettings(), assistant.tableExistenceRequired()); if (preparedSt == null) return null; + WrappedSession ses = null; + int seqNum = 0; for (V obj : data) { if (!assistant.alreadyProcessed(seqNum)) { try { + ses = session(); Statement statement = tuneStatementExecutionOptions(assistant.bindStatement(preparedSt, obj)); - ResultSetFuture fut = session().executeAsync(statement); + ResultSetFuture fut = ses.executeAsync(statement); futResults.add(new CacheEntryImpl<>(seqNum, fut)); } catch (Throwable e) { @@ -245,13 +262,18 @@ public class CassandraSessionImpl implements CassandraSession { // Handle host availability only once. if (hostsAvailEx == null) - handleHostsAvailabilityError(e, attempt, errorMsg); + handleHostsAvailabilityError(ses == null ? 0 : ses.generation, e, attempt, errorMsg); } else if (CassandraHelper.isPreparedStatementClusterError(e)) { prepStatEx = e; - handlePreparedStatementClusterError(e); + + handlePreparedStatementClusterError(preparedSt.generation, e); + preparedSt = prepareStatement(assistant.getTable(), assistant.getStatement(), assistant.getPersistenceSettings(), assistant.tableExistenceRequired()); + + if (preparedSt == null) + return null; } else unknownEx = e; @@ -319,12 +341,12 @@ public class CassandraSessionImpl implements CassandraSession { if (hostsAvailEx != null) { error = hostsAvailEx; - handleHostsAvailabilityError(hostsAvailEx, attempt, errorMsg); + handleHostsAvailabilityError(ses.generation, hostsAvailEx, attempt, errorMsg); } if (prepStatEx != null) { error = prepStatEx; - handlePreparedStatementClusterError(prepStatEx); + handlePreparedStatementClusterError(preparedSt.generation, prepStatEx); } if (!CassandraHelper.isTableAbsenceError(error)) @@ -366,8 +388,12 @@ public class CassandraSessionImpl implements CassandraSession { Statement statement = tuneStatementExecutionOptions(assistant.getStatement()); + WrappedSession ses = null; + try { - ResultSetFuture fut = session().executeAsync(statement); + ses = session(); + + ResultSetFuture fut = ses.executeAsync(statement); ResultSet resSet = fut.getUninterruptibly(); if (resSet == null || !resSet.iterator().hasNext()) @@ -384,9 +410,7 @@ public class CassandraSessionImpl implements CassandraSession { if (CassandraHelper.isTableAbsenceError(e)) return; else if (CassandraHelper.isHostsAvailabilityError(e)) - handleHostsAvailabilityError(e, attempt, errorMsg); - else if (CassandraHelper.isPreparedStatementClusterError(e)) - handlePreparedStatementClusterError(e); + handleHostsAvailabilityError(ses == null ? 0 : ses.generation, e, attempt, errorMsg); else // For an error which we don't know how to handle, we will not try next attempts and terminate. throw new IgniteException(errorMsg, e); @@ -420,7 +444,7 @@ public class CassandraSessionImpl implements CassandraSession { int attempt = 0; boolean tableExistenceRequired = false; - Map<String, PreparedStatement> statements = new HashMap<>(); + Map<String, WrappedPreparedStatement> statements = new HashMap<>(); Map<String, KeyValuePersistenceSettings> tableSettings = new HashMap<>(); RandomSleeper sleeper = newSleeper(); @@ -428,31 +452,32 @@ public class CassandraSessionImpl implements CassandraSession { try { while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) { - error = null; - if (attempt != 0) { log.warning("Trying " + (attempt + 1) + " attempt to apply " + mutations.size() + " mutations " + "performed withing Ignite transaction into Cassandra"); } + WrappedPreparedStatement prepStatement = null; + WrappedSession ses = null; + try { BatchStatement batch = new BatchStatement(); // accumulating all the mutations into one Cassandra logged batch for (Mutation mutation : mutations) { String key = mutation.getTable() + mutation.getClass().getName(); - PreparedStatement st = statements.get(key); + prepStatement = statements.get(key); - if (st == null) { - st = prepareStatement(mutation.getTable(), mutation.getStatement(), + if (prepStatement == null) { + prepStatement = prepareStatement(mutation.getTable(), mutation.getStatement(), mutation.getPersistenceSettings(), mutation.tableExistenceRequired()); - if (st != null) - statements.put(key, st); + if (prepStatement != null) + statements.put(key, prepStatement); } - if (st != null) - batch.add(mutation.bindStatement(st)); + if (prepStatement != null) + batch.add(mutation.bindStatement(prepStatement)); if (attempt == 0) { if (mutation.tableExistenceRequired()) { @@ -465,8 +490,10 @@ public class CassandraSessionImpl implements CassandraSession { } // committing logged batch into Cassandra - if (batch.size() > 0) - session().execute(tuneStatementExecutionOptions(batch)); + if (batch.size() > 0) { + ses = session(); + ses.execute(tuneStatementExecutionOptions(batch)); + } return; } catch (Throwable e) { @@ -480,10 +507,10 @@ public class CassandraSessionImpl implements CassandraSession { else return; } else if (CassandraHelper.isHostsAvailabilityError(e)) { - if (handleHostsAvailabilityError(e, attempt, errorMsg)) + if (handleHostsAvailabilityError(ses == null ? 0 : ses.generation, e, attempt, errorMsg)) statements.clear(); } else if (CassandraHelper.isPreparedStatementClusterError(e)) { - handlePreparedStatementClusterError(e); + handlePreparedStatementClusterError(prepStatement == null ? 0 : prepStatement.generation, e); statements.clear(); } else { // For an error which we don't know how to handle, we will not try next attempts and terminate. @@ -508,10 +535,9 @@ public class CassandraSessionImpl implements CassandraSession { /** {@inheritDoc} */ @Override public synchronized void close() throws IOException { - if (decrementSessionRefs() == 0 && ses != null) { - SessionPool.put(this, ses, expirationTimeout); - - ses = null; + if (decrementSessionRefs() == 0 && wrapperSes != null) { + SessionPool.put(this, wrapperSes.ses, expirationTimeout); + wrapperSes = null; } } @@ -523,37 +549,44 @@ public class CassandraSessionImpl implements CassandraSession { SessionPool.get(this); //closing and reopening session - CassandraHelper.closeSession(ses); - ses = null; - session(); + if (wrapperSes != null) + CassandraHelper.closeSession(wrapperSes.ses); - synchronized (sesStatements) { - sesStatements.clear(); - } + wrapperSes = null; + + session(); } /** - * @return Cassandra driver session. + * Returns Cassandra session and its generation number. + * + * @return Wrapper object providing Cassandra session and its generation number. */ - private synchronized Session session() { - if (ses != null) - return ses; + private synchronized WrappedSession session() { + if (wrapperSes != null) + return wrapperSes; - ses = SessionPool.get(this); + Session ses = SessionPool.get(this); - if (ses != null) - return ses; + if (ses != null) { + this.wrapperSes = new WrappedSession(ses, generation); + return this.wrapperSes; + } synchronized (sesStatements) { sesStatements.clear(); } try { - return ses = builder.build().connect(); + ses = builder.build().connect(); + generation++; + this.wrapperSes = new WrappedSession(ses, generation); } catch (Throwable e) { throw new IgniteException("Failed to establish session with Cassandra database", e); } + + return this.wrapperSes; } /** @@ -581,8 +614,8 @@ public class CassandraSessionImpl implements CassandraSession { * @param tblExistenceRequired Flag indicating if table existence is required for the statement. * @return Prepared statement. */ - private PreparedStatement prepareStatement(String table, String statement, KeyValuePersistenceSettings settings, - boolean tblExistenceRequired) { + private WrappedPreparedStatement prepareStatement(String table, String statement, KeyValuePersistenceSettings settings, + boolean tblExistenceRequired) { int attempt = 0; Throwable error = null; @@ -594,13 +627,25 @@ public class CassandraSessionImpl implements CassandraSession { try { synchronized (sesStatements) { - if (sesStatements.containsKey(statement)) - return sesStatements.get(statement); + WrappedPreparedStatement wrapper = sesStatements.get(statement); + + if (wrapper != null) { + // Prepared statement is still actual, cause it was created with the current Cassandra session. + if (generation == wrapper.generation) + return wrapper; + // Prepared statement is not actual anymore, cause it was created with the previous Cassandra session. + else + sesStatements.remove(statement); + } } while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) { + WrappedSession ses = null; + try { - PreparedStatement prepStatement = session().prepare(statement); + ses = session(); + + WrappedPreparedStatement prepStatement = ses.prepare(statement); synchronized (sesStatements) { sesStatements.put(statement, prepStatement); @@ -616,7 +661,7 @@ public class CassandraSessionImpl implements CassandraSession { handleTableAbsenceError(table, settings); } else if (CassandraHelper.isHostsAvailabilityError(e)) - handleHostsAvailabilityError(e, attempt, errorMsg); + handleHostsAvailabilityError(ses == null ? 0 : ses.generation, e, attempt, errorMsg); else throw new IgniteException(errorMsg, e); @@ -647,13 +692,17 @@ public class CassandraSessionImpl implements CassandraSession { String errorMsg = "Failed to create Cassandra keyspace '" + settings.getKeyspace() + "'"; while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) { + WrappedSession ses = null; + try { + ses = session(); + log.info("-----------------------------------------------------------------------"); log.info("Creating Cassandra keyspace '" + settings.getKeyspace() + "'"); log.info("-----------------------------------------------------------------------\n\n" + settings.getKeyspaceDDLStatement() + "\n"); log.info("-----------------------------------------------------------------------"); - session().execute(settings.getKeyspaceDDLStatement()); + ses.execute(settings.getKeyspaceDDLStatement()); log.info("Cassandra keyspace '" + settings.getKeyspace() + "' was successfully created"); return; } @@ -665,7 +714,7 @@ public class CassandraSessionImpl implements CassandraSession { if (!CassandraHelper.isHostsAvailabilityError(e)) throw new IgniteException(errorMsg, e); - handleHostsAvailabilityError(e, attempt, errorMsg); + handleHostsAvailabilityError(ses == null ? 0 : ses.generation, e, attempt, errorMsg); error = e; } @@ -688,13 +737,17 @@ public class CassandraSessionImpl implements CassandraSession { String errorMsg = "Failed to create Cassandra table '" + tableFullName + "'"; while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) { + WrappedSession ses = null; + try { + ses = session(); + log.info("-----------------------------------------------------------------------"); log.info("Creating Cassandra table '" + tableFullName + "'"); log.info("-----------------------------------------------------------------------\n\n" + settings.getTableDDLStatement(table) + "\n"); log.info("-----------------------------------------------------------------------"); - session().execute(settings.getTableDDLStatement(table)); + ses.execute(settings.getTableDDLStatement(table)); log.info("Cassandra table '" + tableFullName + "' was successfully created"); return; } @@ -712,7 +765,7 @@ public class CassandraSessionImpl implements CassandraSession { createKeyspace(settings); } else if (CassandraHelper.isHostsAvailabilityError(e)) - handleHostsAvailabilityError(e, attempt, errorMsg); + handleHostsAvailabilityError(ses == null ? 0 : ses.generation, e, attempt, errorMsg); error = e; } @@ -740,7 +793,11 @@ public class CassandraSessionImpl implements CassandraSession { String errorMsg = "Failed to create indexes for Cassandra table " + tableFullName; while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) { + WrappedSession ses = null; + try { + ses = session(); + log.info("-----------------------------------------------------------------------"); log.info("Creating indexes for Cassandra table '" + tableFullName + "'"); log.info("-----------------------------------------------------------------------"); @@ -749,7 +806,7 @@ public class CassandraSessionImpl implements CassandraSession { try { log.info(statement); log.info("-----------------------------------------------------------------------"); - session().execute(statement); + ses.execute(statement); } catch (AlreadyExistsException ignored) { } @@ -765,7 +822,7 @@ public class CassandraSessionImpl implements CassandraSession { } catch (Throwable e) { if (CassandraHelper.isHostsAvailabilityError(e)) - handleHostsAvailabilityError(e, attempt, errorMsg); + handleHostsAvailabilityError(ses == null ? 0 : ses.generation , e, attempt, errorMsg); else if (CassandraHelper.isTableAbsenceError(e)) createTable(table, settings); else @@ -816,12 +873,14 @@ public class CassandraSessionImpl implements CassandraSession { * @param settings Persistence settings. */ private void handleTableAbsenceError(String table, KeyValuePersistenceSettings settings) { - int hndNum = tblAbsenceHandlersCnt.incrementAndGet(); - String tableFullName = settings.getKeyspace() + "." + table; + AtomicInteger counter = tblAbsenceHandlersCnt.computeIfAbsent(tableFullName, k -> new AtomicInteger(-1)); + + int hndNum = counter.incrementAndGet(); + try { - synchronized (tblAbsenceHandlersCnt) { + synchronized (counter) { // Oooops... I am not the first thread who tried to handle table absence problem. if (hndNum != 0) { log.warning("Table " + tableFullName + " absence problem detected. " + @@ -832,77 +891,58 @@ public class CassandraSessionImpl implements CassandraSession { log.warning("Table " + tableFullName + " absence problem detected. " + "Trying to create table."); - IgniteException error = new IgniteException("Failed to create Cassandra table " + tableFullName); - - int attempt = 0; - - while (error != null && attempt < CQL_EXECUTION_ATTEMPTS_COUNT) { - error = null; - - try { - createKeyspace(settings); - createTable(table, settings); - createTableIndexes(table, settings); - } - catch (Throwable e) { - if (CassandraHelper.isHostsAvailabilityError(e)) - handleHostsAvailabilityError(e, attempt, null); - else - throw new IgniteException("Failed to create Cassandra table " + tableFullName, e); - - error = (e instanceof IgniteException) ? (IgniteException)e : new IgniteException(e); - } - - attempt++; - } - - if (error != null) - throw error; + createKeyspace(settings); + createTable(table, settings); + createTableIndexes(table, settings); } } finally { if (hndNum == 0) - tblAbsenceHandlersCnt.set(-1); + counter.set(-1); } } /** * Handles situation when prepared statement execution failed cause session to the cluster was released. * + * @param sesGeneration Generation of Cassandra session used to create prepared statement. + * @param e Exception thrown during statement execution. */ - private void handlePreparedStatementClusterError(Throwable e) { - int hndNum = prepStatementHandlersCnt.incrementAndGet(); + private void handlePreparedStatementClusterError(long sesGeneration, Throwable e) { + if (sesGeneration < generation) { + log.warning("Prepared statement cluster error detected, another thread already fixed the problem", e); + return; + } + + refreshLock.lock(); try { - synchronized (prepStatementHandlersCnt) { - // Oooops... I am not the first thread who tried to handle prepared statement problem. - if (hndNum != 0) { - log.warning("Prepared statement cluster error detected, another thread already fixed the problem", e); - return; - } + if (sesGeneration < generation) { + log.warning("Prepared statement cluster error detected, another thread already fixed the problem", e); + return; + } - log.warning("Prepared statement cluster error detected, refreshing Cassandra session", e); + log.warning("Prepared statement cluster error detected, refreshing Cassandra session", e); - refresh(); + refresh(); - log.warning("Cassandra session refreshed"); - } + log.warning("Cassandra session refreshed"); } finally { - if (hndNum == 0) - prepStatementHandlersCnt.set(-1); + refreshLock.unlock(); } } /** * Handles situation when Cassandra host which is responsible for CQL query execution became unavailable. * + * @param sesGeneration Generation of Cassandra session used to run CQL statement. * @param e Exception to handle. * @param attempt Number of attempts. * @param msg Error message. * @return {@code true} if host unavailability was successfully handled. */ - private boolean handleHostsAvailabilityError(Throwable e, int attempt, String msg) { + private boolean handleHostsAvailabilityError(long sesGeneration, Throwable e, int attempt, String msg) { if (attempt >= CQL_EXECUTION_ATTEMPTS_COUNT) { log.error("Host availability problem detected. " + "Number of CQL execution attempts reached maximum " + CQL_EXECUTION_ATTEMPTS_COUNT + @@ -914,14 +954,26 @@ public class CassandraSessionImpl implements CassandraSession { attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 2 || attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 2 + CQL_EXECUTION_ATTEMPTS_COUNT / 4 || attempt == CQL_EXECUTION_ATTEMPTS_COUNT - 1) { - log.warning("Host availability problem detected, CQL execution attempt " + (attempt + 1) + ", " + - "refreshing Cassandra session", e); - refresh(); + refreshLock.lock(); - log.warning("Cassandra session refreshed"); + try { + if (sesGeneration < generation) + log.warning("Host availability problem detected, but already handled by another thread"); + else { + log.warning("Host availability problem detected, CQL execution attempt " + (attempt + 1) + ", " + + "refreshing Cassandra session", e); + + refresh(); - return true; + log.warning("Cassandra session refreshed"); + + return true; + } + } + finally { + refreshLock.unlock(); + } } log.warning("Host availability problem detected, CQL execution attempt " + (attempt + 1) + ", " + http://git-wip-us.apache.org/repos/asf/ignite/blob/5f568790/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/WrappedPreparedStatement.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/WrappedPreparedStatement.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/WrappedPreparedStatement.java new file mode 100644 index 0000000..be86516 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/WrappedPreparedStatement.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.session; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.CodecRegistry; +import com.datastax.driver.core.ColumnDefinitions; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.PreparedId; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.policies.RetryPolicy; + +import java.nio.ByteBuffer; +import java.util.Map; + +/** + * Simple wrapper providing access to Cassandra prepared statement and generation of Cassandra + * session which was used to create this statement + */ +public class WrappedPreparedStatement implements PreparedStatement { + /** Prepared statement. **/ + private final PreparedStatement st; + + /** Generation of Cassandra session which was used to prepare this statement. **/ + final long generation; + + /** + * Constructor. + * + * @param st Prepared statement. + * @param generation Generation of Cassandra session used to prepare this statement. + */ + WrappedPreparedStatement(PreparedStatement st, long generation) { + this.st = st; + this.generation = generation; + } + + /** + * Getter for wrapped statement. + * + * @return Wrapped original statement. + */ + public PreparedStatement getWrappedStatement() { + return st; + } + + /** {@inheritDoc} */ + @Override public ColumnDefinitions getVariables() { + return st.getVariables(); + } + + /** {@inheritDoc} */ + @Override public BoundStatement bind(Object... values) { + return st.bind(values); + } + + /** {@inheritDoc} */ + @Override public BoundStatement bind() { + return st.bind(); + } + + /** {@inheritDoc} */ + @Override public PreparedStatement setRoutingKey(ByteBuffer routingKey) { + return st.setRoutingKey(routingKey); + } + + /** {@inheritDoc} */ + @Override public PreparedStatement setRoutingKey(ByteBuffer... routingKeyComponents) { + return st.setRoutingKey(routingKeyComponents); + } + + /** {@inheritDoc} */ + @Override public ByteBuffer getRoutingKey() { + return st.getRoutingKey(); + } + + /** {@inheritDoc} */ + @Override public PreparedStatement setConsistencyLevel(ConsistencyLevel consistency) { + return st.setConsistencyLevel(consistency); + } + + /** {@inheritDoc} */ + @Override public ConsistencyLevel getConsistencyLevel() { + return st.getConsistencyLevel(); + } + + /** {@inheritDoc} */ + @Override public PreparedStatement setSerialConsistencyLevel(ConsistencyLevel serialConsistency) { + return st.setSerialConsistencyLevel(serialConsistency); + } + + /** {@inheritDoc} */ + @Override public ConsistencyLevel getSerialConsistencyLevel() { + return st.getSerialConsistencyLevel(); + } + + /** {@inheritDoc} */ + @Override public String getQueryString() { + return st.getQueryString(); + } + + /** {@inheritDoc} */ + @Override public String getQueryKeyspace() { + return st.getQueryKeyspace(); + } + + /** {@inheritDoc} */ + @Override public PreparedStatement enableTracing() { + return st.enableTracing(); + } + + /** {@inheritDoc} */ + @Override public PreparedStatement disableTracing() { + return st.disableTracing(); + } + + /** {@inheritDoc} */ + @Override public boolean isTracing() { + return st.isTracing(); + } + + /** {@inheritDoc} */ + @Override public PreparedStatement setRetryPolicy(RetryPolicy policy) { + return st.setRetryPolicy(policy); + } + + /** {@inheritDoc} */ + @Override public RetryPolicy getRetryPolicy() { + return st.getRetryPolicy(); + } + + /** {@inheritDoc} */ + @Override public PreparedId getPreparedId() { + return st.getPreparedId(); + } + + /** {@inheritDoc} */ + @Override public Map<String, ByteBuffer> getIncomingPayload() { + return st.getIncomingPayload(); + } + + /** {@inheritDoc} */ + @Override public Map<String, ByteBuffer> getOutgoingPayload() { + return st.getOutgoingPayload(); + } + + /** {@inheritDoc} */ + @Override public PreparedStatement setOutgoingPayload(Map<String, ByteBuffer> payload) { + return st.setOutgoingPayload(payload); + } + + /** {@inheritDoc} */ + @Override public CodecRegistry getCodecRegistry() { + return st.getCodecRegistry(); + } + + /** {@inheritDoc} */ + @Override public PreparedStatement setIdempotent(Boolean idempotent) { + return st.setIdempotent(idempotent); + } + + /** {@inheritDoc} */ + @Override public Boolean isIdempotent() { + return st.isIdempotent(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5f568790/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/WrappedSession.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/WrappedSession.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/WrappedSession.java new file mode 100644 index 0000000..d9b7224 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/WrappedSession.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.session; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.exceptions.NoHostAvailableException; + +/** + * Simple container for Cassandra session and its generation number. + */ +public class WrappedSession { + /** Cassandra driver session. **/ + final Session ses; + + /** Cassandra session generation number. **/ + final long generation; + + /** + * Constructor. + * + * @param ses Cassandra session. + * @param generation Cassandra session generation number. + */ + WrappedSession(Session ses, long generation) { + this.ses = ses; + this.generation = generation; + } + + /** + * Prepares the provided query string. + * + * @param query the CQL query string to prepare + * @return the prepared statement corresponding to {@code query}. + * @throws NoHostAvailableException if no host in the cluster can be + * contacted successfully to prepare this query. + */ + WrappedPreparedStatement prepare(String query) { + return new WrappedPreparedStatement(ses.prepare(query), generation); + } + + /** + * Executes the provided query. + * + * @param statement The CQL query to execute (that can be any {@link Statement}). + * + * @return The result of the query. That result will never be null but can + */ + ResultSet execute(Statement statement) { + return ses.execute(statement); + } + + /** + * Executes the provided query. + * + * @param query The CQL query to execute (that can be any {@link Statement}). + * + * @return The result of the query. That result will never be null but can + */ + ResultSet execute(String query) { + return ses.execute(query); + } + + /** + * Executes the provided query asynchronously. + * + * @param statement the CQL query to execute (that can be any {@code Statement}). + * + * @return a future on the result of the query. + */ + ResultSetFuture executeAsync(Statement statement) { + return ses.executeAsync(statement); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5f568790/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/IdleSession.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/IdleSession.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/IdleSession.java new file mode 100644 index 0000000..57eb5d1 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/IdleSession.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.session.pool; + +import com.datastax.driver.core.Session; +import org.apache.ignite.cache.store.cassandra.common.CassandraHelper; + +/** + * Simple wrapper for idle Cassandra session returned to pool, responsible for monitoring session expiration and its closing. + */ +public class IdleSession { + /** Cassandra driver session. */ + private Session ses; + + /** Expiration timeout. */ + private long expirationTimeout; + + /** Wrapper creation time. */ + private long time; + + /** + * Creates instance of Cassandra driver session wrapper. + * + * @param ses Cassandra driver session. + */ + public IdleSession(Session ses, long expirationTimeout) { + this.ses = ses; + this.expirationTimeout = expirationTimeout; + this.time = System.currentTimeMillis(); + } + + /** + * Checks if Cassandra driver session expired. + * + * @return true if session expired. + */ + public boolean expired() { + return expirationTimeout > 0 && System.currentTimeMillis() - time > expirationTimeout; + } + + /** + * Returns wrapped Cassandra driver session. + * + * @return Cassandra driver session. + */ + public Session driverSession() { + return ses; + } + + /** + * Closes wrapped Cassandra driver session + */ + public void release() { + CassandraHelper.closeSession(ses); + ses = null; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5f568790/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java index 4de8516..cf72f8a 100644 --- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java @@ -45,23 +45,23 @@ public class SessionPool { return; } - List<Map.Entry<CassandraSessionImpl, SessionWrapper>> expiredSessions = new LinkedList<>(); + List<Map.Entry<CassandraSessionImpl, IdleSession>> expiredSessions = new LinkedList<>(); int sessionsCnt; synchronized (sessions) { sessionsCnt = sessions.size(); - for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry : sessions.entrySet()) { + for (Map.Entry<CassandraSessionImpl, IdleSession> entry : sessions.entrySet()) { if (entry.getValue().expired()) expiredSessions.add(entry); } - for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry : expiredSessions) + for (Map.Entry<CassandraSessionImpl, IdleSession> entry : expiredSessions) sessions.remove(entry.getKey()); } - for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry : expiredSessions) + for (Map.Entry<CassandraSessionImpl, IdleSession> entry : expiredSessions) entry.getValue().release(); // all sessions in the pool expired, thus we don't need additional thread to manage sessions in the pool @@ -79,7 +79,7 @@ public class SessionPool { private static final long SLEEP_TIMEOUT = 60000; // 1 minute. /** Sessions which were returned to pool. */ - private static final Map<CassandraSessionImpl, SessionWrapper> sessions = new HashMap<>(); + private static final Map<CassandraSessionImpl, IdleSession> sessions = new HashMap<>(); /** Singleton instance. */ private static SessionMonitor monitorSingleton; @@ -102,10 +102,10 @@ public class SessionPool { if (cassandraSes == null || driverSes == null) return; - SessionWrapper old; + IdleSession old; synchronized (sessions) { - old = sessions.put(cassandraSes, new SessionWrapper(driverSes, expirationTimeout)); + old = sessions.put(cassandraSes, new IdleSession(driverSes, expirationTimeout)); if (monitorSingleton == null || State.TERMINATED.equals(monitorSingleton.getState())) { monitorSingleton = new SessionMonitor(); @@ -129,7 +129,7 @@ public class SessionPool { if (cassandraSes == null) return null; - SessionWrapper wrapper; + IdleSession wrapper; synchronized (sessions) { wrapper = sessions.remove(cassandraSes); @@ -142,7 +142,7 @@ public class SessionPool { * Releases all session from pool and closes all their connections to Cassandra database. */ public static void release() { - Collection<SessionWrapper> wrappers; + Collection<IdleSession> wrappers; synchronized (sessions) { try { @@ -151,7 +151,7 @@ public class SessionPool { wrappers = new LinkedList<>(); - for (SessionWrapper wrapper : sessions.values()) + for (IdleSession wrapper : sessions.values()) wrappers.add(wrapper); sessions.clear(); @@ -167,7 +167,7 @@ public class SessionPool { } } - for (SessionWrapper wrapper : wrappers) + for (IdleSession wrapper : wrappers) wrapper.release(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5f568790/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java deleted file mode 100644 index 68b9dd4..0000000 --- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.store.cassandra.session.pool; - -import com.datastax.driver.core.Session; -import org.apache.ignite.cache.store.cassandra.common.CassandraHelper; - -/** - * Wrapper for Cassandra driver session, responsible for monitoring session expiration and its closing. - */ -public class SessionWrapper { - /** Cassandra driver session. */ - private Session ses; - - /** Expiration timeout. */ - private long expirationTimeout; - - /** Wrapper creation time. */ - private long time; - - /** - * Creates instance of Cassandra driver session wrapper. - * - * @param ses Cassandra driver session. - */ - public SessionWrapper(Session ses, long expirationTimeout) { - this.ses = ses; - this.expirationTimeout = expirationTimeout; - - time = System.currentTimeMillis(); - } - - /** - * Checks if Cassandra driver session expired. - * - * @return true if session expired. - */ - public boolean expired() { - return expirationTimeout > 0 && System.currentTimeMillis() - time > expirationTimeout; - } - - /** - * Returns wrapped Cassandra driver session. - * - * @return Cassandra driver session. - */ - public Session driverSession() { - return ses; - } - - /** - * Closes wrapped Cassandra driver session - */ - public void release() { - CassandraHelper.closeSession(ses); - - ses = null; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/5f568790/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraConfigTest.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraConfigTest.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraConfigTest.java new file mode 100644 index 0000000..98d7ef1 --- /dev/null +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraConfigTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests; + +import junit.framework.TestCase; +import org.apache.ignite.cache.affinity.AffinityKeyMapped; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.cache.store.cassandra.persistence.KeyPersistenceSettings; +import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings; + +/** + * Simple test for DDL generator. + */ +public class CassandraConfigTest extends TestCase { + /** + * Check if same DDL generated for similar keys and same KeyPersistenceConfiguration. + * + * @throws Exception + */ + public void testDDLGeneration() throws Exception { + KeyPersistenceSettings keyPersistenceSettingsA = getKeyPersistenceSettings(KeyA.class); + KeyPersistenceSettings keyPersistenceSettingsB = getKeyPersistenceSettings(KeyB.class); + + assertEquals(keyPersistenceSettingsB.getPrimaryKeyDDL(), + keyPersistenceSettingsA.getPrimaryKeyDDL()); + + assertEquals(keyPersistenceSettingsB.getClusteringDDL(), + keyPersistenceSettingsA.getClusteringDDL()); + } + + /** + * @return KeyPersistenceSetting + */ + private KeyPersistenceSettings getKeyPersistenceSettings(Class keyClass) { + String cfg = "<persistence keyspace=\"public\">" + + " <keyPersistence class=\"" + keyClass.getName() + "\" strategy=\"POJO\"> \n" + + " <partitionKey>\n" + + " <field name=\"name\" column=\"name\"/>\n" + + " <field name=\"contextId\" column=\"context_id\"/>\n" + + " <field name=\"creationDate\" column=\"creation_date\"/>\n" + + " </partitionKey>\n" + + " <clusterKey>\n" + + " <field name=\"timestamp\" column=\"timestamp\"/>\n" + + " </clusterKey>\n" + + " </keyPersistence>" + + " <valuePersistence class=\"java.lang.Object\" strategy=\"BLOB\">" + + " </valuePersistence>" + + "</persistence>"; + + return new KeyValuePersistenceSettings(cfg).getKeyPersistenceSettings(); + } + + /** + * + */ + public static class BaseKey { + /** */ + @QuerySqlField + // Looks like next annotation is ignored when generating DDL, + // but Ignite supports this annotation in parent classes. +// @AffinityKeyMapped + private Integer contextId; + + /** */ + public Integer getContextId() { + return contextId; + } + + /** */ + public void setContextId(Integer contextId) { + this.contextId = contextId; + } + } + + /** + * + */ + public static class KeyA extends BaseKey { + /** */ + @QuerySqlField(index = true) + private String timestamp; + + /** */ + @QuerySqlField(index = true) + private String name; + + /** */ + @QuerySqlField + private String creationDate; + + /** + * Constructor. + */ + public KeyA() { + } + } + + /** + * + */ + public static class KeyB { + + /** */ + @QuerySqlField(index = true) + private String timestamp; + + /** */ + @QuerySqlField(index = true) + private String name; + + /** */ + @QuerySqlField + private String creationDate; + + /** */ + @QuerySqlField +// @AffinityKeyMapped + private Integer contextId; + + /** + * Constructor. + */ + public KeyB() { + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5f568790/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraSessionImplTest.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraSessionImplTest.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraSessionImplTest.java index 27fd741..ca54794 100644 --- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraSessionImplTest.java +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraSessionImplTest.java @@ -33,6 +33,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings; import org.apache.ignite.cache.store.cassandra.session.BatchExecutionAssistant; import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl; +import org.apache.ignite.cache.store.cassandra.session.WrappedPreparedStatement; import org.junit.Test; import com.datastax.driver.core.BoundStatement; @@ -151,11 +152,16 @@ public class CassandraSessionImplTest { @Override public BoundStatement bindStatement(PreparedStatement statement, Object obj) { + if (statement instanceof WrappedPreparedStatement) + statement = ((WrappedPreparedStatement)statement).getWrappedStatement(); + if (statement == preparedStatement1) { return boundStatement1; - } else if (statement == preparedStatement2) { + } + else if (statement == preparedStatement2) { return boundStatement2; } + throw new RuntimeException("unexpected"); }