This is an automated email from the ASF dual-hosted git repository. isapego pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 7b443b4 IGNITE-13308: CPP Thin client Transactions 7b443b4 is described below commit 7b443b4698628c808e9e0f2810b19b021da5b04c Author: zstan <stanilov...@gmail.com> AuthorDate: Mon Aug 31 17:41:35 2020 +0300 IGNITE-13308: CPP Thin client Transactions This closes #8118 --- .../cpp/common/include/ignite/ignite_error.h | 6 + .../cpp/core/src/transactions/transactions.cpp | 1 + .../platforms/cpp/thin-client-test/CMakeLists.txt | 4 +- .../project/vs/thin-client-test.vcxproj | 1 + .../platforms/cpp/thin-client-test/src/tx_test.cpp | 400 +++++++++++++++++++++ modules/platforms/cpp/thin-client/CMakeLists.txt | 2 + .../impl/thin/transactions/transactions_proxy.h | 145 ++++++++ .../include/ignite/thin/ignite_client.h | 12 + .../include/ignite/thin/transactions/transaction.h | 100 ++++++ .../ignite/thin/transactions/transaction_consts.h | 119 ++++++ .../ignite/thin/transactions/transactions.h | 130 +++++++ .../cpp/thin-client/project/vs/thin-client.vcxproj | 7 + .../cpp/thin-client/src/ignite_client.cpp | 5 + .../src/impl/cache/cache_client_impl.cpp | 86 +++++ .../thin-client/src/impl/cache/cache_client_impl.h | 8 + .../src/impl/cache/cache_client_proxy.cpp | 2 - .../thin-client/src/impl/ignite_client_impl.cpp | 13 +- .../cpp/thin-client/src/impl/ignite_client_impl.h | 11 + .../platforms/cpp/thin-client/src/impl/message.cpp | 9 +- .../platforms/cpp/thin-client/src/impl/message.h | 201 ++++++++++- .../src/impl/transactions/transaction_impl.h | 183 ++++++++++ .../src/impl/transactions/transactions_impl.cpp | 214 +++++++++++ .../src/impl/transactions/transactions_impl.h | 129 +++++++ .../src/impl/transactions/transactions_proxy.cpp | 82 +++++ 24 files changed, 1858 insertions(+), 12 deletions(-) diff --git a/modules/platforms/cpp/common/include/ignite/ignite_error.h b/modules/platforms/cpp/common/include/ignite/ignite_error.h index 0f46d5a..0c402d4 100644 --- a/modules/platforms/cpp/common/include/ignite/ignite_error.h +++ b/modules/platforms/cpp/common/include/ignite/ignite_error.h @@ -207,6 +207,12 @@ namespace ignite /** SSL/TLS error. */ static const int IGNITE_ERR_SECURE_CONNECTION_FAILURE = 2026; + + /** Transaction already started by current thread. */ + static const int IGNITE_ERR_TX_THIS_THREAD = 2027; + + /** Generic transaction error. */ + static const int IGNITE_ERR_TX = 2028; /** Unknown error. */ diff --git a/modules/platforms/cpp/core/src/transactions/transactions.cpp b/modules/platforms/cpp/core/src/transactions/transactions.cpp index 4c1ea70..54a9fa2 100644 --- a/modules/platforms/cpp/core/src/transactions/transactions.cpp +++ b/modules/platforms/cpp/core/src/transactions/transactions.cpp @@ -16,6 +16,7 @@ */ #include "ignite/transactions/transactions.h" +#include "ignite/transactions/transaction.h" using namespace ignite::common::concurrent; using namespace ignite::impl::transactions; diff --git a/modules/platforms/cpp/thin-client-test/CMakeLists.txt b/modules/platforms/cpp/thin-client-test/CMakeLists.txt index d0ebe58..535cb1a 100644 --- a/modules/platforms/cpp/thin-client-test/CMakeLists.txt +++ b/modules/platforms/cpp/thin-client-test/CMakeLists.txt @@ -33,7 +33,9 @@ set(SOURCES src/teamcity/teamcity_boost.cpp src/test_utils.cpp src/ignite_client_test.cpp src/auth_test.cpp - src/ssl_test.cpp) + src/tx_test.cpp + src/ssl_test.cpp + ) add_executable(${TARGET} ${SOURCES}) diff --git a/modules/platforms/cpp/thin-client-test/project/vs/thin-client-test.vcxproj b/modules/platforms/cpp/thin-client-test/project/vs/thin-client-test.vcxproj index ad13f6f..118c490 100644 --- a/modules/platforms/cpp/thin-client-test/project/vs/thin-client-test.vcxproj +++ b/modules/platforms/cpp/thin-client-test/project/vs/thin-client-test.vcxproj @@ -26,6 +26,7 @@ <ClCompile Include="..\..\src\teamcity\teamcity_boost.cpp" /> <ClCompile Include="..\..\src\teamcity\teamcity_messages.cpp" /> <ClCompile Include="..\..\src\test_utils.cpp" /> + <ClCompile Include="..\..\src\tx_test.cpp" /> </ItemGroup> <ItemGroup> <ClInclude Include="..\..\include\ignite\complex_type.h" /> diff --git a/modules/platforms/cpp/thin-client-test/src/tx_test.cpp b/modules/platforms/cpp/thin-client-test/src/tx_test.cpp new file mode 100644 index 0000000..19757a2 --- /dev/null +++ b/modules/platforms/cpp/thin-client-test/src/tx_test.cpp @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <boost/chrono.hpp> +#include <boost/thread.hpp> +#include <boost/test/unit_test.hpp> + +#include <ignite/ignition.h> + +#include "ignite/thin/ignite_client_configuration.h" +#include "ignite/thin/ignite_client.h" +#include "ignite/thin/cache/cache_peek_mode.h" +#include "ignite/thin/transactions/transaction_consts.h" + +#include <test_utils.h> +#include <ignite/ignite_error.h> + +using namespace ignite::thin; +using namespace boost::unit_test; +using namespace ignite::thin::transactions; +using namespace ignite::common::concurrent; + +class IgniteTxTestSuiteFixture +{ +public: + IgniteTxTestSuiteFixture() + { + serverNode = ignite_test::StartCrossPlatformServerNode("cache.xml", "ServerNode"); + } + + ~IgniteTxTestSuiteFixture() + { + ignite::Ignition::StopAll(false); + } + +private: + /** Server node. */ + ignite::Ignite serverNode; +}; + +BOOST_FIXTURE_TEST_SUITE(IgniteTxTestSuite, IgniteTxTestSuiteFixture) + +bool correctCloseMessage(const ignite::IgniteError& ex) +{ + BOOST_CHECK_EQUAL(ex.what(), std::string(TX_ALREADY_CLOSED)); + + return true; +} + +bool separateThreadMessage(const ignite::IgniteError& ex) +{ + BOOST_CHECK_EQUAL(ex.what(), std::string(TX_DIFFERENT_THREAD)); + + return true; +} + +bool checkTxTimeoutMessage(const ignite::IgniteError& ex) +{ + return std::string(ex.what()).find("Cache transaction timed out") != std::string::npos; +} + +BOOST_AUTO_TEST_CASE(TestCacheOpsWithTx) +{ + IgniteClientConfiguration cfg; + + cfg.SetEndPoints("127.0.0.1:11110"); + + IgniteClient client = IgniteClient::Start(cfg); + + cache::CacheClient<int, int> cache = + client.GetCache<int, int>("partitioned"); + + cache.Put(1, 1); + + transactions::ClientTransactions transactions = client.ClientTransactions(); + + transactions::ClientTransaction tx = transactions.TxStart(); + + cache.Put(1, 10); + + BOOST_CHECK_EQUAL(10, cache.Get(1)); + + tx.Rollback(); + + BOOST_CHECK_EQUAL(1, cache.Get(1)); + + //--- + + tx = transactions.TxStart(); + + cache.Put(1, 10); + + tx.Commit(); + + BOOST_CHECK_EQUAL(10, cache.Get(1)); + + cache.Put(1, 1); + + //--- + + tx = transactions.TxStart(); + + cache.Put(1, 10); + + BOOST_CHECK_EQUAL(10, cache.Get(1)); + + tx.Close(); + + BOOST_CHECK_EQUAL(1, cache.Get(1)); + + //--- + + tx = transactions.TxStart(TransactionConcurrency::OPTIMISTIC, TransactionIsolation::SERIALIZABLE); + + cache.Put(1, 10); + + BOOST_CHECK_EQUAL(10, cache.Get(1)); + + tx.Close(); + + BOOST_CHECK_EQUAL(1, cache.Get(1)); + + //--- + + tx = transactions.TxStart(TransactionConcurrency::OPTIMISTIC, TransactionIsolation::SERIALIZABLE, 1000, 100); + + cache.Put(1, 10); + + BOOST_CHECK_EQUAL(10, cache.Get(1)); + + tx.Close(); + + BOOST_CHECK_EQUAL(1, cache.Get(1)); + + //--- + + tx = transactions.TxStart(); + + cache.Replace(1, 10); + + BOOST_CHECK_EQUAL(10, cache.Get(1)); + + tx.Rollback(); + + BOOST_CHECK_EQUAL(1, cache.Get(1)); + + //--- + + tx = transactions.TxStart(); + + cache.Replace(1, 1, 10); + + BOOST_CHECK_EQUAL(10, cache.Get(1)); + + tx.Rollback(); + + BOOST_CHECK_EQUAL(1, cache.Get(1)); + + //--- + + tx = transactions.TxStart(); + + cache.Put(2, 20); + + BOOST_CHECK_EQUAL(cache.ContainsKey(2), true); + + tx.Rollback(); + + BOOST_CHECK_EQUAL(cache.ContainsKey(2), false); + + //--- + + tx = transactions.TxStart(); + + cache.Put(2, 20); + + tx.Rollback(); + + BOOST_CHECK_EQUAL(cache.GetSize(cache::CachePeekMode::ALL), 1); + + //--- + + tx = transactions.TxStart(); + + int res1 = cache.GetAndPutIfAbsent(1, 10); + + int res2 = cache.GetAndPutIfAbsent(2, 20); + + BOOST_CHECK_EQUAL(1, res1); + + BOOST_CHECK_EQUAL(cache.Get(2), 20); + + BOOST_CHECK_EQUAL(0, res2); + + tx.Rollback(); + + BOOST_CHECK_EQUAL(cache.Get(2), 0); + + //--- + + tx = transactions.TxStart(); + + cache.Remove(1); + + tx.Rollback(); + + BOOST_CHECK_EQUAL(cache.ContainsKey(1), true); + + // Test transaction with a timeout. + + const uint32_t TX_TIMEOUT = 200; + + tx = transactions.TxStart(TransactionConcurrency::OPTIMISTIC, TransactionIsolation::SERIALIZABLE, TX_TIMEOUT); + + cache.Put(1, 10); + + boost::this_thread::sleep_for(boost::chrono::milliseconds(2 * TX_TIMEOUT)); + + BOOST_CHECK_EXCEPTION(cache.Put(1, 20), ignite::IgniteError, checkTxTimeoutMessage); + + BOOST_CHECK_EXCEPTION(tx.Commit(), ignite::IgniteError, checkTxTimeoutMessage); + + tx.Close(); + + BOOST_CHECK_EQUAL(1, cache.Get(1)); +} + +void startAnotherClientAndTx(SharedPointer<SingleLatch>& l) +{ + IgniteClientConfiguration cfg; + + cfg.SetEndPoints("127.0.0.1:11110"); + + IgniteClient client = IgniteClient::Start(cfg); + + cache::CacheClient<int, int> cache = + client.GetCache<int, int>("partitioned"); + + transactions::ClientTransactions transactions = client.ClientTransactions(); + + transactions::ClientTransaction tx = transactions.TxStart(); + + l.Get()->CountDown(); + + cache.Put(2, 20); + + tx.Commit(); +} + +BOOST_AUTO_TEST_CASE(TestTxOps) +{ + IgniteClientConfiguration cfg; + + cfg.SetEndPoints("127.0.0.1:11110"); + + IgniteClient client = IgniteClient::Start(cfg); + + cache::CacheClient<int, int> cache = + client.GetCache<int, int>("partitioned"); + + cache.Put(1, 1); + + transactions::ClientTransactions transactions = client.ClientTransactions(); + + transactions::ClientTransaction tx = transactions.TxStart(); + + BOOST_CHECK_THROW( transactions.TxStart(), ignite::IgniteError ); + + tx.Close(); + + //Test end of already completed transaction. + + tx = transactions.TxStart(); + + tx.Close(); + + BOOST_CHECK_EXCEPTION(tx.Commit(), ignite::IgniteError, correctCloseMessage); + + BOOST_CHECK_EXCEPTION(tx.Rollback(), ignite::IgniteError, correctCloseMessage); + + // Test end of outdated transaction. + + transactions::ClientTransaction tx1 = transactions.TxStart(); + + BOOST_CHECK_EXCEPTION(tx.Commit(), ignite::IgniteError, separateThreadMessage); + + tx1.Close(); + + // Test end of outdated transaction. + + tx = transactions.TxStart(); + + tx.Commit(); + + BOOST_CHECK_EXCEPTION(tx.Commit(), ignite::IgniteError, correctCloseMessage); + + tx.Close(); + + // Check multi threads. + + SharedPointer<SingleLatch> latch = SharedPointer<SingleLatch>(new SingleLatch()); + + tx = transactions.TxStart(); + + cache.Put(1, 10); + + boost::thread t2(startAnotherClientAndTx, latch); + + latch.Get()->Await(); + + tx.Rollback(); + + t2.join(); + + BOOST_CHECK_EQUAL(1, cache.Get(1)); + + BOOST_CHECK_EQUAL(20, cache.Get(2)); +} + +const std::string label = std::string("label_2_check"); + +std::string label1 = std::string("label_2_check1"); + +bool checkTxLabelMessage(const ignite::IgniteError& ex) +{ + return std::string(ex.what()).find(label) != std::string::npos; +} + +bool checkTxLabel1Message(const ignite::IgniteError& ex) +{ + return std::string(ex.what()).find("label_2_check1") != std::string::npos; +} + +BOOST_AUTO_TEST_CASE(TestTxWithLabel) +{ + IgniteClientConfiguration cfg; + + cfg.SetEndPoints("127.0.0.1:11110"); + + IgniteClient client = IgniteClient::Start(cfg); + + cache::CacheClient<int, int> cache = + client.GetCache<int, int>("partitioned"); + + const uint32_t TX_TIMEOUT = 200; + + transactions::ClientTransactions transactions = client.ClientTransactions(); + + transactions::ClientTransaction tx = transactions.withLabel(label).TxStart(TransactionConcurrency::OPTIMISTIC, TransactionIsolation::SERIALIZABLE, TX_TIMEOUT); + + cache.Put(1, 10); + + boost::this_thread::sleep_for(boost::chrono::milliseconds(2 * TX_TIMEOUT)); + + BOOST_CHECK_EXCEPTION(tx.Commit(), ignite::IgniteError, checkTxLabelMessage); + + tx.Close(); + + // New label + + tx = transactions.TxStart(TransactionConcurrency::OPTIMISTIC, TransactionIsolation::SERIALIZABLE, TX_TIMEOUT); + + cache.Put(1, 10); + + boost::this_thread::sleep_for(boost::chrono::milliseconds(2 * TX_TIMEOUT)); + + BOOST_CHECK_EXCEPTION(tx.Commit(), ignite::IgniteError, !checkTxLabelMessage); + + tx.Close(); + + // Label is gone + + tx = transactions.withLabel(label1).TxStart(TransactionConcurrency::OPTIMISTIC, TransactionIsolation::SERIALIZABLE, TX_TIMEOUT); + + label1 = "NULL"; + + cache.Put(1, 10); + + boost::this_thread::sleep_for(boost::chrono::milliseconds(2 * TX_TIMEOUT)); + + BOOST_CHECK_EXCEPTION(tx.Commit(), ignite::IgniteError, checkTxLabel1Message); + + tx.Close(); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/modules/platforms/cpp/thin-client/CMakeLists.txt b/modules/platforms/cpp/thin-client/CMakeLists.txt index 5cd121f..988e13f 100644 --- a/modules/platforms/cpp/thin-client/CMakeLists.txt +++ b/modules/platforms/cpp/thin-client/CMakeLists.txt @@ -34,6 +34,8 @@ set(SOURCES src/impl/data_channel.cpp src/impl/message.cpp src/impl/cache/cache_client_proxy.cpp src/impl/cache/cache_client_impl.cpp + src/impl/transactions/transactions_impl.cpp + src/impl/transactions/transactions_proxy.cpp src/ignite_client.cpp) add_library(${TARGET} SHARED ${SOURCES}) diff --git a/modules/platforms/cpp/thin-client/include/ignite/impl/thin/transactions/transactions_proxy.h b/modules/platforms/cpp/thin-client/include/ignite/impl/thin/transactions/transactions_proxy.h new file mode 100644 index 0000000..ce0802c --- /dev/null +++ b/modules/platforms/cpp/thin-client/include/ignite/impl/thin/transactions/transactions_proxy.h @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _IGNITE_IMPL_THIN_TRANSACTIONS_PROXY +#define _IGNITE_IMPL_THIN_TRANSACTIONS_PROXY + +#include "ignite/common/concurrent.h" +#include <ignite/common/fixed_size_array.h> +#include "ignite/thin/transactions/transaction_consts.h" + +namespace ignite +{ + namespace impl + { + namespace thin + { + namespace transactions + { + /** + * Ignite transaction class proxy. + */ + class IGNITE_IMPORT_EXPORT TransactionProxy { + + public: + /** + * Default constructor. + */ + TransactionProxy() {} + + /** + * Constructor. + * @param impl Transaction implementation. + */ + TransactionProxy(const ignite::common::concurrent::SharedPointer<void>& impl) : + impl(impl) + {} + + /** + * Assignment operator. + * + * @param other Another instance. + * @return *this. + */ + TransactionProxy& operator=(const TransactionProxy& other) + { + impl = other.impl; + + return *this; + } + + /** + * Destructor. + */ + ~TransactionProxy() {}; + + /** + * Commit the transaction. + */ + void commit(); + + /** + * Rollback the transaction. + */ + void rollback(); + + /** + * Close the transaction. + */ + void close(); + + private: + /** Implementation. */ + ignite::common::concurrent::SharedPointer<void> impl; + }; + + /** + * Ignite transactions class proxy. + */ + class IGNITE_IMPORT_EXPORT TransactionsProxy + { + #define DEFAULT_CONCURRENCY TransactionConcurrency::PESSIMISTIC + #define DEFAULT_ISOLATION TransactionIsolation::READ_COMMITTED + #define DEFAULT_TIMEOUT 0 + #define DEFAULT_TX_SIZE 0 + public: + /** + * Constructor. + */ + TransactionsProxy(const ignite::common::concurrent::SharedPointer<void>& impl) : + impl(impl) + { + // No-op. + } + + /** + * Default constructor. + */ + TransactionsProxy() {} + + /** + * Destructor. + */ + ~TransactionsProxy() {} + + /** + * Start new transaction with completely clarify parameters. + * + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @param timeout Transaction timeout. + * @param txSize Number of entries participating in transaction (may be approximate). + * @param lbl Transaction specific label. + * + * @return Proxy implementation. + */ + TransactionProxy txStart( + ignite::thin::transactions::TransactionConcurrency::Type concurrency = ignite::thin::transactions::TransactionConcurrency::PESSIMISTIC, + ignite::thin::transactions::TransactionIsolation::Type isolation = ignite::thin::transactions::TransactionIsolation::READ_COMMITTED, + int64_t timeout = 0, + int32_t txSize = 0, + ignite::common::concurrent::SharedPointer<ignite::common::FixedSizeArray<char> > lbl = NULL); + private: + /** Implementation. */ + ignite::common::concurrent::SharedPointer<void> impl; + }; + } + } + } +} + +#endif // _IGNITE_IMPL_THIN_TRANSACTIONS_PROXY diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/ignite_client.h b/modules/platforms/cpp/thin-client/include/ignite/thin/ignite_client.h index 245accb..b540edb 100644 --- a/modules/platforms/cpp/thin-client/include/ignite/thin/ignite_client.h +++ b/modules/platforms/cpp/thin-client/include/ignite/thin/ignite_client.h @@ -29,6 +29,7 @@ #include <ignite/thin/ignite_client_configuration.h> #include <ignite/thin/cache/cache_client.h> +#include <ignite/thin/transactions/transactions.h> namespace ignite { @@ -121,6 +122,14 @@ namespace ignite */ void GetCacheNames(std::vector<std::string>& cacheNames); + /** + * Starts transactions. + */ + transactions::ClientTransactions ClientTransactions() + { + return transactions::ClientTransactions(InternalTransactions()); + } + private: /** * Get cache. @@ -149,6 +158,9 @@ namespace ignite */ SP_Void InternalCreateCache(const char* name); + /** */ + SP_Void InternalTransactions(); + /** * Constructor. * diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/transactions/transaction.h b/modules/platforms/cpp/thin-client/include/ignite/thin/transactions/transaction.h new file mode 100644 index 0000000..97d75e6 --- /dev/null +++ b/modules/platforms/cpp/thin-client/include/ignite/thin/transactions/transaction.h @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _IGNITE_THIN_TRANSACTIONS_CLIENT_TRANSACTION +#define _IGNITE_THIN_TRANSACTIONS_CLIENT_TRANSACTION + +#include <ignite/impl/thin/transactions/transactions_proxy.h> + +namespace ignite +{ + namespace thin + { + namespace transactions + { + /** + * Transaction client. + * + * Implements main transactionsl API. + * + * This class implemented as a reference to an implementation so copying of this class instance will only + * create another reference to the same underlying object. Underlying object released automatically once all + * the instances are destructed. + */ + class ClientTransaction { + + public: + /** + * Constructor. + * + * @param impl Implementation. + */ + ClientTransaction(ignite::impl::thin::transactions::TransactionProxy impl) : + proxy(impl) + { + // No-op. + } + + /** + * Default constructor. + */ + ClientTransaction() + { + // No-op. + } + + /** + * Destructor. + */ + ~ClientTransaction() + { + // No-op. + } + + /** + * Commits this transaction. + */ + void Commit() + { + proxy.commit(); + } + + /** + * Rolls back this transaction. + */ + void Rollback() + { + proxy.rollback(); + } + + /** + * Ends the transaction. Transaction will be rolled back if it has not been committed. + */ + void Close() + { + proxy.close(); + } + + private: + /** Implementation. */ + ignite::impl::thin::transactions::TransactionProxy proxy; + }; + } + } +} + +#endif // _IGNITE_THIN_TRANSACTIONS_CLIENT_TRANSACTION diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/transactions/transaction_consts.h b/modules/platforms/cpp/thin-client/include/ignite/thin/transactions/transaction_consts.h new file mode 100644 index 0000000..0b01763 --- /dev/null +++ b/modules/platforms/cpp/thin-client/include/ignite/thin/transactions/transaction_consts.h @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef TRANSACTION_CONSTS_H +#define TRANSACTION_CONSTS_H + +namespace ignite +{ + namespace thin + { + namespace transactions + { + #define TX_ALREADY_CLOSED "The transaction is already closed." + #define TX_ALREADY_STARTED "A transaction has already been started by the current thread." + #define TX_DIFFERENT_THREAD "You can commit transaction only from the thread it was started." + + /** + * Transaction concurrency control model. + */ + struct TransactionConcurrency + { + enum Type + { + /** + * Optimistic concurrency model. In this mode all cache operations + * are not distributed to other nodes until Transaction::Commit() + * is called. In this mode one @c 'PREPARE' message will be sent to + * participating cache nodes to start acquiring per-transaction + * locks, and once all nodes reply @c 'OK', a one-way @c 'COMMIT' + * message is sent without waiting for reply. + * + * Note that in this mode, optimistic failures are only possible in + * conjunction with ::IGNITE_TX_ISOLATION_SERIALIZABLE isolation + * level. In all other cases, optimistic transactions will never + * fail optimistically and will always be identically ordered on all + * participating grid nodes. + */ + OPTIMISTIC = 0, + + /** + * Pessimistic concurrency model. In this mode a lock is acquired + * on all cache operations with exception of read operations in + * ::IGNITE_TX_ISOLATION_READ_COMMITTED mode. All optional filters + * passed into cache operations will be evaluated after successful + * lock acquisition. Whenever Transaction::Commit() is called, a + * single one-way @c 'COMMIT' message is sent to participating cache + * nodes without waiting for reply. Note that there is no reason for + * distributed @c 'PREPARE' step, as all locks have been already + * acquired. + */ + PESSIMISTIC = 1 + }; + }; + + /** + * Defines different cache transaction isolation levels. + */ + struct TransactionIsolation + { + enum Type + { + /** + * Read committed isolation level. This isolation level means that + * always a committed value will be provided for read operations. + * With this isolation level values are always read from cache + * global memory or persistent store every time a value is accessed. + * In other words, if the same key is accessed more than once within + * the same transaction, it may have different value every time + * since global cache memory may be updated concurrently by other + * threads. + */ + READ_COMMITTED = 0, + + /** + * Repeatable read isolation level. This isolation level means that + * if a value was read once within transaction, then all consecutive + * reads will provide the same in-transaction value. With this + * isolation level accessed values are stored within in-transaction + * memory, so consecutive access to the same key within the same + * transaction will always return the value that was previously read + * or updated within this transaction. If concurrency is + * ::IGNITE_TX_CONCURRENCY_PESSIMISTIC, then a lock on the key will + * be acquired prior to accessing the value. + */ + REPEATABLE_READ = 1, + + /** + * Serializable isolation level. This isolation level means that all + * transactions occur in a completely isolated fashion, as if all + * transactions in the system had executed serially, one after the + * other. Read access with this level happens the same way as with + * ::IGNITE_TX_ISOLATION_REPEATABLE_READ level. However, in + * ::IGNITE_TX_CONCURRENCY_OPTIMISTIC mode, if some transactions + * cannot be serially isolated from each other, then one winner will + * be picked and the other transactions in conflict will result in + * IgniteError being thrown. + */ + SERIALIZABLE = 2 + }; + }; + } + } +} + +#endif // TRANSACTION_CONSTS_H diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/transactions/transactions.h b/modules/platforms/cpp/thin-client/include/ignite/thin/transactions/transactions.h new file mode 100644 index 0000000..2b740c6 --- /dev/null +++ b/modules/platforms/cpp/thin-client/include/ignite/thin/transactions/transactions.h @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _IGNITE_THIN_TRANSACTIONS_CLIENT_TRANSACTIONS +#define _IGNITE_THIN_TRANSACTIONS_CLIENT_TRANSACTIONS + +#include <string> + +#include <ignite/common/concurrent.h> +#include <ignite/common/fixed_size_array.h> +#include <ignite/impl/thin/transactions/transactions_proxy.h> +#include "ignite/thin/transactions/transaction.h" + +namespace ignite +{ + namespace thin + { + namespace transactions + { + /** + * Transactions client. + * + * This is an entry point for Thin C++ Ignite transactions. + * + * This class implemented as a reference to an implementation so copying of this class instance will only + * create another reference to the same underlying object. Underlying object released automatically once all + * the instances are destructed. + */ + class ClientTransactions { + public: + /** + * Constructor. + * + * @param impl Implementation. + */ + ClientTransactions(ignite::common::concurrent::SharedPointer<void> impl) : + proxy(impl), + label(ignite::common::concurrent::SharedPointer<ignite::common::FixedSizeArray<char> >()) + { + // No-op. + } + + /** + * Default constructor. + */ + ClientTransactions() + { + // No-op. + } + + /** + * Destructor. + */ + ~ClientTransactions() + { + // No-op. + } + + /** + * Start new transaction with completely clarify parameters. + * + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @param timeout Transaction timeout. + * @param txSize Number of entries participating in transaction (may be approximate). + * + * @return ClientTransaction implementation. + */ + ClientTransaction TxStart( + TransactionConcurrency::Type concurrency = TransactionConcurrency::PESSIMISTIC, + TransactionIsolation::Type isolation = TransactionIsolation::READ_COMMITTED, + int64_t timeout = 0, + int32_t txSize = 0) + { + return ClientTransaction(proxy.txStart(concurrency, isolation, timeout, txSize, label)); + } + + /** + * Returns instance of {@code ClientTransactions} to mark each new transaction with a specified label. + * + * @param label Transaction label. + * @return ClientTransactions implementation. + */ + ClientTransactions withLabel(const std::string& lbl) + { + ClientTransactions copy = ClientTransactions(proxy, lbl); + + return copy; + } + private: + /** Implementation. */ + ignite::impl::thin::transactions::TransactionsProxy proxy; + + /** Transaction specific label. */ + ignite::common::concurrent::SharedPointer<ignite::common::FixedSizeArray<char> > label; + + /** + * Constructor. + * + * @param impl Implementation. + */ + ClientTransactions(ignite::impl::thin::transactions::TransactionsProxy& impl, const std::string& lbl) : + proxy(impl) + { + ignite::common::FixedSizeArray<char> *label0 = new ignite::common::FixedSizeArray<char>(lbl.size() + 1); + + strcpy(label0->GetData(), lbl.c_str()); + + label = ignite::common::concurrent::SharedPointer<ignite::common::FixedSizeArray<char> >(label0); + } + }; + } + } +} + +#endif // _IGNITE_THIN_TRANSACTIONS_CLIENT_TRANSACTION diff --git a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj index 629de37..a4b8490 100644 --- a/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj +++ b/modules/platforms/cpp/thin-client/project/vs/thin-client.vcxproj @@ -167,6 +167,8 @@ <ClCompile Include="..\..\src\impl\protocol_version.cpp" /> <ClCompile Include="..\..\src\impl\remote_type_updater.cpp" /> <ClCompile Include="..\..\src\impl\utility.cpp" /> + <ClCompile Include="..\..\src\impl\transactions\transactions_impl.cpp" /> + <ClCompile Include="..\..\src\impl\transactions\transactions_proxy.cpp" /> </ItemGroup> <ItemGroup> <ClInclude Include="..\..\include\ignite\impl\thin\cache\cache_client_proxy.h" /> @@ -194,6 +196,11 @@ <ClInclude Include="..\..\src\impl\remote_type_updater.h" /> <ClInclude Include="..\..\src\impl\response_status.h" /> <ClInclude Include="..\..\src\impl\utility.h" /> + <ClInclude Include="..\..\include\ignite\thin\transactions\transactions.h" /> + <ClInclude Include="..\..\include\ignite\thin\transactions\transaction.h" /> + <ClInclude Include="..\..\include\ignite\thin\transactions\transaction_consts.h" /> + <ClInclude Include="..\..\include\ignite\thin\transactions\transactions_proxy.h" /> + <ClInclude Include="..\..\src\impl\transactions\transactions_impl.h" /> </ItemGroup> <ItemGroup> <ProjectReference Include="..\..\..\binary\project\vs\binary.vcxproj"> diff --git a/modules/platforms/cpp/thin-client/src/ignite_client.cpp b/modules/platforms/cpp/thin-client/src/ignite_client.cpp index 57909e4..79d2987 100644 --- a/modules/platforms/cpp/thin-client/src/ignite_client.cpp +++ b/modules/platforms/cpp/thin-client/src/ignite_client.cpp @@ -77,6 +77,11 @@ namespace ignite return static_cast<SP_Void>(GetClientImpl(impl).CreateCache(name)); } + IgniteClient::SP_Void IgniteClient::InternalTransactions() + { + return static_cast<SP_Void>(GetClientImpl(impl).ClientTransactions()); + } + IgniteClient::IgniteClient(SP_Void& impl) { this->impl.Swap(impl); diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp index 1422707..b128cc0 100644 --- a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp +++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp @@ -20,6 +20,10 @@ #include "impl/response_status.h" #include "impl/message.h" #include "impl/cache/cache_client_impl.h" +#include "impl/transactions/transactions_impl.h" + +using namespace ignite::impl::thin::transactions; +using namespace ignite::common::concurrent; namespace ignite { @@ -29,11 +33,15 @@ namespace ignite { namespace cache { + typedef SharedPointer<TransactionImpl> SP_TransactionImpl; + CacheClientImpl::CacheClientImpl( const SP_DataRouter& router, + const transactions::SP_TransactionsImpl& tx, const std::string& name, int32_t id) : router(router), + tx(tx), name(name), id(id), binary(false) @@ -91,9 +99,24 @@ namespace ignite throw IgniteError(IgniteError::IGNITE_ERR_CACHE, rsp.GetError().c_str()); } + template<typename ReqT> + void CacheClientImpl::checkTransactional(ReqT& req) + { + SP_TransactionImpl activeTx = tx.Get()->GetCurrent(); + + bool isUnderTx = activeTx.IsValid(); + + int32_t txId = isUnderTx ? activeTx.Get()->TxId() : 0; + + req.activeTx(isUnderTx, txId); + } + void CacheClientImpl::Put(const WritableKey& key, const Writable& value) { Cache2ValueRequest<RequestType::CACHE_PUT> req(id, binary, key, value); + + checkTransactional(req); + Response rsp; SyncCacheKeyMessage(key, req, rsp); @@ -102,6 +125,9 @@ namespace ignite void CacheClientImpl::Get(const WritableKey& key, Readable& value) { CacheValueRequest<RequestType::CACHE_GET> req(id, binary, key); + + checkTransactional(req); + CacheValueResponse rsp(value); SyncCacheKeyMessage(key, req, rsp); @@ -110,6 +136,9 @@ namespace ignite void CacheClientImpl::PutAll(const Writable & pairs) { CacheValueRequest<RequestType::CACHE_PUT_ALL> req(id, binary, pairs); + + checkTransactional(req); + Response rsp; SyncMessage(req, rsp); @@ -118,6 +147,9 @@ namespace ignite void CacheClientImpl::GetAll(const Writable& keys, Readable& pairs) { CacheValueRequest<RequestType::CACHE_GET_ALL> req(id, binary, keys); + + checkTransactional(req); + CacheValueResponse rsp(pairs); SyncMessage(req, rsp); @@ -126,6 +158,9 @@ namespace ignite bool CacheClientImpl::Replace(const WritableKey& key, const Writable& value) { Cache2ValueRequest<RequestType::CACHE_REPLACE> req(id, binary, key, value); + + checkTransactional(req); + BoolResponse rsp; SyncCacheKeyMessage(key, req, rsp); @@ -136,6 +171,9 @@ namespace ignite bool CacheClientImpl::ContainsKey(const WritableKey& key) { CacheValueRequest<RequestType::CACHE_CONTAINS_KEY> req(id, binary, key); + + checkTransactional(req); + BoolResponse rsp; SyncCacheKeyMessage(key, req, rsp); @@ -146,6 +184,9 @@ namespace ignite bool CacheClientImpl::ContainsKeys(const Writable& keys) { CacheValueRequest<RequestType::CACHE_CONTAINS_KEYS> req(id, binary, keys); + + checkTransactional(req); + BoolResponse rsp; SyncMessage(req, rsp); @@ -156,6 +197,9 @@ namespace ignite int64_t CacheClientImpl::GetSize(int32_t peekModes) { CacheGetSizeRequest req(id, binary, peekModes); + + checkTransactional(req); + Int64Response rsp; SyncMessage(req, rsp); @@ -166,6 +210,9 @@ namespace ignite bool CacheClientImpl::Remove(const WritableKey& key) { CacheValueRequest<RequestType::CACHE_REMOVE_KEY> req(id, binary, key); + + checkTransactional(req); + BoolResponse rsp; SyncCacheKeyMessage(key, req, rsp); @@ -176,6 +223,9 @@ namespace ignite bool CacheClientImpl::Remove(const WritableKey& key, const Writable& val) { Cache2ValueRequest<RequestType::CACHE_REMOVE_IF_EQUALS> req(id, binary, key, val); + + checkTransactional(req); + BoolResponse rsp; SyncCacheKeyMessage(key, req, rsp); @@ -186,6 +236,9 @@ namespace ignite void CacheClientImpl::RemoveAll(const Writable& keys) { CacheValueRequest<RequestType::CACHE_REMOVE_KEYS> req(id, binary, keys); + + checkTransactional(req); + Response rsp; SyncMessage(req, rsp); @@ -194,6 +247,9 @@ namespace ignite void CacheClientImpl::RemoveAll() { CacheRequest<RequestType::CACHE_REMOVE_ALL> req(id, binary); + + checkTransactional(req); + Response rsp; SyncMessage(req, rsp); @@ -202,6 +258,9 @@ namespace ignite void CacheClientImpl::Clear(const WritableKey& key) { CacheValueRequest<RequestType::CACHE_CLEAR_KEY> req(id, binary, key); + + checkTransactional(req); + Response rsp; SyncCacheKeyMessage(key, req, rsp); @@ -210,6 +269,9 @@ namespace ignite void CacheClientImpl::Clear() { CacheRequest<RequestType::CACHE_CLEAR> req(id, binary); + + checkTransactional(req); + Response rsp; SyncMessage(req, rsp); @@ -218,6 +280,9 @@ namespace ignite void CacheClientImpl::ClearAll(const Writable& keys) { CacheValueRequest<RequestType::CACHE_CLEAR_KEYS> req(id, binary, keys); + + checkTransactional(req); + Response rsp; SyncMessage(req, rsp); @@ -226,6 +291,9 @@ namespace ignite void CacheClientImpl::LocalPeek(const WritableKey& key, Readable& value) { CacheValueRequest<RequestType::CACHE_LOCAL_PEEK> req(id, binary, key); + + checkTransactional(req); + CacheValueResponse rsp(value); SyncCacheKeyMessage(key, req, rsp); @@ -234,6 +302,9 @@ namespace ignite bool CacheClientImpl::Replace(const WritableKey& key, const Writable& oldVal, const Writable& newVal) { Cache3ValueRequest<RequestType::CACHE_REPLACE_IF_EQUALS> req(id, binary, key, oldVal, newVal); + + checkTransactional(req); + BoolResponse rsp; SyncCacheKeyMessage(key, req, rsp); @@ -244,6 +315,9 @@ namespace ignite void CacheClientImpl::GetAndPut(const WritableKey& key, const Writable& valIn, Readable& valOut) { Cache2ValueRequest<RequestType::CACHE_GET_AND_PUT> req(id, binary, key, valIn); + + checkTransactional(req); + CacheValueResponse rsp(valOut); SyncCacheKeyMessage(key, req, rsp); @@ -252,6 +326,9 @@ namespace ignite void CacheClientImpl::GetAndRemove(const WritableKey& key, Readable& valOut) { CacheValueRequest<RequestType::CACHE_GET_AND_REMOVE> req(id, binary, key); + + checkTransactional(req); + CacheValueResponse rsp(valOut); SyncCacheKeyMessage(key, req, rsp); @@ -260,6 +337,9 @@ namespace ignite void CacheClientImpl::GetAndReplace(const WritableKey& key, const Writable& valIn, Readable& valOut) { Cache2ValueRequest<RequestType::CACHE_GET_AND_REPLACE> req(id, binary, key, valIn); + + checkTransactional(req); + CacheValueResponse rsp(valOut); SyncCacheKeyMessage(key, req, rsp); @@ -268,6 +348,9 @@ namespace ignite bool CacheClientImpl::PutIfAbsent(const WritableKey& key, const Writable& val) { Cache2ValueRequest<RequestType::CACHE_PUT_IF_ABSENT> req(id, binary, key, val); + + checkTransactional(req); + BoolResponse rsp; SyncCacheKeyMessage(key, req, rsp); @@ -278,6 +361,9 @@ namespace ignite void CacheClientImpl::GetAndPutIfAbsent(const WritableKey& key, const Writable& valIn, Readable& valOut) { Cache2ValueRequest<RequestType::CACHE_GET_AND_PUT_IF_ABSENT> req(id, binary, key, valIn); + + checkTransactional(req); + CacheValueResponse rsp(valOut); SyncCacheKeyMessage(key, req, rsp); diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h index 805f9e7..e97fe17 100644 --- a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h +++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h @@ -22,6 +22,7 @@ #include <string> #include "impl/data_router.h" +#include "impl/transactions/transactions_impl.h" namespace ignite { @@ -58,6 +59,7 @@ namespace ignite */ CacheClientImpl( const SP_DataRouter& router, + const transactions::SP_TransactionsImpl& tx, const std::string& name, int32_t id); @@ -297,6 +299,9 @@ namespace ignite template<typename ReqT, typename RspT> void SyncCacheKeyMessage(const WritableKey& key, const ReqT& req, RspT& rsp); + template<typename ReqT> + void checkTransactional(ReqT& req); + /** * Synchronously send message and receive response. * @@ -310,6 +315,9 @@ namespace ignite /** Data router. */ SP_DataRouter router; + /** Transactions. */ + transactions::SP_TransactionsImpl tx; + /** Cache name. */ std::string name; diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_proxy.cpp b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_proxy.cpp index 13c256b..2caa976 100644 --- a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_proxy.cpp +++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_proxy.cpp @@ -16,8 +16,6 @@ */ #include <ignite/impl/thin/cache/cache_client_proxy.h> - -#include <ignite/impl/thin/cache/cache_client_proxy.h> #include <impl/cache/cache_client_impl.h> using namespace ignite::impl::thin; diff --git a/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp index ab45047..d531a46 100644 --- a/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp +++ b/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp @@ -21,6 +21,7 @@ #include "impl/response_status.h" #include "impl/ignite_client_impl.h" +#include "impl/transactions/transactions_impl.h" namespace ignite { @@ -30,7 +31,8 @@ namespace ignite { IgniteClientImpl::IgniteClientImpl(const ignite::thin::IgniteClientConfiguration& cfg) : cfg(cfg), - router(new DataRouter(cfg)) + router(new DataRouter(cfg)), + txImpl(new transactions::TransactionsImpl(router)) { // No-op. } @@ -51,7 +53,7 @@ namespace ignite int32_t cacheId = utility::GetCacheId(name); - return MakeCacheImpl(router, name, cacheId); + return MakeCacheImpl(router, txImpl, name, cacheId); } cache::SP_CacheClientImpl IgniteClientImpl::GetOrCreateCache(const char* name) @@ -68,7 +70,7 @@ namespace ignite if (rsp.GetStatus() != ResponseStatus::SUCCESS) throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, rsp.GetError().c_str()); - return MakeCacheImpl(router, name, cacheId); + return MakeCacheImpl(router, txImpl, name, cacheId); } cache::SP_CacheClientImpl IgniteClientImpl::CreateCache(const char* name) @@ -85,7 +87,7 @@ namespace ignite if (rsp.GetStatus() != ResponseStatus::SUCCESS) throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, rsp.GetError().c_str()); - return MakeCacheImpl(router, name, cacheId); + return MakeCacheImpl(router, txImpl, name, cacheId); } void IgniteClientImpl::DestroyCache(const char* name) @@ -116,10 +118,11 @@ namespace ignite common::concurrent::SharedPointer<cache::CacheClientImpl> IgniteClientImpl::MakeCacheImpl( const SP_DataRouter& router, + const transactions::SP_TransactionsImpl& tx, const std::string& name, int32_t id) { - cache::SP_CacheClientImpl cache(new cache::CacheClientImpl(router, name, id)); + cache::SP_CacheClientImpl cache(new cache::CacheClientImpl(router, tx, name, id)); return cache; } diff --git a/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.h b/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.h index 4ca67fe..24d32a4 100644 --- a/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.h +++ b/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.h @@ -23,6 +23,7 @@ #include "impl/data_router.h" #include "impl/cache/cache_client_impl.h" +#include "impl/transactions/transactions_impl.h" namespace ignite { @@ -82,6 +83,12 @@ namespace ignite */ common::concurrent::SharedPointer<cache::CacheClientImpl> CreateCache(const char* name); + /** */ + transactions::SP_TransactionsImpl ClientTransactions() const + { + return txImpl; + } + /** * Destroy cache by name. * @@ -109,6 +116,7 @@ namespace ignite */ static common::concurrent::SharedPointer<cache::CacheClientImpl> MakeCacheImpl( const SP_DataRouter& router, + const transactions::SP_TransactionsImpl& tx, const std::string& name, int32_t id); @@ -125,6 +133,9 @@ namespace ignite /** Data router. */ SP_DataRouter router; + + /** Transactions. */ + transactions::SP_TransactionsImpl txImpl; }; } } diff --git a/modules/platforms/cpp/thin-client/src/impl/message.cpp b/modules/platforms/cpp/thin-client/src/impl/message.cpp index 39723bf..5025f37 100644 --- a/modules/platforms/cpp/thin-client/src/impl/message.cpp +++ b/modules/platforms/cpp/thin-client/src/impl/message.cpp @@ -195,12 +195,12 @@ namespace ignite value.Read(reader); } - void BinaryTypeGetRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion& ver) const + void BinaryTypeGetRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion&) const { writer.WriteInt32(typeId); } - void BinaryTypePutRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion& ver) const + void BinaryTypePutRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion&) const { writer.WriteInt32(snapshot.GetTypeId()); writer.WriteString(snapshot.GetTypeName()); @@ -338,6 +338,11 @@ namespace ignite { value = reader.ReadInt64(); } + + void Int32Response::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&) + { + value = reader.ReadInt32(); + } } } } diff --git a/modules/platforms/cpp/thin-client/src/impl/message.h b/modules/platforms/cpp/thin-client/src/impl/message.h index 8fdb867..efeae21 100644 --- a/modules/platforms/cpp/thin-client/src/impl/message.h +++ b/modules/platforms/cpp/thin-client/src/impl/message.h @@ -28,6 +28,8 @@ #include <ignite/impl/thin/writable.h> #include <ignite/impl/thin/readable.h> +#include <ignite/thin/transactions/transaction_consts.h> + #include "impl/protocol_version.h" #include "impl/affinity/affinity_topology_version.h" #include "impl/affinity/partition_awareness_group.h" @@ -38,6 +40,11 @@ namespace ignite { namespace thin { + /** "Transactional" flag mask. */ + #define TRANSACTIONAL_FLAG_MASK 0x02; + + #define KEEP_BINARY_FLAG_MASK 0x01; + /* Forward declaration. */ class Readable; @@ -151,6 +158,12 @@ namespace ignite /** Put binary type info. */ PUT_BINARY_TYPE = 3003, + + /** Start new transaction. */ + OP_TX_START = 4000, + + /** Commit transaction. */ + OP_TX_END = 4001, }; }; @@ -344,7 +357,8 @@ namespace ignite */ CacheRequest(int32_t cacheId, bool binary) : cacheId(cacheId), - binary(binary) + binary(binary), + actTx(false) { // No-op. } @@ -358,13 +372,36 @@ namespace ignite } /** + * Sets transaction active flag and appropriate txId. + * @param active Transaction activity flag. + * @param id Transaction id. + */ + void activeTx(bool active, int32_t id) { + actTx = active; + + txId = id; + } + + /** * Write request using provided writer. * @param writer Writer. */ virtual void Write(binary::BinaryWriterImpl& writer, const ProtocolVersion&) const { writer.WriteInt32(cacheId); - writer.WriteBool(binary); + + int8_t flags = 0; + + if (binary) + flags |= KEEP_BINARY_FLAG_MASK; + + if (actTx) + flags |= TRANSACTIONAL_FLAG_MASK; + + writer.WriteInt8(flags); + + if (actTx) + writer.WriteInt32(txId); } private: @@ -373,6 +410,10 @@ namespace ignite /** Binary flag. */ bool binary; + + bool actTx; + + int32_t txId; }; /** @@ -490,6 +531,15 @@ namespace ignite } /** + * Sets transaction active flag and appropriate txId. + * @param active Transaction activity flag. + * @param id Transaction id. + */ + void activeTx(bool active, int32_t id) { + CacheRequest<OpCode>::activeTx(active, id); + } + + /** * Write request using provided writer. * @param writer Writer. * @param ver Version. @@ -570,6 +620,108 @@ namespace ignite }; /** + * Tx start request. + */ + class TxStartRequest : public Request<RequestType::OP_TX_START> + { + public: + /** + * Constructor. + */ + TxStartRequest(ignite::thin::transactions::TransactionConcurrency::Type conc, + ignite::thin::transactions::TransactionIsolation::Type isolationLvl, + int64_t tmOut, int32_t sz, ignite::common::concurrent::SharedPointer<common::FixedSizeArray<char> > lbl) : + concurrency(conc), + isolation(isolationLvl), + timeout(tmOut), + size(sz), + label(lbl) + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~TxStartRequest() + { + // No-op. + } + + /** + * Write request using provided writer. + * @param writer Writer. + * @param ver Version. + */ + virtual void Write(binary::BinaryWriterImpl& writer, const ProtocolVersion&) const { + writer.WriteInt8(concurrency); + writer.WriteInt8(isolation); + writer.WriteInt64(timeout); + label.IsValid() ? writer.WriteString(label.Get()->GetData()) : writer.WriteNull(); + } + + private: + /** Cncurrency. */ + ignite::thin::transactions::TransactionConcurrency::Type concurrency; + + /** Isolation. */ + ignite::thin::transactions::TransactionIsolation::Type isolation; + + /** Timeout. */ + const int64_t timeout; + + /** Size. */ + const int32_t size; + + /** Tx label. */ + ignite::common::concurrent::SharedPointer<common::FixedSizeArray<char> > label; + }; + + /** + * Tx end request. + */ + class TxEndRequest : public Request<RequestType::OP_TX_END> + { + public: + /** + * Constructor. + * + * @param id Transaction id. + * @param comm Need to commit flag. + */ + TxEndRequest(int32_t id, bool comm) : + commited(comm), + txId(id) + { + } + + /** + * Destructor. + */ + virtual ~TxEndRequest() + { + // No-op. + } + + /** + * Write request using provided writer. + * @param writer Writer. + * @param ver Version. + */ + virtual void Write(binary::BinaryWriterImpl& writer, const ProtocolVersion&) const { + writer.WriteInt32(txId); + writer.WriteBool(commited); + } + + private: + /** Tx id. */ + const int32_t txId; + + /** Need to commit flag. */ + const bool commited; + }; + + /** * Cache get binary type request. */ class BinaryTypeGetRequest : public Request<RequestType::GET_BINARY_TYPE> @@ -1010,6 +1162,51 @@ namespace ignite /** Value. */ int64_t value; }; + + /** + * Get cache names response. + */ + class Int32Response : public Response + { + public: + /** + * Constructor. + */ + Int32Response() : + value(0) + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~Int32Response() + { + // No-op. + } + + /** + * Get received value. + * + * @return Received bool value. + */ + int32_t GetValue() const + { + return value; + } + + /** + * Read data if response status is ResponseStatus::SUCCESS. + * + * @param reader Reader. + */ + virtual void ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&); + + private: + /** Value. */ + int32_t value; + }; } } } diff --git a/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.h b/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.h new file mode 100644 index 0000000..fc8d2b7 --- /dev/null +++ b/modules/platforms/cpp/thin-client/src/impl/transactions/transaction_impl.h @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _IGNITE_IMPL_THIN_TRANSACTION_IMPL +#define _IGNITE_IMPL_THIN_TRANSACTION_IMPL + +#include "impl/data_router.h" +#include <ignite/common/fixed_size_array.h> +#include "ignite/thin/transactions/transaction_consts.h" +#include "impl/transactions/transactions_impl.h" + +namespace ignite +{ + namespace impl + { + namespace thin + { + namespace transactions + { + /* Forward declaration. */ + class TransactionImpl; + + /* Forward declaration. */ + class TransactionsImpl; + + typedef ignite::common::concurrent::SharedPointer<TransactionImpl> SP_TransactionImpl; + + /** + * Ignite transactions implementation. + * + * This is an entry point for Thin C++ Ignite transactions. + */ + class TransactionImpl + { + public: + /** + * Constructor. + * + * @param txImpl Transactions implementation. + * @param txid Transaction Id. + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @param timeout Transaction timeout. + * @param size Number of entries participating in transaction (may be approximate). + */ + TransactionImpl( + TransactionsImpl& txImpl, + int32_t txid, + ignite::thin::transactions::TransactionConcurrency::Type concurrency, + ignite::thin::transactions::TransactionIsolation::Type isolation, + int64_t timeout, + int32_t size) : + txs(txImpl), + txId(txid), + concurrency(concurrency), + isolation(isolation), + timeout(timeout), + txSize(size), + closed(false) + { + // No-op. + } + + /** + * Destructor. + */ + ~TransactionImpl() + { + if (!IsClosed()) + Close(); + } + + /** + * Commits this transaction. + */ + void Commit(); + + /** + * Rolls back this transaction. + */ + void Rollback(); + + /** + * Ends the transaction. Transaction will be rolled back if it has not been committed. + */ + void Close(); + + /** + * @return Current transaction Id. + */ + int32_t TxId() const + { + return txId; + } + + /** + * Check if the transaction has been closed. + * + * @return True if the transaction has been closed. + */ + bool IsClosed() const; + + /** + * Sets close flag to tx. + */ + void Closed(); + + /** + * @return Current transaction. + */ + static SP_TransactionImpl GetCurrent(); + + /** + * Starts transaction. + * + * @param txs Transactions implementation. + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @param timeout Transaction timeout. + * @param txSize Number of entries participating in transaction (may be approximate). + * @param label Transaction specific label. + */ + static SP_TransactionImpl Create( + TransactionsImpl& txs, + ignite::thin::transactions::TransactionConcurrency::Type concurrency, + ignite::thin::transactions::TransactionIsolation::Type isolation, + int64_t timeout, + int32_t txSize, + ignite::common::concurrent::SharedPointer<common::FixedSizeArray<char> > label); + protected: + /** Checks current thread state. */ + static void txThreadCheck(const TransactionImpl& tx); + + /** Completes tc and clear state from storage. */ + static void txThreadEnd(TransactionImpl& tx); + + private: + /** Transactions implementation. */ + TransactionsImpl& txs; + + /** Current transaction Id. */ + int32_t txId; + + /** Thread local instance of the transaction. */ + static ignite::common::concurrent::ThreadLocalInstance<SP_TransactionImpl> threadTx; + + /** Concurrency. */ + int concurrency; + + /** Isolation. */ + int isolation; + + /** Timeout in milliseconds. */ + int64_t timeout; + + /** Transaction size. */ + int32_t txSize; + + /** Closed flag. */ + bool closed; + + IGNITE_NO_COPY_ASSIGNMENT(TransactionImpl) + }; + } + } + } +} + +#endif // _IGNITE_IMPL_THIN_TRANSACTION_IMPL diff --git a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp new file mode 100644 index 0000000..a4ff32e --- /dev/null +++ b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.cpp @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "impl/message.h" +#include "impl/transactions/transactions_impl.h" +#include "impl/transactions/transaction_impl.h" +#include "impl/response_status.h" + +using namespace ignite::common::concurrent; +using namespace ignite::impl::thin; +using namespace ignite::thin::transactions; + +namespace ignite +{ + namespace impl + { + namespace thin + { + namespace transactions + { + ThreadLocalInstance<SP_TransactionImpl> TransactionImpl::threadTx; + + TransactionsImpl::TransactionsImpl(const SP_DataRouter& router) : + router(router) + { + } + + template<typename ReqT, typename RspT> + void TransactionsImpl::SendTxMessage(const ReqT& req, RspT& rsp) + { + router.Get()->SyncMessage(req, rsp); + + if (rsp.GetStatus() != ResponseStatus::SUCCESS) + throw IgniteError(IgniteError::IGNITE_ERR_TX, rsp.GetError().c_str()); + } + + SharedPointer<TransactionImpl> TransactionsImpl::TxStart( + TransactionConcurrency::Type concurrency, + TransactionIsolation::Type isolation, + int64_t timeout, + int32_t txSize, + SharedPointer<common::FixedSizeArray<char> > label) + { + SP_TransactionImpl tx = TransactionImpl::Create(*this, concurrency, isolation, timeout, txSize, label); + + return tx; + } + + SP_TransactionImpl TransactionImpl::Create( + TransactionsImpl& txs, + TransactionConcurrency::Type concurrency, + TransactionIsolation::Type isolation, + int64_t timeout, + int32_t txSize, + SharedPointer<common::FixedSizeArray<char> > label) + { + SP_TransactionImpl tx = threadTx.Get(); + + TransactionImpl* ptr = tx.Get(); + + if (ptr && !ptr->IsClosed()) + { + throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_ALREADY_STARTED); + } + + TxStartRequest req(concurrency, isolation, timeout, txSize, label); + + Int32Response rsp; + + txs.SendTxMessage(req, rsp); + + int32_t curTxId = rsp.GetValue(); + + tx = SP_TransactionImpl(new TransactionImpl(txs, curTxId, concurrency, isolation, timeout, txSize)); + + threadTx.Set(tx); + + return tx; + } + + SP_TransactionImpl TransactionImpl::GetCurrent() + { + SP_TransactionImpl tx = threadTx.Get(); + + TransactionImpl* ptr = tx.Get(); + + if (ptr) + { + if (ptr->IsClosed()) + { + tx = SP_TransactionImpl(); + + threadTx.Remove(); + } + } + else + { + tx = SP_TransactionImpl(); + } + + return tx; + } + + bool TransactionImpl::IsClosed() const + { + return closed; + } + + SP_TransactionImpl TransactionsImpl::GetCurrent() + { + return TransactionImpl::GetCurrent(); + } + + int32_t TransactionsImpl::TxCommit(int32_t txId) + { + TxEndRequest req(txId, true); + + Response rsp; + + SendTxMessage(req, rsp); + + return rsp.GetStatus(); + } + + int32_t TransactionsImpl::TxRollback(int32_t txId) + { + TxEndRequest req(txId, false); + + Response rsp; + + SendTxMessage(req, rsp); + + return rsp.GetStatus(); + } + + int32_t TransactionsImpl::TxClose(int32_t txId) + { + return TxRollback(txId); + } + + void TransactionImpl::Commit() + { + txThreadCheck(*this); + + txs.TxCommit(txId); + + txThreadEnd(*this); + } + + void TransactionImpl::Rollback() + { + txThreadCheck(*this); + + txs.TxRollback(txId); + + txThreadEnd(*this); + } + + void TransactionImpl::Close() + { + txThreadCheck(*this); + + if (IsClosed()) + { + return; + } + + txs.TxClose(txId); + + txThreadEnd(*this); + } + + void TransactionImpl::Closed() + { + closed = true; + } + + void TransactionImpl::txThreadEnd(TransactionImpl& tx) + { + tx.Closed(); + + threadTx.Set(0); + } + + void TransactionImpl::txThreadCheck(const TransactionImpl& inTx) + { + SP_TransactionImpl tx = threadTx.Get(); + + TransactionImpl* ptr = tx.Get(); + + if (!ptr) + throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_ALREADY_CLOSED); + + if (ptr->TxId() != inTx.TxId()) + throw IgniteError(IgniteError::IGNITE_ERR_TX_THIS_THREAD, TX_DIFFERENT_THREAD); + } + } + } + } +} diff --git a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.h b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.h new file mode 100644 index 0000000..2f5282b --- /dev/null +++ b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_impl.h @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _IGNITE_IMPL_THIN_TRANSACTIONS_IMPL +#define _IGNITE_IMPL_THIN_TRANSACTIONS_IMPL + +#include "impl/data_router.h" +#include <ignite/common/fixed_size_array.h> +#include "ignite/thin/transactions/transaction_consts.h" +#include "impl/transactions/transaction_impl.h" + +namespace ignite +{ + namespace impl + { + namespace thin + { + namespace transactions + { + class TransactionsImpl; + + typedef ignite::common::concurrent::SharedPointer<TransactionImpl> SP_TransactionImpl; + typedef ignite::common::concurrent::SharedPointer<TransactionsImpl> SP_TransactionsImpl; + + /** + * Thin client transaction. + */ + class TransactionsImpl + { + public: + /** + * Constructor. + * + * @param router Data router instance. + */ + TransactionsImpl(const SP_DataRouter& router); + + /** + * Destructor. + */ + ~TransactionsImpl() {} + + /** + * Start new transaction. + * + * @param concurrency Concurrency. + * @param isolation Isolation. + * @param timeout Timeout in milliseconds. Zero if for infinite timeout. + * @param txSize Number of entries participating in transaction (may be approximate). + * @param label Transaction specific label. + * @return Transaction ID on success. + */ + SP_TransactionImpl TxStart( + ignite::thin::transactions::TransactionConcurrency::Type concurrency, + ignite::thin::transactions::TransactionIsolation::Type isolation, + int64_t timeout, + int32_t txSize, + ignite::common::concurrent::SharedPointer<common::FixedSizeArray<char> > label); + + /** + * Commit Transaction. + * + * @param id Transaction ID. + * @return Resulting state. + */ + int32_t TxCommit(int32_t id); + + /** + * Rollback Transaction. + * + * @param id Transaction ID. + * @return Resulting state. + */ + int32_t TxRollback(int32_t id); + + + /** + * Close the transaction. + * + * This method should only be used on the valid instance. + * + * @param id Transaction ID. + */ + int32_t TxClose(int32_t id); + + /** + * Get active transaction for the current thread. + * + * @return Active transaction implementation for current thread + * or null pointer if there is no active transaction for + * the thread. + */ + SP_TransactionImpl GetCurrent(); + + /** + * Synchronously send message and receive response. + * + * @param req Request message. + * @param rsp Response message. + * @throw IgniteError on error. + */ + template<typename ReqT, typename RspT> + void SendTxMessage(const ReqT& req, RspT& rsp); + private: + /** Data router. */ + SP_DataRouter router; + + IGNITE_NO_COPY_ASSIGNMENT(TransactionsImpl) + }; + } + } + } +} + +#endif // _IGNITE_IMPL_THIN_TRANSACTIONS_IMPL diff --git a/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_proxy.cpp b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_proxy.cpp new file mode 100644 index 0000000..e04766a --- /dev/null +++ b/modules/platforms/cpp/thin-client/src/impl/transactions/transactions_proxy.cpp @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ignite/impl/thin/transactions/transactions_proxy.h" +#include "impl/transactions/transactions_impl.h" + +using namespace ignite::impl::thin; +using namespace transactions; +using namespace ignite::thin::transactions; + +namespace +{ + using namespace ignite::common::concurrent; + + TransactionsImpl& GetTxsImpl(SharedPointer<void>& ptr) + { + return *reinterpret_cast<TransactionsImpl*>(ptr.Get()); + } + + TransactionImpl& GetTxImpl(SharedPointer<void>& ptr) + { + return *reinterpret_cast<TransactionImpl*>(ptr.Get()); + } +} + +namespace ignite +{ + namespace impl + { + namespace thin + { + namespace transactions + { + TransactionProxy TransactionsProxy::txStart( + TransactionConcurrency::Type concurrency, + TransactionIsolation::Type isolation, + int64_t timeout, + int32_t txSize, + SharedPointer<ignite::common::FixedSizeArray<char> > lbl) + { + return TransactionProxy(GetTxsImpl(impl).TxStart(concurrency, isolation, timeout, txSize, lbl)); + } + + void TransactionProxy::commit() + { + GetTxImpl(impl).Commit(); + } + + void TransactionProxy::rollback() + { + GetTxImpl(impl).Rollback(); + } + + void TransactionProxy::close() + { + try + { + GetTxImpl(impl).Close(); + } + catch (...) + { + //No-op, we can`t throw any exceptions here. + } + } + } + } + } +}