http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d3a13a49/thirdparty/rocksdb/util/thread_list_test.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/util/thread_list_test.cc b/thirdparty/rocksdb/util/thread_list_test.cc deleted file mode 100644 index 36a221b..0000000 --- a/thirdparty/rocksdb/util/thread_list_test.cc +++ /dev/null @@ -1,352 +0,0 @@ -// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. -// This source code is licensed under both the GPLv2 (found in the -// COPYING file in the root directory) and Apache 2.0 License -// (found in the LICENSE.Apache file in the root directory). - -#include <mutex> -#include <condition_variable> - -#include "monitoring/thread_status_updater.h" -#include "rocksdb/db.h" -#include "util/testharness.h" - -#ifdef ROCKSDB_USING_THREAD_STATUS - -namespace rocksdb { - -class SimulatedBackgroundTask { - public: - SimulatedBackgroundTask( - const void* db_key, const std::string& db_name, - const void* cf_key, const std::string& cf_name, - const ThreadStatus::OperationType operation_type = - ThreadStatus::OP_UNKNOWN, - const ThreadStatus::StateType state_type = - ThreadStatus::STATE_UNKNOWN) - : db_key_(db_key), db_name_(db_name), - cf_key_(cf_key), cf_name_(cf_name), - operation_type_(operation_type), state_type_(state_type), - should_run_(true), running_count_(0) { - Env::Default()->GetThreadStatusUpdater()->NewColumnFamilyInfo( - db_key_, db_name_, cf_key_, cf_name_); - } - - ~SimulatedBackgroundTask() { - Env::Default()->GetThreadStatusUpdater()->EraseDatabaseInfo(db_key_); - } - - void Run() { - std::unique_lock<std::mutex> l(mutex_); - running_count_++; - Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(cf_key_); - Env::Default()->GetThreadStatusUpdater()->SetThreadOperation( - operation_type_); - Env::Default()->GetThreadStatusUpdater()->SetThreadState(state_type_); - while (should_run_) { - bg_cv_.wait(l); - } - Env::Default()->GetThreadStatusUpdater()->ClearThreadState(); - Env::Default()->GetThreadStatusUpdater()->ClearThreadOperation(); - Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(0); - running_count_--; - bg_cv_.notify_all(); - } - - void FinishAllTasks() { - std::unique_lock<std::mutex> l(mutex_); - should_run_ = false; - bg_cv_.notify_all(); - } - - void WaitUntilScheduled(int job_count, Env* env) { - while (running_count_ < job_count) { - env->SleepForMicroseconds(1000); - } - } - - void WaitUntilDone() { - std::unique_lock<std::mutex> l(mutex_); - while (running_count_ > 0) { - bg_cv_.wait(l); - } - } - - static void DoSimulatedTask(void* arg) { - reinterpret_cast<SimulatedBackgroundTask*>(arg)->Run(); - } - - private: - const void* db_key_; - const std::string db_name_; - const void* cf_key_; - const std::string cf_name_; - const ThreadStatus::OperationType operation_type_; - const ThreadStatus::StateType state_type_; - std::mutex mutex_; - std::condition_variable bg_cv_; - bool should_run_; - std::atomic<int> running_count_; -}; - -class ThreadListTest : public testing::Test { - public: - ThreadListTest() { - } -}; - -TEST_F(ThreadListTest, GlobalTables) { - // verify the global tables for operations and states are properly indexed. - for (int type = 0; type != ThreadStatus::NUM_OP_TYPES; ++type) { - ASSERT_EQ(global_operation_table[type].type, type); - ASSERT_EQ(global_operation_table[type].name, - ThreadStatus::GetOperationName( - ThreadStatus::OperationType(type))); - } - - for (int type = 0; type != ThreadStatus::NUM_STATE_TYPES; ++type) { - ASSERT_EQ(global_state_table[type].type, type); - ASSERT_EQ(global_state_table[type].name, - ThreadStatus::GetStateName( - ThreadStatus::StateType(type))); - } - - for (int stage = 0; stage != ThreadStatus::NUM_OP_STAGES; ++stage) { - ASSERT_EQ(global_op_stage_table[stage].stage, stage); - ASSERT_EQ(global_op_stage_table[stage].name, - ThreadStatus::GetOperationStageName( - ThreadStatus::OperationStage(stage))); - } -} - -TEST_F(ThreadListTest, SimpleColumnFamilyInfoTest) { - Env* env = Env::Default(); - const int kHighPriorityThreads = 3; - const int kLowPriorityThreads = 5; - const int kSimulatedHighPriThreads = kHighPriorityThreads - 1; - const int kSimulatedLowPriThreads = kLowPriorityThreads / 3; - env->SetBackgroundThreads(kHighPriorityThreads, Env::HIGH); - env->SetBackgroundThreads(kLowPriorityThreads, Env::LOW); - - SimulatedBackgroundTask running_task( - reinterpret_cast<void*>(1234), "running", - reinterpret_cast<void*>(5678), "pikachu"); - - for (int test = 0; test < kSimulatedHighPriThreads; ++test) { - env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, - &running_task, Env::Priority::HIGH); - } - for (int test = 0; test < kSimulatedLowPriThreads; ++test) { - env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, - &running_task, Env::Priority::LOW); - } - running_task.WaitUntilScheduled( - kSimulatedHighPriThreads + kSimulatedLowPriThreads, env); - - std::vector<ThreadStatus> thread_list; - - // Verify the number of running threads in each pool. - env->GetThreadList(&thread_list); - int running_count[ThreadStatus::NUM_THREAD_TYPES] = {0}; - for (auto thread_status : thread_list) { - if (thread_status.cf_name == "pikachu" && - thread_status.db_name == "running") { - running_count[thread_status.thread_type]++; - } - } - ASSERT_EQ( - running_count[ThreadStatus::HIGH_PRIORITY], - kSimulatedHighPriThreads); - ASSERT_EQ( - running_count[ThreadStatus::LOW_PRIORITY], - kSimulatedLowPriThreads); - ASSERT_EQ( - running_count[ThreadStatus::USER], 0); - - running_task.FinishAllTasks(); - running_task.WaitUntilDone(); - - // Verify none of the threads are running - env->GetThreadList(&thread_list); - - for (int i = 0; i < ThreadStatus::NUM_THREAD_TYPES; ++i) { - running_count[i] = 0; - } - for (auto thread_status : thread_list) { - if (thread_status.cf_name == "pikachu" && - thread_status.db_name == "running") { - running_count[thread_status.thread_type]++; - } - } - - ASSERT_EQ( - running_count[ThreadStatus::HIGH_PRIORITY], 0); - ASSERT_EQ( - running_count[ThreadStatus::LOW_PRIORITY], 0); - ASSERT_EQ( - running_count[ThreadStatus::USER], 0); -} - -namespace { - void UpdateStatusCounts( - const std::vector<ThreadStatus>& thread_list, - int operation_counts[], int state_counts[]) { - for (auto thread_status : thread_list) { - operation_counts[thread_status.operation_type]++; - state_counts[thread_status.state_type]++; - } - } - - void VerifyAndResetCounts( - const int correct_counts[], int collected_counts[], int size) { - for (int i = 0; i < size; ++i) { - ASSERT_EQ(collected_counts[i], correct_counts[i]); - collected_counts[i] = 0; - } - } - - void UpdateCount( - int operation_counts[], int from_event, int to_event, int amount) { - operation_counts[from_event] -= amount; - operation_counts[to_event] += amount; - } -} // namespace - -TEST_F(ThreadListTest, SimpleEventTest) { - Env* env = Env::Default(); - - // simulated tasks - const int kFlushWriteTasks = 3; - SimulatedBackgroundTask flush_write_task( - reinterpret_cast<void*>(1234), "running", - reinterpret_cast<void*>(5678), "pikachu", - ThreadStatus::OP_FLUSH); - - const int kCompactionWriteTasks = 4; - SimulatedBackgroundTask compaction_write_task( - reinterpret_cast<void*>(1234), "running", - reinterpret_cast<void*>(5678), "pikachu", - ThreadStatus::OP_COMPACTION); - - const int kCompactionReadTasks = 5; - SimulatedBackgroundTask compaction_read_task( - reinterpret_cast<void*>(1234), "running", - reinterpret_cast<void*>(5678), "pikachu", - ThreadStatus::OP_COMPACTION); - - const int kCompactionWaitTasks = 6; - SimulatedBackgroundTask compaction_wait_task( - reinterpret_cast<void*>(1234), "running", - reinterpret_cast<void*>(5678), "pikachu", - ThreadStatus::OP_COMPACTION); - - // setup right answers - int correct_operation_counts[ThreadStatus::NUM_OP_TYPES] = {0}; - correct_operation_counts[ThreadStatus::OP_FLUSH] = - kFlushWriteTasks; - correct_operation_counts[ThreadStatus::OP_COMPACTION] = - kCompactionWriteTasks + kCompactionReadTasks + kCompactionWaitTasks; - - env->SetBackgroundThreads( - correct_operation_counts[ThreadStatus::OP_FLUSH], Env::HIGH); - env->SetBackgroundThreads( - correct_operation_counts[ThreadStatus::OP_COMPACTION], Env::LOW); - - // schedule the simulated tasks - for (int t = 0; t < kFlushWriteTasks; ++t) { - env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, - &flush_write_task, Env::Priority::HIGH); - } - flush_write_task.WaitUntilScheduled(kFlushWriteTasks, env); - - for (int t = 0; t < kCompactionWriteTasks; ++t) { - env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, - &compaction_write_task, Env::Priority::LOW); - } - compaction_write_task.WaitUntilScheduled(kCompactionWriteTasks, env); - - for (int t = 0; t < kCompactionReadTasks; ++t) { - env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, - &compaction_read_task, Env::Priority::LOW); - } - compaction_read_task.WaitUntilScheduled(kCompactionReadTasks, env); - - for (int t = 0; t < kCompactionWaitTasks; ++t) { - env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, - &compaction_wait_task, Env::Priority::LOW); - } - compaction_wait_task.WaitUntilScheduled(kCompactionWaitTasks, env); - - // verify the thread-status - int operation_counts[ThreadStatus::NUM_OP_TYPES] = {0}; - int state_counts[ThreadStatus::NUM_STATE_TYPES] = {0}; - - std::vector<ThreadStatus> thread_list; - env->GetThreadList(&thread_list); - UpdateStatusCounts(thread_list, operation_counts, state_counts); - VerifyAndResetCounts(correct_operation_counts, operation_counts, - ThreadStatus::NUM_OP_TYPES); - - // terminate compaction-wait tasks and see if the thread-status - // reflects this update - compaction_wait_task.FinishAllTasks(); - compaction_wait_task.WaitUntilDone(); - UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION, - ThreadStatus::OP_UNKNOWN, kCompactionWaitTasks); - - env->GetThreadList(&thread_list); - UpdateStatusCounts(thread_list, operation_counts, state_counts); - VerifyAndResetCounts(correct_operation_counts, operation_counts, - ThreadStatus::NUM_OP_TYPES); - - // terminate flush-write tasks and see if the thread-status - // reflects this update - flush_write_task.FinishAllTasks(); - flush_write_task.WaitUntilDone(); - UpdateCount(correct_operation_counts, ThreadStatus::OP_FLUSH, - ThreadStatus::OP_UNKNOWN, kFlushWriteTasks); - - env->GetThreadList(&thread_list); - UpdateStatusCounts(thread_list, operation_counts, state_counts); - VerifyAndResetCounts(correct_operation_counts, operation_counts, - ThreadStatus::NUM_OP_TYPES); - - // terminate compaction-write tasks and see if the thread-status - // reflects this update - compaction_write_task.FinishAllTasks(); - compaction_write_task.WaitUntilDone(); - UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION, - ThreadStatus::OP_UNKNOWN, kCompactionWriteTasks); - - env->GetThreadList(&thread_list); - UpdateStatusCounts(thread_list, operation_counts, state_counts); - VerifyAndResetCounts(correct_operation_counts, operation_counts, - ThreadStatus::NUM_OP_TYPES); - - // terminate compaction-write tasks and see if the thread-status - // reflects this update - compaction_read_task.FinishAllTasks(); - compaction_read_task.WaitUntilDone(); - UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION, - ThreadStatus::OP_UNKNOWN, kCompactionReadTasks); - - env->GetThreadList(&thread_list); - UpdateStatusCounts(thread_list, operation_counts, state_counts); - VerifyAndResetCounts(correct_operation_counts, operation_counts, - ThreadStatus::NUM_OP_TYPES); -} - -} // namespace rocksdb - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} - -#else - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - return 0; -} - -#endif // ROCKSDB_USING_THREAD_STATUS
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d3a13a49/thirdparty/rocksdb/util/thread_local_test.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/util/thread_local_test.cc b/thirdparty/rocksdb/util/thread_local_test.cc deleted file mode 100644 index 6fee5ea..0000000 --- a/thirdparty/rocksdb/util/thread_local_test.cc +++ /dev/null @@ -1,582 +0,0 @@ -// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. -// This source code is licensed under both the GPLv2 (found in the -// COPYING file in the root directory) and Apache 2.0 License -// (found in the LICENSE.Apache file in the root directory). - -#include <thread> -#include <atomic> -#include <string> - -#include "rocksdb/env.h" -#include "port/port.h" -#include "util/autovector.h" -#include "util/sync_point.h" -#include "util/testharness.h" -#include "util/testutil.h" -#include "util/thread_local.h" - -namespace rocksdb { - -class ThreadLocalTest : public testing::Test { - public: - ThreadLocalTest() : env_(Env::Default()) {} - - Env* env_; -}; - -namespace { - -struct Params { - Params(port::Mutex* m, port::CondVar* c, int* u, int n, - UnrefHandler handler = nullptr) - : mu(m), - cv(c), - unref(u), - total(n), - started(0), - completed(0), - doWrite(false), - tls1(handler), - tls2(nullptr) {} - - port::Mutex* mu; - port::CondVar* cv; - int* unref; - int total; - int started; - int completed; - bool doWrite; - ThreadLocalPtr tls1; - ThreadLocalPtr* tls2; -}; - -class IDChecker : public ThreadLocalPtr { -public: - static uint32_t PeekId() { - return TEST_PeekId(); - } -}; - -} // anonymous namespace - -// Suppress false positive clang analyzer warnings. -#ifndef __clang_analyzer__ -TEST_F(ThreadLocalTest, UniqueIdTest) { - port::Mutex mu; - port::CondVar cv(&mu); - - ASSERT_EQ(IDChecker::PeekId(), 0u); - // New ThreadLocal instance bumps id by 1 - { - // Id used 0 - Params p1(&mu, &cv, nullptr, 1u); - ASSERT_EQ(IDChecker::PeekId(), 1u); - // Id used 1 - Params p2(&mu, &cv, nullptr, 1u); - ASSERT_EQ(IDChecker::PeekId(), 2u); - // Id used 2 - Params p3(&mu, &cv, nullptr, 1u); - ASSERT_EQ(IDChecker::PeekId(), 3u); - // Id used 3 - Params p4(&mu, &cv, nullptr, 1u); - ASSERT_EQ(IDChecker::PeekId(), 4u); - } - // id 3, 2, 1, 0 are in the free queue in order - ASSERT_EQ(IDChecker::PeekId(), 0u); - - // pick up 0 - Params p1(&mu, &cv, nullptr, 1u); - ASSERT_EQ(IDChecker::PeekId(), 1u); - // pick up 1 - Params* p2 = new Params(&mu, &cv, nullptr, 1u); - ASSERT_EQ(IDChecker::PeekId(), 2u); - // pick up 2 - Params p3(&mu, &cv, nullptr, 1u); - ASSERT_EQ(IDChecker::PeekId(), 3u); - // return up 1 - delete p2; - ASSERT_EQ(IDChecker::PeekId(), 1u); - // Now we have 3, 1 in queue - // pick up 1 - Params p4(&mu, &cv, nullptr, 1u); - ASSERT_EQ(IDChecker::PeekId(), 3u); - // pick up 3 - Params p5(&mu, &cv, nullptr, 1u); - // next new id - ASSERT_EQ(IDChecker::PeekId(), 4u); - // After exit, id sequence in queue: - // 3, 1, 2, 0 -} -#endif // __clang_analyzer__ - -TEST_F(ThreadLocalTest, SequentialReadWriteTest) { - // global id list carries over 3, 1, 2, 0 - ASSERT_EQ(IDChecker::PeekId(), 0u); - - port::Mutex mu; - port::CondVar cv(&mu); - Params p(&mu, &cv, nullptr, 1); - ThreadLocalPtr tls2; - p.tls2 = &tls2; - - auto func = [](void* ptr) { - auto& params = *static_cast<Params*>(ptr); - - ASSERT_TRUE(params.tls1.Get() == nullptr); - params.tls1.Reset(reinterpret_cast<int*>(1)); - ASSERT_TRUE(params.tls1.Get() == reinterpret_cast<int*>(1)); - params.tls1.Reset(reinterpret_cast<int*>(2)); - ASSERT_TRUE(params.tls1.Get() == reinterpret_cast<int*>(2)); - - ASSERT_TRUE(params.tls2->Get() == nullptr); - params.tls2->Reset(reinterpret_cast<int*>(1)); - ASSERT_TRUE(params.tls2->Get() == reinterpret_cast<int*>(1)); - params.tls2->Reset(reinterpret_cast<int*>(2)); - ASSERT_TRUE(params.tls2->Get() == reinterpret_cast<int*>(2)); - - params.mu->Lock(); - ++(params.completed); - params.cv->SignalAll(); - params.mu->Unlock(); - }; - - for (int iter = 0; iter < 1024; ++iter) { - ASSERT_EQ(IDChecker::PeekId(), 1u); - // Another new thread, read/write should not see value from previous thread - env_->StartThread(func, static_cast<void*>(&p)); - mu.Lock(); - while (p.completed != iter + 1) { - cv.Wait(); - } - mu.Unlock(); - ASSERT_EQ(IDChecker::PeekId(), 1u); - } -} - -TEST_F(ThreadLocalTest, ConcurrentReadWriteTest) { - // global id list carries over 3, 1, 2, 0 - ASSERT_EQ(IDChecker::PeekId(), 0u); - - ThreadLocalPtr tls2; - port::Mutex mu1; - port::CondVar cv1(&mu1); - Params p1(&mu1, &cv1, nullptr, 16); - p1.tls2 = &tls2; - - port::Mutex mu2; - port::CondVar cv2(&mu2); - Params p2(&mu2, &cv2, nullptr, 16); - p2.doWrite = true; - p2.tls2 = &tls2; - - auto func = [](void* ptr) { - auto& p = *static_cast<Params*>(ptr); - - p.mu->Lock(); - // Size_T switches size along with the ptr size - // we want to cast to. - size_t own = ++(p.started); - p.cv->SignalAll(); - while (p.started != p.total) { - p.cv->Wait(); - } - p.mu->Unlock(); - - // Let write threads write a different value from the read threads - if (p.doWrite) { - own += 8192; - } - - ASSERT_TRUE(p.tls1.Get() == nullptr); - ASSERT_TRUE(p.tls2->Get() == nullptr); - - auto* env = Env::Default(); - auto start = env->NowMicros(); - - p.tls1.Reset(reinterpret_cast<size_t*>(own)); - p.tls2->Reset(reinterpret_cast<size_t*>(own + 1)); - // Loop for 1 second - while (env->NowMicros() - start < 1000 * 1000) { - for (int iter = 0; iter < 100000; ++iter) { - ASSERT_TRUE(p.tls1.Get() == reinterpret_cast<size_t*>(own)); - ASSERT_TRUE(p.tls2->Get() == reinterpret_cast<size_t*>(own + 1)); - if (p.doWrite) { - p.tls1.Reset(reinterpret_cast<size_t*>(own)); - p.tls2->Reset(reinterpret_cast<size_t*>(own + 1)); - } - } - } - - p.mu->Lock(); - ++(p.completed); - p.cv->SignalAll(); - p.mu->Unlock(); - }; - - // Initiate 2 instnaces: one keeps writing and one keeps reading. - // The read instance should not see data from the write instance. - // Each thread local copy of the value are also different from each - // other. - for (int th = 0; th < p1.total; ++th) { - env_->StartThread(func, static_cast<void*>(&p1)); - } - for (int th = 0; th < p2.total; ++th) { - env_->StartThread(func, static_cast<void*>(&p2)); - } - - mu1.Lock(); - while (p1.completed != p1.total) { - cv1.Wait(); - } - mu1.Unlock(); - - mu2.Lock(); - while (p2.completed != p2.total) { - cv2.Wait(); - } - mu2.Unlock(); - - ASSERT_EQ(IDChecker::PeekId(), 3u); -} - -TEST_F(ThreadLocalTest, Unref) { - ASSERT_EQ(IDChecker::PeekId(), 0u); - - auto unref = [](void* ptr) { - auto& p = *static_cast<Params*>(ptr); - p.mu->Lock(); - ++(*p.unref); - p.mu->Unlock(); - }; - - // Case 0: no unref triggered if ThreadLocalPtr is never accessed - auto func0 = [](void* ptr) { - auto& p = *static_cast<Params*>(ptr); - - p.mu->Lock(); - ++(p.started); - p.cv->SignalAll(); - while (p.started != p.total) { - p.cv->Wait(); - } - p.mu->Unlock(); - }; - - for (int th = 1; th <= 128; th += th) { - port::Mutex mu; - port::CondVar cv(&mu); - int unref_count = 0; - Params p(&mu, &cv, &unref_count, th, unref); - - for (int i = 0; i < p.total; ++i) { - env_->StartThread(func0, static_cast<void*>(&p)); - } - env_->WaitForJoin(); - ASSERT_EQ(unref_count, 0); - } - - // Case 1: unref triggered by thread exit - auto func1 = [](void* ptr) { - auto& p = *static_cast<Params*>(ptr); - - p.mu->Lock(); - ++(p.started); - p.cv->SignalAll(); - while (p.started != p.total) { - p.cv->Wait(); - } - p.mu->Unlock(); - - ASSERT_TRUE(p.tls1.Get() == nullptr); - ASSERT_TRUE(p.tls2->Get() == nullptr); - - p.tls1.Reset(ptr); - p.tls2->Reset(ptr); - - p.tls1.Reset(ptr); - p.tls2->Reset(ptr); - }; - - for (int th = 1; th <= 128; th += th) { - port::Mutex mu; - port::CondVar cv(&mu); - int unref_count = 0; - ThreadLocalPtr tls2(unref); - Params p(&mu, &cv, &unref_count, th, unref); - p.tls2 = &tls2; - - for (int i = 0; i < p.total; ++i) { - env_->StartThread(func1, static_cast<void*>(&p)); - } - - env_->WaitForJoin(); - - // N threads x 2 ThreadLocal instance cleanup on thread exit - ASSERT_EQ(unref_count, 2 * p.total); - } - - // Case 2: unref triggered by ThreadLocal instance destruction - auto func2 = [](void* ptr) { - auto& p = *static_cast<Params*>(ptr); - - p.mu->Lock(); - ++(p.started); - p.cv->SignalAll(); - while (p.started != p.total) { - p.cv->Wait(); - } - p.mu->Unlock(); - - ASSERT_TRUE(p.tls1.Get() == nullptr); - ASSERT_TRUE(p.tls2->Get() == nullptr); - - p.tls1.Reset(ptr); - p.tls2->Reset(ptr); - - p.tls1.Reset(ptr); - p.tls2->Reset(ptr); - - p.mu->Lock(); - ++(p.completed); - p.cv->SignalAll(); - - // Waiting for instruction to exit thread - while (p.completed != 0) { - p.cv->Wait(); - } - p.mu->Unlock(); - }; - - for (int th = 1; th <= 128; th += th) { - port::Mutex mu; - port::CondVar cv(&mu); - int unref_count = 0; - Params p(&mu, &cv, &unref_count, th, unref); - p.tls2 = new ThreadLocalPtr(unref); - - for (int i = 0; i < p.total; ++i) { - env_->StartThread(func2, static_cast<void*>(&p)); - } - - // Wait for all threads to finish using Params - mu.Lock(); - while (p.completed != p.total) { - cv.Wait(); - } - mu.Unlock(); - - // Now destroy one ThreadLocal instance - delete p.tls2; - p.tls2 = nullptr; - // instance destroy for N threads - ASSERT_EQ(unref_count, p.total); - - // Signal to exit - mu.Lock(); - p.completed = 0; - cv.SignalAll(); - mu.Unlock(); - env_->WaitForJoin(); - // additional N threads exit unref for the left instance - ASSERT_EQ(unref_count, 2 * p.total); - } -} - -TEST_F(ThreadLocalTest, Swap) { - ThreadLocalPtr tls; - tls.Reset(reinterpret_cast<void*>(1)); - ASSERT_EQ(reinterpret_cast<int64_t>(tls.Swap(nullptr)), 1); - ASSERT_TRUE(tls.Swap(reinterpret_cast<void*>(2)) == nullptr); - ASSERT_EQ(reinterpret_cast<int64_t>(tls.Get()), 2); - ASSERT_EQ(reinterpret_cast<int64_t>(tls.Swap(reinterpret_cast<void*>(3))), 2); -} - -TEST_F(ThreadLocalTest, Scrape) { - auto unref = [](void* ptr) { - auto& p = *static_cast<Params*>(ptr); - p.mu->Lock(); - ++(*p.unref); - p.mu->Unlock(); - }; - - auto func = [](void* ptr) { - auto& p = *static_cast<Params*>(ptr); - - ASSERT_TRUE(p.tls1.Get() == nullptr); - ASSERT_TRUE(p.tls2->Get() == nullptr); - - p.tls1.Reset(ptr); - p.tls2->Reset(ptr); - - p.tls1.Reset(ptr); - p.tls2->Reset(ptr); - - p.mu->Lock(); - ++(p.completed); - p.cv->SignalAll(); - - // Waiting for instruction to exit thread - while (p.completed != 0) { - p.cv->Wait(); - } - p.mu->Unlock(); - }; - - for (int th = 1; th <= 128; th += th) { - port::Mutex mu; - port::CondVar cv(&mu); - int unref_count = 0; - Params p(&mu, &cv, &unref_count, th, unref); - p.tls2 = new ThreadLocalPtr(unref); - - for (int i = 0; i < p.total; ++i) { - env_->StartThread(func, static_cast<void*>(&p)); - } - - // Wait for all threads to finish using Params - mu.Lock(); - while (p.completed != p.total) { - cv.Wait(); - } - mu.Unlock(); - - ASSERT_EQ(unref_count, 0); - - // Scrape all thread local data. No unref at thread - // exit or ThreadLocalPtr destruction - autovector<void*> ptrs; - p.tls1.Scrape(&ptrs, nullptr); - p.tls2->Scrape(&ptrs, nullptr); - delete p.tls2; - // Signal to exit - mu.Lock(); - p.completed = 0; - cv.SignalAll(); - mu.Unlock(); - env_->WaitForJoin(); - - ASSERT_EQ(unref_count, 0); - } -} - -TEST_F(ThreadLocalTest, Fold) { - auto unref = [](void* ptr) { - delete static_cast<std::atomic<int64_t>*>(ptr); - }; - static const int kNumThreads = 16; - static const int kItersPerThread = 10; - port::Mutex mu; - port::CondVar cv(&mu); - Params params(&mu, &cv, nullptr, kNumThreads, unref); - auto func = [](void* ptr) { - auto& p = *static_cast<Params*>(ptr); - ASSERT_TRUE(p.tls1.Get() == nullptr); - p.tls1.Reset(new std::atomic<int64_t>(0)); - - for (int i = 0; i < kItersPerThread; ++i) { - static_cast<std::atomic<int64_t>*>(p.tls1.Get())->fetch_add(1); - } - - p.mu->Lock(); - ++(p.completed); - p.cv->SignalAll(); - - // Waiting for instruction to exit thread - while (p.completed != 0) { - p.cv->Wait(); - } - p.mu->Unlock(); - }; - - for (int th = 0; th < params.total; ++th) { - env_->StartThread(func, static_cast<void*>(¶ms)); - } - - // Wait for all threads to finish using Params - mu.Lock(); - while (params.completed != params.total) { - cv.Wait(); - } - mu.Unlock(); - - // Verify Fold() behavior - int64_t sum = 0; - params.tls1.Fold( - [](void* ptr, void* res) { - auto sum_ptr = static_cast<int64_t*>(res); - *sum_ptr += static_cast<std::atomic<int64_t>*>(ptr)->load(); - }, - &sum); - ASSERT_EQ(sum, kNumThreads * kItersPerThread); - - // Signal to exit - mu.Lock(); - params.completed = 0; - cv.SignalAll(); - mu.Unlock(); - env_->WaitForJoin(); -} - -TEST_F(ThreadLocalTest, CompareAndSwap) { - ThreadLocalPtr tls; - ASSERT_TRUE(tls.Swap(reinterpret_cast<void*>(1)) == nullptr); - void* expected = reinterpret_cast<void*>(1); - // Swap in 2 - ASSERT_TRUE(tls.CompareAndSwap(reinterpret_cast<void*>(2), expected)); - expected = reinterpret_cast<void*>(100); - // Fail Swap, still 2 - ASSERT_TRUE(!tls.CompareAndSwap(reinterpret_cast<void*>(2), expected)); - ASSERT_EQ(expected, reinterpret_cast<void*>(2)); - // Swap in 3 - expected = reinterpret_cast<void*>(2); - ASSERT_TRUE(tls.CompareAndSwap(reinterpret_cast<void*>(3), expected)); - ASSERT_EQ(tls.Get(), reinterpret_cast<void*>(3)); -} - -namespace { - -void* AccessThreadLocal(void* arg) { - TEST_SYNC_POINT("AccessThreadLocal:Start"); - ThreadLocalPtr tlp; - tlp.Reset(new std::string("hello RocksDB")); - TEST_SYNC_POINT("AccessThreadLocal:End"); - return nullptr; -} - -} // namespace - -// The following test is disabled as it requires manual steps to run it -// correctly. -// -// Currently we have no way to acess SyncPoint w/o ASAN error when the -// child thread dies after the main thread dies. So if you manually enable -// this test and only see an ASAN error on SyncPoint, it means you pass the -// test. -TEST_F(ThreadLocalTest, DISABLED_MainThreadDiesFirst) { - rocksdb::SyncPoint::GetInstance()->LoadDependency( - {{"AccessThreadLocal:Start", "MainThreadDiesFirst:End"}, - {"PosixEnv::~PosixEnv():End", "AccessThreadLocal:End"}}); - - // Triggers the initialization of singletons. - Env::Default(); - -#ifndef ROCKSDB_LITE - try { -#endif // ROCKSDB_LITE - rocksdb::port::Thread th(&AccessThreadLocal, nullptr); - th.detach(); - TEST_SYNC_POINT("MainThreadDiesFirst:End"); -#ifndef ROCKSDB_LITE - } catch (const std::system_error& ex) { - std::cerr << "Start thread: " << ex.code() << std::endl; - FAIL(); - } -#endif // ROCKSDB_LITE -} - -} // namespace rocksdb - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d3a13a49/thirdparty/rocksdb/util/timer_queue_test.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/util/timer_queue_test.cc b/thirdparty/rocksdb/util/timer_queue_test.cc deleted file mode 100644 index 5f5f08f..0000000 --- a/thirdparty/rocksdb/util/timer_queue_test.cc +++ /dev/null @@ -1,72 +0,0 @@ -// Portions Copyright (c) 2011-present, Facebook, Inc. All rights reserved. -// This source code is licensed under both the GPLv2 (found in the -// COPYING file in the root directory) and Apache 2.0 License -// (found in the LICENSE.Apache file in the root directory). - -// borrowed from -// http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/ -// Timer Queue -// -// License -// -// The source code in this article is licensed under the CC0 license, so feel -// free -// to copy, modify, share, do whatever you want with it. -// No attribution is required, but Ill be happy if you do. -// CC0 license - -// The person who associated a work with this deed has dedicated the work to the -// public domain by waiving all of his or her rights to the work worldwide -// under copyright law, including all related and neighboring rights, to the -// extent allowed by law. You can copy, modify, distribute and perform the -// work, even for -// commercial purposes, all without asking permission. See Other Information -// below. -// - -#include "util/timer_queue.h" -#include <future> - -namespace Timing { - -using Clock = std::chrono::high_resolution_clock; -double now() { - static auto start = Clock::now(); - return std::chrono::duration<double, std::milli>(Clock::now() - start) - .count(); -} - -} // namespace Timing - -int main() { - TimerQueue q; - - double tnow = Timing::now(); - - q.add(10000, [tnow](bool aborted) mutable { - printf("T 1: %d, Elapsed %4.2fms\n", aborted, Timing::now() - tnow); - return std::make_pair(false, 0); - }); - q.add(10001, [tnow](bool aborted) mutable { - printf("T 2: %d, Elapsed %4.2fms\n", aborted, Timing::now() - tnow); - return std::make_pair(false, 0); - }); - - q.add(1000, [tnow](bool aborted) mutable { - printf("T 3: %d, Elapsed %4.2fms\n", aborted, Timing::now() - tnow); - return std::make_pair(!aborted, 1000); - }); - - auto id = q.add(2000, [tnow](bool aborted) mutable { - printf("T 4: %d, Elapsed %4.2fms\n", aborted, Timing::now() - tnow); - return std::make_pair(!aborted, 2000); - }); - - (void)id; - // auto ret = q.cancel(id); - // assert(ret == 1); - // q.cancelAll(); - - return 0; -} -////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d3a13a49/thirdparty/rocksdb/util/transaction_test_util.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/util/transaction_test_util.cc b/thirdparty/rocksdb/util/transaction_test_util.cc deleted file mode 100644 index 0d6948b..0000000 --- a/thirdparty/rocksdb/util/transaction_test_util.cc +++ /dev/null @@ -1,246 +0,0 @@ -// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. -// This source code is licensed under both the GPLv2 (found in the -// COPYING file in the root directory) and Apache 2.0 License -// (found in the LICENSE.Apache file in the root directory). -#ifndef ROCKSDB_LITE - -#ifndef __STDC_FORMAT_MACROS -#define __STDC_FORMAT_MACROS -#endif - -#include "util/transaction_test_util.h" - -#include <inttypes.h> -#include <string> -#include <thread> - -#include "rocksdb/db.h" -#include "rocksdb/utilities/optimistic_transaction_db.h" -#include "rocksdb/utilities/transaction.h" -#include "rocksdb/utilities/transaction_db.h" -#include "util/random.h" -#include "util/string_util.h" - -namespace rocksdb { - -RandomTransactionInserter::RandomTransactionInserter( - Random64* rand, const WriteOptions& write_options, - const ReadOptions& read_options, uint64_t num_keys, uint16_t num_sets) - : rand_(rand), - write_options_(write_options), - read_options_(read_options), - num_keys_(num_keys), - num_sets_(num_sets), - txn_id_(0) {} - -RandomTransactionInserter::~RandomTransactionInserter() { - if (txn_ != nullptr) { - delete txn_; - } - if (optimistic_txn_ != nullptr) { - delete optimistic_txn_; - } -} - -bool RandomTransactionInserter::TransactionDBInsert( - TransactionDB* db, const TransactionOptions& txn_options) { - txn_ = db->BeginTransaction(write_options_, txn_options, txn_); - - return DoInsert(nullptr, txn_, false); -} - -bool RandomTransactionInserter::OptimisticTransactionDBInsert( - OptimisticTransactionDB* db, - const OptimisticTransactionOptions& txn_options) { - optimistic_txn_ = - db->BeginTransaction(write_options_, txn_options, optimistic_txn_); - - return DoInsert(nullptr, optimistic_txn_, true); -} - -bool RandomTransactionInserter::DBInsert(DB* db) { - return DoInsert(db, nullptr, false); -} - -bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn, - bool is_optimistic) { - Status s; - WriteBatch batch; - std::string value; - - // pick a random number to use to increment a key in each set - uint64_t incr = (rand_->Next() % 100) + 1; - - bool unexpected_error = false; - - // For each set, pick a key at random and increment it - for (uint8_t i = 0; i < num_sets_; i++) { - uint64_t int_value = 0; - char prefix_buf[5]; - // prefix_buf needs to be large enough to hold a uint16 in string form - - // key format: [SET#][random#] - std::string rand_key = ToString(rand_->Next() % num_keys_); - Slice base_key(rand_key); - - // Pad prefix appropriately so we can iterate over each set - snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", i + 1); - std::string full_key = std::string(prefix_buf) + base_key.ToString(); - Slice key(full_key); - - if (txn != nullptr) { - s = txn->GetForUpdate(read_options_, key, &value); - } else { - s = db->Get(read_options_, key, &value); - } - - if (s.ok()) { - // Found key, parse its value - int_value = std::stoull(value); - - if (int_value == 0 || int_value == ULONG_MAX) { - unexpected_error = true; - fprintf(stderr, "Get returned unexpected value: %s\n", value.c_str()); - s = Status::Corruption(); - } - } else if (s.IsNotFound()) { - // Have not yet written to this key, so assume its value is 0 - int_value = 0; - s = Status::OK(); - } else { - // Optimistic transactions should never return non-ok status here. - // Non-optimistic transactions may return write-coflict/timeout errors. - if (is_optimistic || !(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) { - fprintf(stderr, "Get returned an unexpected error: %s\n", - s.ToString().c_str()); - unexpected_error = true; - } - break; - } - - if (s.ok()) { - // Increment key - std::string sum = ToString(int_value + incr); - if (txn != nullptr) { - s = txn->Put(key, sum); - if (!s.ok()) { - // Since we did a GetForUpdate, Put should not fail. - fprintf(stderr, "Put returned an unexpected error: %s\n", - s.ToString().c_str()); - unexpected_error = true; - } - } else { - batch.Put(key, sum); - } - } - } - - if (s.ok()) { - if (txn != nullptr) { - std::hash<std::thread::id> hasher; - char name[64]; - snprintf(name, 64, "txn%zu-%d", hasher(std::this_thread::get_id()), - txn_id_++); - assert(strlen(name) < 64 - 1); - txn->SetName(name); - s = txn->Prepare(); - s = txn->Commit(); - - if (!s.ok()) { - if (is_optimistic) { - // Optimistic transactions can have write-conflict errors on commit. - // Any other error is unexpected. - if (!(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) { - unexpected_error = true; - } - } else { - // Non-optimistic transactions should only fail due to expiration - // or write failures. For testing purproses, we do not expect any - // write failures. - if (!s.IsExpired()) { - unexpected_error = true; - } - } - - if (unexpected_error) { - fprintf(stderr, "Commit returned an unexpected error: %s\n", - s.ToString().c_str()); - } - } - - } else { - s = db->Write(write_options_, &batch); - if (!s.ok()) { - unexpected_error = true; - fprintf(stderr, "Write returned an unexpected error: %s\n", - s.ToString().c_str()); - } - } - } else { - if (txn != nullptr) { - txn->Rollback(); - } - } - - if (s.ok()) { - success_count_++; - } else { - failure_count_++; - } - - last_status_ = s; - - // return success if we didn't get any unexpected errors - return !unexpected_error; -} - -Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets) { - uint64_t prev_total = 0; - - // For each set of keys with the same prefix, sum all the values - for (uint32_t i = 0; i < num_sets; i++) { - char prefix_buf[6]; - snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", i + 1); - uint64_t total = 0; - - Iterator* iter = db->NewIterator(ReadOptions()); - - for (iter->Seek(Slice(prefix_buf, 4)); iter->Valid(); iter->Next()) { - Slice key = iter->key(); - - // stop when we reach a different prefix - if (key.ToString().compare(0, 4, prefix_buf) != 0) { - break; - } - - Slice value = iter->value(); - uint64_t int_value = std::stoull(value.ToString()); - if (int_value == 0 || int_value == ULONG_MAX) { - fprintf(stderr, "Iter returned unexpected value: %s\n", - value.ToString().c_str()); - return Status::Corruption(); - } - - total += int_value; - } - delete iter; - - if (i > 0) { - if (total != prev_total) { - fprintf(stderr, - "RandomTransactionVerify found inconsistent totals. " - "Set[%" PRIu32 "]: %" PRIu64 ", Set[%" PRIu32 "]: %" PRIu64 - " \n", - i - 1, prev_total, i, total); - return Status::Corruption(); - } - } - prev_total = total; - } - - return Status::OK(); -} - -} // namespace rocksdb - -#endif // ROCKSDB_LITE http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/d3a13a49/thirdparty/rocksdb/util/transaction_test_util.h ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/util/transaction_test_util.h b/thirdparty/rocksdb/util/transaction_test_util.h deleted file mode 100644 index 8805490..0000000 --- a/thirdparty/rocksdb/util/transaction_test_util.h +++ /dev/null @@ -1,114 +0,0 @@ -// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. -// This source code is licensed under both the GPLv2 (found in the -// COPYING file in the root directory) and Apache 2.0 License -// (found in the LICENSE.Apache file in the root directory). - -#pragma once - -#ifndef ROCKSDB_LITE - -#include "rocksdb/options.h" -#include "port/port.h" -#include "rocksdb/utilities/optimistic_transaction_db.h" -#include "rocksdb/utilities/transaction_db.h" - -namespace rocksdb { - -class DB; -class Random64; - -// Utility class for stress testing transactions. Can be used to write many -// transactions in parallel and then validate that the data written is logically -// consistent. This class assumes the input DB is initially empty. -// -// Each call to TransactionDBInsert()/OptimisticTransactionDBInsert() will -// increment the value of a key in #num_sets sets of keys. Regardless of -// whether the transaction succeeds, the total sum of values of keys in each -// set is an invariant that should remain equal. -// -// After calling TransactionDBInsert()/OptimisticTransactionDBInsert() many -// times, Verify() can be called to validate that the invariant holds. -// -// To test writing Transaction in parallel, multiple threads can create a -// RandomTransactionInserter with similar arguments using the same DB. -class RandomTransactionInserter { - public: - // num_keys is the number of keys in each set. - // num_sets is the number of sets of keys. - explicit RandomTransactionInserter( - Random64* rand, const WriteOptions& write_options = WriteOptions(), - const ReadOptions& read_options = ReadOptions(), uint64_t num_keys = 1000, - uint16_t num_sets = 3); - - ~RandomTransactionInserter(); - - // Increment a key in each set using a Transaction on a TransactionDB. - // - // Returns true if the transaction succeeded OR if any error encountered was - // expected (eg a write-conflict). Error status may be obtained by calling - // GetLastStatus(); - bool TransactionDBInsert( - TransactionDB* db, - const TransactionOptions& txn_options = TransactionOptions()); - - // Increment a key in each set using a Transaction on an - // OptimisticTransactionDB - // - // Returns true if the transaction succeeded OR if any error encountered was - // expected (eg a write-conflict). Error status may be obtained by calling - // GetLastStatus(); - bool OptimisticTransactionDBInsert( - OptimisticTransactionDB* db, - const OptimisticTransactionOptions& txn_options = - OptimisticTransactionOptions()); - // Increment a key in each set without using a transaction. If this function - // is called in parallel, then Verify() may fail. - // - // Returns true if the write succeeds. - // Error status may be obtained by calling GetLastStatus(). - bool DBInsert(DB* db); - - // Returns OK if Invariant is true. - static Status Verify(DB* db, uint16_t num_sets); - - // Returns the status of the previous Insert operation - Status GetLastStatus() { return last_status_; } - - // Returns the number of successfully written calls to - // TransactionDBInsert/OptimisticTransactionDBInsert/DBInsert - uint64_t GetSuccessCount() { return success_count_; } - - // Returns the number of calls to - // TransactionDBInsert/OptimisticTransactionDBInsert/DBInsert that did not - // write any data. - uint64_t GetFailureCount() { return failure_count_; } - - private: - // Input options - Random64* rand_; - const WriteOptions write_options_; - const ReadOptions read_options_; - const uint64_t num_keys_; - const uint16_t num_sets_; - - // Number of successful insert batches performed - uint64_t success_count_ = 0; - - // Number of failed insert batches attempted - uint64_t failure_count_ = 0; - - // Status returned by most recent insert operation - Status last_status_; - - // optimization: re-use allocated transaction objects. - Transaction* txn_ = nullptr; - Transaction* optimistic_txn_ = nullptr; - - std::atomic<int> txn_id_; - - bool DoInsert(DB* db, Transaction* txn, bool is_optimistic); -}; - -} // namespace rocksdb - -#endif // ROCKSDB_LITE