HADOOP-13285. DecayRpcScheduler MXBean should only report decayed CallVolumeSummary. Contributed by Xiaoyu Yao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0761379f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0761379f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0761379f

Branch: refs/heads/YARN-2915
Commit: 0761379fe45898c44c8f161834c298ef932e4d8c
Parents: 2800695
Author: Xiaoyu Yao <x...@apache.org>
Authored: Fri Jun 17 15:25:14 2016 -0700
Committer: Xiaoyu Yao <x...@apache.org>
Committed: Fri Jun 17 15:25:14 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/ipc/DecayRpcScheduler.java | 17 ++++++++++-
 .../hadoop/ipc/TestDecayRpcScheduler.java | 30 ++++++++++++++++----
 2 files changed, 41 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0761379f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
index ec87c75..f40bd17 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
@@ -901,9 +901,24 @@ public class DecayRpcScheduler implements RpcScheduler,
   public String getCallVolumeSummary() {
     try {
       ObjectMapper om = new ObjectMapper();
-      return om.writeValueAsString(callCounts);
+      return om.writeValueAsString(getDecayedCallCounts());
     } catch (Exception e) {
       return "Error: " + e.getMessage();
     }
   }
+
+  private Map<Object, Long> getDecayedCallCounts() {
+    Map<Object, Long> decayedCallCounts = new HashMap<>(callCounts.size());
+    Iterator<Map.Entry<Object, List<AtomicLong>>> it =
+        callCounts.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<Object, List<AtomicLong>> entry = it.next();
+      Object user = entry.getKey();
+      Long decayedCount = entry.getValue().get(0).get();
+      if (decayedCount > 0) {
+        decayedCallCounts.put(user, decayedCount);
+      }
+    }
+    return decayedCallCounts;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0761379f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
index 0b0408c..58380c5 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
@@ -30,6 +30,10 @@ import static org.mockito.Mockito.when;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.conf.Configuration;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+
 public class TestDecayRpcScheduler {
   private Schedulable mockCall(String id) {
     Schedulable mockCall = mock(Schedulable.class);
@@ -189,12 +193,14 @@ public class TestDecayRpcScheduler {
 
   @Test
   @SuppressWarnings("deprecation")
-  public void testPriority() {
+  public void testPriority() throws Exception {
     Configuration conf = new Configuration();
-    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
-    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY,
-      "25, 50, 75");
-    scheduler = new DecayRpcScheduler(4, "ns", conf);
+    final String namespace = "ns";
+    conf.set(namespace + "." + DecayRpcScheduler
+        .IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
+    conf.set(namespace + "." + DecayRpcScheduler
+        .IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY, "25, 50, 75");
+    scheduler = new DecayRpcScheduler(4, namespace, conf);
 
     assertEquals(0, scheduler.getPriorityLevel(mockCall("A")));
     assertEquals(2, scheduler.getPriorityLevel(mockCall("A")));
@@ -206,6 +212,20 @@ public class TestDecayRpcScheduler {
     assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
     assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
     assertEquals(2, scheduler.getPriorityLevel(mockCall("A")));
+
+    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    ObjectName mxbeanName = new ObjectName(
+        "Hadoop:service="+ namespace + ",name=DecayRpcScheduler");
+
+    String cvs1 = (String) mbs.getAttribute(mxbeanName, "CallVolumeSummary");
+    assertTrue("Get expected JMX of CallVolumeSummary before decay",
+        cvs1.equals("{\"A\":6,\"B\":2,\"C\":2}"));
+
+    scheduler.forceDecay();
+
+    String cvs2 = (String) mbs.getAttribute(mxbeanName, "CallVolumeSummary");
+    assertTrue("Get expected JMX for CallVolumeSummary after decay",
+        cvs2.equals("{\"A\":3,\"B\":1,\"C\":1}"));
   }
 
   @Test(timeout=2000)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org