This is an automated email from the ASF dual-hosted git repository.

himanshug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2560bf0  Add new coordinator metrics for coordinator duty runtimes 
(#10603)
2560bf0 is described below

commit 2560bf0a1919c36b824bd0e4f9286e2899deddd3
Author: Lucas Capistrant <capistr...@users.noreply.github.com>
AuthorDate: Sun Nov 29 16:47:35 2020 -0600

    Add new coordinator metrics for coordinator duty runtimes (#10603)
    
    * Add new coordinator metrics for duty runtimes
    
    * fix spelling for a constant variable value
    
    * add comment clarifying why the global runtime metric is emitted where it 
is
    
    * Remove duty alias in favor of using the class name for metrics
    
    * fix docs
    
    * CoordinatorStats tests + add duty stats to accumulate() logic
---
 docs/operations/metrics.md                         |  2 +
 .../java/org/apache/druid/query/DruidMetrics.java  |  3 ++
 .../druid/server/coordinator/CoordinatorStats.java | 51 ++++++++++++++++++
 .../druid/server/coordinator/DruidCoordinator.java | 28 ++++++++--
 .../duty/EmitClusterStatsAndMetrics.java           | 32 +++++++++++
 .../server/coordinator/CoordinatorStatsTest.java   | 63 ++++++++++++++++++++++
 .../server/coordinator/DruidCoordinatorTest.java   |  2 +-
 7 files changed, 176 insertions(+), 5 deletions(-)

diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 62c0521..68b7f88 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -254,6 +254,8 @@ These metrics are for the Druid Coordinator and are reset 
each time the Coordina
 |`segment/skipCompact/bytes`|Total bytes of this datasource that are skipped 
(not eligible for auto compaction) by the auto compaction.|datasource.|Varies.|
 |`segment/skipCompact/count`|Total number of segments of this datasource that 
are skipped (not eligible for auto compaction) by the auto 
compaction.|datasource.|Varies.|
 |`interval/skipCompact/count`|Total number of intervals of this datasource 
that are skipped (not eligible for auto compaction) by the auto 
compaction.|datasource.|Varies.|
+|`coordinator/time`|Approximate Coordinator duty runtime in milliseconds. The 
`duty` dimension is the fully qualified class name of the Duty that was run.|duty.|Varies.|
+|`coordinator/global/time`|Approximate runtime of a full coordination cycle in 
milliseconds. The `dutyGroup` dimension indicates which group of duties this 
run executed, e.g. `HistoricalManagementDuties` or `IndexingServiceDuties`.|`dutyGroup`|Varies.|
 
 If `emitBalancingStats` is set to `true` in the Coordinator [dynamic 
configuration](
 ../configuration/index.html#dynamic-configuration), then [log 
entries](../configuration/logging.md) for class
diff --git a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java 
b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
index b1e49af..1e31c5f 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
@@ -42,6 +42,9 @@ public class DruidMetrics
   public static final String SERVER = "server";
   public static final String TIER = "tier";
 
+  public static final String DUTY = "duty";
+  public static final String DUTY_GROUP = "dutyGroup";
+
   public static int findNumComplexAggs(List<AggregatorFactory> aggs)
   {
     int retVal = 0;
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorStats.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorStats.java
index fa10f20..357cf45 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorStats.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorStats.java
@@ -35,12 +35,14 @@ public class CoordinatorStats
 {
   private final Map<String, Object2LongOpenHashMap<String>> perTierStats;
   private final Map<String, Object2LongOpenHashMap<String>> perDataSourceStats;
+  private final Map<String, Object2LongOpenHashMap<String>> perDutyStats;
   private final Object2LongOpenHashMap<String> globalStats;
 
   public CoordinatorStats()
   {
     perTierStats = new HashMap<>();
     perDataSourceStats = new HashMap<>();
+    perDutyStats = new HashMap<>();
     globalStats = new Object2LongOpenHashMap<>();
   }
 
@@ -54,6 +56,11 @@ public class CoordinatorStats
     return !perDataSourceStats.isEmpty();
   }
 
+  public boolean hasPerDutyStats()
+  {
+    return !perDutyStats.isEmpty();
+  }
+
   public Set<String> getTiers(final String statName)
   {
     final Object2LongOpenHashMap<String> theStat = perTierStats.get(statName);
@@ -72,6 +79,15 @@ public class CoordinatorStats
     return Collections.unmodifiableSet(stat.keySet());
   }
 
+  public Set<String> getDuties(String statName)
+  {
+    final Object2LongOpenHashMap<String> stat = perDutyStats.get(statName);
+    if (stat == null) {
+      return Collections.emptySet();
+    }
+    return Collections.unmodifiableSet(stat.keySet());
+  }
+
   /**
    *
    * @param statName the name of the statistics
@@ -109,6 +125,21 @@ public class CoordinatorStats
     }
   }
 
+  public long getDutyStat(String statName, String duty)
+  {
+    return perDutyStats.get(statName).getLong(duty);
+  }
+
+  public void forEachDutyStat(String statName, ObjLongConsumer<String> 
consumer)
+  {
+    final Object2LongOpenHashMap<String> stat = perDutyStats.get(statName);
+    if (stat != null) {
+      for (Entry<String> entry : stat.object2LongEntrySet()) {
+        consumer.accept(entry.getKey(), entry.getLongValue());
+      }
+    }
+  }
+
   public long getGlobalStat(final String statName)
   {
     return globalStats.getLong(statName);
@@ -132,6 +163,12 @@ public class CoordinatorStats
                       .addTo(dataSource, value);
   }
 
+  public void addToDutyStat(String statName, String duty, long value)
+  {
+    perDutyStats.computeIfAbsent(statName, k -> new Object2LongOpenHashMap<>())
+                .addTo(duty, value);
+  }
+
   public void addToGlobalStat(final String statName, final long value)
   {
     globalStats.addTo(statName, value);
@@ -166,6 +203,20 @@ public class CoordinatorStats
         }
     );
 
+    stats.perDutyStats.forEach(
+        (statName, urStat) -> {
+          final Object2LongOpenHashMap<String> myStat = 
perDutyStats.computeIfAbsent(
+              statName,
+              k -> new Object2LongOpenHashMap<>()
+          );
+
+          for (Entry<String> entry : urStat.object2LongEntrySet()) {
+            myStat.addTo(entry.getKey(), entry.getLongValue());
+          }
+        }
+
+    );
+
     for (final Object2LongMap.Entry<String> entry : 
stats.globalStats.object2LongEntrySet()) {
       globalStats.addTo(entry.getKey(), entry.getLongValue());
     }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 678f440..2b6e8c2 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -60,8 +60,10 @@ import 
org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.metadata.MetadataRuleManager;
 import org.apache.druid.metadata.SegmentsMetadataManager;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordinator.duty.BalanceSegments;
 import org.apache.druid.server.coordinator.duty.CompactSegments;
@@ -93,6 +95,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
@@ -158,6 +161,10 @@ public class DruidCoordinator
   private int cachedBalancerThreadNumber;
   private ListeningExecutorService balancerExec;
 
+  private static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = 
"HistoricalManagementDuties";
+  private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP = 
"IndexingServiceDuties";
+  private static final String COMPACT_SEGMENTS_DUTIES_DUTY_GROUP = 
"CompactSegmentsDuties";
+
   @Inject
   public DruidCoordinator(
       DruidCoordinatorConfig config,
@@ -573,7 +580,7 @@ public class DruidCoordinator
   public void runCompactSegmentsDuty()
   {
     final int startingLeaderCounter = coordLeaderSelector.localTerm();
-    DutiesRunnable compactSegmentsDuty = new 
DutiesRunnable(makeCompactSegmentsDuty(), startingLeaderCounter);
+    DutiesRunnable compactSegmentsDuty = new 
DutiesRunnable(makeCompactSegmentsDuty(), startingLeaderCounter, 
COMPACT_SEGMENTS_DUTIES_DUTY_GROUP);
     compactSegmentsDuty.run();
   }
 
@@ -598,14 +605,14 @@ public class DruidCoordinator
       final List<Pair<? extends DutiesRunnable, Duration>> dutiesRunnables = 
new ArrayList<>();
       dutiesRunnables.add(
           Pair.of(
-              new DutiesRunnable(makeHistoricalManagementDuties(), 
startingLeaderCounter),
+              new DutiesRunnable(makeHistoricalManagementDuties(), 
startingLeaderCounter, HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP),
               config.getCoordinatorPeriod()
           )
       );
       if (indexingServiceClient != null) {
         dutiesRunnables.add(
             Pair.of(
-                new DutiesRunnable(makeIndexingServiceDuties(), 
startingLeaderCounter),
+                new DutiesRunnable(makeIndexingServiceDuties(), 
startingLeaderCounter, INDEXING_SERVICE_DUTIES_DUTY_GROUP),
                 config.getCoordinatorIndexingPeriod()
             )
         );
@@ -706,11 +713,13 @@ public class DruidCoordinator
     private final long startTimeNanos = System.nanoTime();
     private final List<CoordinatorDuty> duties;
     private final int startingLeaderCounter;
+    private final String dutiesRunnableAlias;
 
-    protected DutiesRunnable(List<CoordinatorDuty> duties, final int 
startingLeaderCounter)
+    protected DutiesRunnable(List<CoordinatorDuty> duties, final int 
startingLeaderCounter, String alias)
     {
       this.duties = duties;
       this.startingLeaderCounter = startingLeaderCounter;
+      this.dutiesRunnableAlias = alias;
     }
 
     @VisibleForTesting
@@ -747,6 +756,7 @@ public class DruidCoordinator
     public void run()
     {
       try {
+        final long globalStart = System.nanoTime();
         synchronized (lock) {
           if (!coordLeaderSelector.isLeader()) {
             log.info("LEGGO MY EGGO. [%s] is leader.", 
coordLeaderSelector.getCurrentLeader());
@@ -801,14 +811,24 @@ public class DruidCoordinator
               && coordLeaderSelector.isLeader()
               && startingLeaderCounter == coordLeaderSelector.localTerm()) {
 
+            final long start = System.nanoTime();
             params = duty.run(params);
+            final long end = System.nanoTime();
 
             if (params == null) {
               // This duty wanted to cancel the run. No log message, since the 
duty should have logged a reason.
               return;
+            } else {
+              params.getCoordinatorStats().addToDutyStat("runtime", 
duty.getClass().getName(), TimeUnit.NANOSECONDS.toMillis(end - start));
             }
           }
         }
+        // Emit the runtime of the full DutiesRunnable
+        params.getEmitter().emit(
+            new ServiceMetricEvent.Builder()
+                .setDimension(DruidMetrics.DUTY_GROUP, dutiesRunnableAlias)
+                .build("coordinator/global/time", 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - globalStart))
+        );
       }
       catch (Exception e) {
         log.makeAlert(e, "Caught exception, ignoring so that schedule keeps 
going.").emit();
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
index 70bfb04..1f6cab3 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java
@@ -97,6 +97,35 @@ public class EmitClusterStatsAndMetrics implements 
CoordinatorDuty
     );
   }
 
+  private void emitDutyStat(
+      final ServiceEmitter emitter,
+      final String metricName,
+      final String duty,
+      final long value
+  )
+  {
+    emitter.emit(
+        new ServiceMetricEvent.Builder()
+            .setDimension(DruidMetrics.DUTY, duty)
+            .build(metricName, value)
+    );
+  }
+
+  private void emitDutyStats(
+      final ServiceEmitter emitter,
+      final String metricName,
+      final CoordinatorStats stats,
+      final String statName
+  )
+  {
+    stats.forEachDutyStat(
+        statName,
+        (final String duty, final long count) -> {
+          emitDutyStat(emitter, metricName, duty, count);
+        }
+    );
+  }
+
   @Override
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
   {
@@ -435,6 +464,9 @@ public class EmitClusterStatsAndMetrics implements 
CoordinatorDuty
         }
     );
 
+    // Emit coordinator runtime stats
+    emitDutyStats(emitter, "coordinator/time", stats, "runtime");
+
     return params;
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorStatsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorStatsTest.java
index 11fcfd1..00dc094 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorStatsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorStatsTest.java
@@ -116,12 +116,18 @@ public class CoordinatorStatsTest
     stats.addToTieredStat("stat1", "tier1", 1);
     stats.addToTieredStat("stat1", "tier2", 1);
     stats.addToTieredStat("stat2", "tier1", 1);
+    stats.addToDutyStat("stat1", "duty1", 1);
+    stats.addToDutyStat("stat1", "duty2", 1);
+    stats.addToDutyStat("stat2", "duty1", 1);
 
     final CoordinatorStats stats2 = new CoordinatorStats();
     stats2.addToGlobalStat("stat1", 1);
     stats2.addToTieredStat("stat1", "tier2", 1);
     stats2.addToTieredStat("stat2", "tier2", 1);
     stats2.addToTieredStat("stat3", "tier1", 1);
+    stats2.addToDutyStat("stat1", "duty2", 1);
+    stats2.addToDutyStat("stat2", "duty2", 1);
+    stats2.addToDutyStat("stat3", "duty1", 1);
 
     stats.accumulate(stats2);
 
@@ -132,6 +138,11 @@ public class CoordinatorStatsTest
     Assert.assertEquals(1, stats.getTieredStat("stat2", "tier1"));
     Assert.assertEquals(1, stats.getTieredStat("stat2", "tier2"));
     Assert.assertEquals(1, stats.getTieredStat("stat3", "tier1"));
+    Assert.assertEquals(1, stats.getDutyStat("stat1", "duty1"));
+    Assert.assertEquals(2, stats.getDutyStat("stat1", "duty2"));
+    Assert.assertEquals(1, stats.getDutyStat("stat2", "duty1"));
+    Assert.assertEquals(1, stats.getDutyStat("stat2", "duty2"));
+    Assert.assertEquals(1, stats.getDutyStat("stat3", "duty1"));
   }
 
   @Test
@@ -167,4 +178,56 @@ public class CoordinatorStatsTest
     Assert.assertEquals(10, stats.getTieredStat("stat1", "tier2"));
 
   }
+
+  @Test(expected = NullPointerException.class)
+  public void testGetNonexistentDutyStat()
+  {
+    stats.getDutyStat("stat", "duty");
+  }
+
+  @Test
+  public void testAddToDutyStat()
+  {
+    Assert.assertFalse(stats.hasPerDutyStats());
+    stats.addToDutyStat("stat1", "duty1", 1);
+    stats.addToDutyStat("stat1", "duty2", 1);
+    stats.addToDutyStat("stat1", "duty1", -5);
+    stats.addToDutyStat("stat2", "duty1", 1);
+    stats.addToDutyStat("stat1", "duty2", 1);
+    Assert.assertTrue(stats.hasPerDutyStats());
+
+    Assert.assertEquals(
+        Sets.newHashSet("duty1", "duty2"),
+        stats.getDuties("stat1")
+    );
+    Assert.assertEquals(
+        Sets.newHashSet("duty1"),
+        stats.getDuties("stat2")
+    );
+    Assert.assertTrue(stats.getDuties("stat3").isEmpty());
+
+    Assert.assertEquals(-4, stats.getDutyStat("stat1", "duty1"));
+    Assert.assertEquals(2, stats.getDutyStat("stat1", "duty2"));
+    Assert.assertEquals(1, stats.getDutyStat("stat2", "duty1"));
+  }
+
+  @Test
+  public void testForEachDutyStat()
+  {
+    final Map<String, Long> expected = ImmutableMap.of(
+        "duty1", 1L,
+        "duty2", 2L,
+        "duty3", 3L
+    );
+    final Map<String, Long> actual = new HashMap<>();
+
+    expected.forEach(
+        (duty, count) -> stats.addToDutyStat("stat", duty, count)
+    );
+
+    stats.forEachDutyStat("stat0", (duty, count) -> Assert.fail());
+    stats.forEachDutyStat("stat", actual::put);
+
+    Assert.assertEquals(expected, actual);
+  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 34fe944..31bfeec 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -709,7 +709,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
         ZkEnablementConfig.ENABLED
     );
 
-    DruidCoordinator.DutiesRunnable duty = c.new 
DutiesRunnable(Collections.emptyList(), 0);
+    DruidCoordinator.DutiesRunnable duty = c.new 
DutiesRunnable(Collections.emptyList(), 0, "TEST");
     // before initialization
     Assert.assertEquals(0, c.getCachedBalancerThreadNumber());
     Assert.assertNull(c.getBalancerExec());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to