This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 926cac023 KUDU-3557 Simple way to change maximum thread cache size of 
tcmalloc.
926cac023 is described below

commit 926cac023832f29fc2f51c161c03aeea9f3d7036
Author: 宋家成 <songjiach...@thinkingdata.cn>
AuthorDate: Tue Feb 27 16:05:06 2024 +0800

    KUDU-3557 Simple way to change maximum thread cache size of tcmalloc.
    
    Currently, lock contention in tcmalloc is very high when the cluster is
    under high throughput. Making the total thread cache size larger can
    significantly reduce this lock contention. Although a single thread
    cache cannot initially grow beyond 4096KB, it can request more from
    unclaimed cache space or steal from other threads' caches each time it
    performs garbage collection. For more details about tcmalloc, please see
    the documentation:
    https://gperftools.github.io/gperftools/tcmalloc.html
    
    The KUDU-3557 JIRA shows the difference in lock contention for
    different values of max_total_thread_cache_bytes. Please check it
    for details.
    
    Change-Id: I3cb8c6ed6a8f24c63258ae65f8b841fe74b75308
    Reviewed-on: http://gerrit.cloudera.org:8080/21076
    Tested-by: Alexey Serbin <ale...@apache.org>
    Reviewed-by: Alexey Serbin <ale...@apache.org>
---
 src/kudu/integration-tests/memory_gc-itest.cc | 114 +++++++++++++++++++++++++-
 src/kudu/server/server_base.cc                |  33 ++++++++
 2 files changed, 143 insertions(+), 4 deletions(-)

diff --git a/src/kudu/integration-tests/memory_gc-itest.cc 
b/src/kudu/integration-tests/memory_gc-itest.cc
index 0525085bd..5bebd1d57 100644
--- a/src/kudu/integration-tests/memory_gc-itest.cc
+++ b/src/kudu/integration-tests/memory_gc-itest.cc
@@ -18,6 +18,7 @@
 #include <cstdint>
 #include <functional>
 #include <memory>
+#include <ostream>
 #include <string>
 #include <utility>
 #include <vector>
@@ -41,7 +42,8 @@ using std::vector;
 METRIC_DECLARE_entity(server);
 METRIC_DECLARE_gauge_uint64(generic_current_allocated_bytes);
 METRIC_DECLARE_gauge_uint64(tcmalloc_pageheap_free_bytes);
-
+METRIC_DECLARE_gauge_uint64(spinlock_contention_time);
+METRIC_DECLARE_gauge_uint64(tcmalloc_max_total_thread_cache_bytes);
 namespace kudu {
 
 using cluster::ExternalMiniClusterOptions;
@@ -87,7 +89,7 @@ TEST_F(MemoryGcITest, TestPeriodicGc) {
   ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(2),
                               "gc_tcmalloc_memory_interval_seconds", "0"));
 
-  // Write some data for scan later.
+  // Write some data to be scanned later on.
   {
     TestWorkload workload(cluster_.get());
     workload.set_num_tablets(60);
@@ -103,14 +105,14 @@ TEST_F(MemoryGcITest, TestPeriodicGc) {
     workload.StopAndJoin();
   }
 
-  // Start scan, then more memory will be allocated by tcmalloc.
+  // Additional memory is allocated during scan operations below.
   {
     TestWorkload workload(cluster_.get());
     workload.set_num_write_threads(0);
     workload.set_num_read_threads(8);
     workload.Setup();
     workload.Start();
-    // Sleep a long time to ensure memory consumed more.
+    // Run the scan workload for a long time to let it allocate/deallocate a 
lot of memory.
     SleepFor(MonoDelta::FromSeconds(8));
     workload.StopAndJoin();
   }
@@ -129,4 +131,108 @@ TEST_F(MemoryGcITest, TestPeriodicGc) {
   });
 }
 
+// Run the same write/scan workload on tablet servers configured with different values of
+// --tcmalloc_max_total_thread_cache_bytes (1MB, 8MB, 64MB) and log their spinlock contention.
+TEST_F(MemoryGcITest, TestLockContentionInVariousThreadCacheSize) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  ExternalMiniClusterOptions opts;
+  // Start all tablet servers with a 1MB (1048576 bytes) limit.
+  vector<string> ts_flags;
+  ts_flags.emplace_back("--tcmalloc_max_total_thread_cache_bytes=1048576");
+  opts.num_tablet_servers = 3;
+  opts.extra_tserver_flags = std::move(ts_flags);
+  NO_FATALS(StartClusterWithOpts(opts));
+
+  // Restart the second tablet server with an 8MB (8388608 bytes) limit.
+  auto* ts = cluster_->tablet_server(1);
+  ts->mutable_flags()->emplace_back("--tcmalloc_max_total_thread_cache_bytes=8388608");
+  ts->Shutdown();
+  ASSERT_OK(ts->Restart());
+
+  // Restart the third tablet server with a 64MB (67108864 bytes) limit.
+  ts = cluster_->tablet_server(2);
+  ts->mutable_flags()->emplace_back("--tcmalloc_max_total_thread_cache_bytes=67108864");
+  ts->Shutdown();
+  ASSERT_OK(ts->Restart());
+
+  // Make sure each tablet server reports the limit it was configured with.
+  int64_t total_size = 0;
+  ASSERT_OK(itest::GetInt64Metric(cluster_->tablet_server(0)->bound_http_hostport(),
+                                  &METRIC_ENTITY_server,
+                                  "kudu.tabletserver",
+                                  &METRIC_tcmalloc_max_total_thread_cache_bytes,
+                                  "value",
+                                  &total_size));
+  ASSERT_EQ(1048576, total_size);
+  ASSERT_OK(itest::GetInt64Metric(cluster_->tablet_server(1)->bound_http_hostport(),
+                                  &METRIC_ENTITY_server,
+                                  "kudu.tabletserver",
+                                  &METRIC_tcmalloc_max_total_thread_cache_bytes,
+                                  "value",
+                                  &total_size));
+  ASSERT_EQ(8388608, total_size);
+  ASSERT_OK(itest::GetInt64Metric(cluster_->tablet_server(2)->bound_http_hostport(),
+                                  &METRIC_ENTITY_server,
+                                  "kudu.tabletserver",
+                                  &METRIC_tcmalloc_max_total_thread_cache_bytes,
+                                  "value",
+                                  &total_size));
+  ASSERT_EQ(67108864, total_size);
+
+  // Write some data to be scanned later on.
+  {
+    TestWorkload workload(cluster_.get());
+    workload.set_num_tablets(60);
+    workload.set_num_replicas(3);
+    workload.set_num_write_threads(20);
+    workload.set_write_batch_size(100);
+    workload.set_payload_bytes(1024);
+    workload.Setup();
+    workload.Start();
+    ASSERT_EVENTUALLY([&]() {
+      ASSERT_GE(workload.rows_inserted(), 30000);
+    });
+    workload.StopAndJoin();
+  }
+
+  // Additional memory is allocated during scan operations below.
+  {
+    TestWorkload workload(cluster_.get());
+    workload.set_num_write_threads(0);
+    workload.set_num_read_threads(20);
+    workload.Setup();
+    workload.Start();
+    // Run the scan workload for a long time to let it allocate/deallocate a lot of memory.
+    SleepFor(MonoDelta::FromSeconds(8));
+    workload.StopAndJoin();
+  }
+
+  // Collect and log spinlock contention; no ordering is asserted since it may be flaky.
+  int64_t contention_0 = 0;
+  ASSERT_OK(itest::GetInt64Metric(cluster_->tablet_server(0)->bound_http_hostport(),
+                                  &METRIC_ENTITY_server,
+                                  "kudu.tabletserver",
+                                  &METRIC_spinlock_contention_time,
+                                  "value",
+                                  &contention_0));
+  int64_t contention_1 = 0;
+  ASSERT_OK(itest::GetInt64Metric(cluster_->tablet_server(1)->bound_http_hostport(),
+                                  &METRIC_ENTITY_server,
+                                  "kudu.tabletserver",
+                                  &METRIC_spinlock_contention_time,
+                                  "value",
+                                  &contention_1));
+  int64_t contention_2 = 0;
+  ASSERT_OK(itest::GetInt64Metric(cluster_->tablet_server(2)->bound_http_hostport(),
+                                  &METRIC_ENTITY_server,
+                                  "kudu.tabletserver",
+                                  &METRIC_spinlock_contention_time,
+                                  "value",
+                                  &contention_2));
+  LOG(INFO) << "The lock contention metric of Tablet server 0 is " << contention_0;
+  LOG(INFO) << "The lock contention metric of Tablet server 1 is " << contention_1;
+  LOG(INFO) << "The lock contention metric of Tablet server 2 is " << contention_2;
+}
+
 } // namespace kudu
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index ff3927008..3ba74867e 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -35,6 +35,9 @@
 
 #include <gflags/gflags.h>
 #include <glog/logging.h>
+#ifdef TCMALLOC_ENABLED
+#include <gperftools/malloc_extension.h>
+#endif
 
 #include "kudu/clock/clock.h"
 #include "kudu/clock/hybrid_clock.h"
@@ -303,6 +306,18 @@ DEFINE_uint32(wall_clock_jump_threshold_sec, 15 * 60,
               "is enabled");
 TAG_FLAG(wall_clock_jump_threshold_sec, experimental);
 
+DEFINE_uint64(tcmalloc_max_total_thread_cache_bytes, 128 * 1024 * 1024,
+              "Upper limit on the total number of bytes stored across all thread caches "
+              "in tcmalloc. Increasing this value helps reduce lock contention in "
+              "tcmalloc for memory-intensive workloads. WARNING: this flag overrides "
+              "the TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES environment variable, so "
+              "change the maximum size of all thread caches via this flag instead. Note "
+              "that this bound is not strict: the caches may exceed it in certain "
+              "circumstances. The value must not exceed 1 GB (1073741824 bytes); larger "
+              "values are rejected.");
+TAG_FLAG(tcmalloc_max_total_thread_cache_bytes, advanced);
+TAG_FLAG(tcmalloc_max_total_thread_cache_bytes, experimental);
+
 DECLARE_bool(use_hybrid_clock);
 DECLARE_int32(dns_resolver_max_threads_num);
 DECLARE_uint32(dns_resolver_cache_capacity_mb);
@@ -377,6 +392,16 @@ enum class TriState {
   DISABLED,
 };
 
+bool ValidateThreadCacheMaxTotalSize(const char* flagname, uint64_t value) {
+  // 'value' is unsigned, so only the 1GB upper bound needs to be checked.
+  constexpr uint64_t kMaxBytes = 1024ULL * 1024 * 1024;
+  if (value <= kMaxBytes) return true;
+  LOG(ERROR) << Substitute("$0 must be a value in range [0, 1GB], value $1 is invalid.",
+                           flagname, value);
+  return false;
+}
+DEFINE_validator(tcmalloc_max_total_thread_cache_bytes, &ValidateThreadCacheMaxTotalSize);
+
 // This is a helper function to parse a flag that has three possible values:
 // "auto", "enabled", "disabled".  That directly maps into the TriState enum.
 Status ParseTriStateFlag(const string& name,
@@ -797,7 +822,15 @@ Status ServerBase::Init() {
   init->Start();
   glog_metrics_.reset(new ScopedGLogMetrics(metric_entity_));
   tcmalloc::RegisterMetrics(metric_entity_);
+
+#ifdef TCMALLOC_ENABLED
+  MallocExtension::instance()->SetNumericProperty(
+    "tcmalloc.max_total_thread_cache_bytes", FLAGS_tcmalloc_max_total_thread_cache_bytes);
+#else
+  LOG(INFO) << "Flag tcmalloc_max_total_thread_cache_bytes is not working since tcmalloc "
+               "is not enabled.";
+#endif
   RegisterSpinLockContentionMetrics(metric_entity_);
 
   InitSpinLockContentionProfiling();
 

Reply via email to