Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-ams a9c6054fe -> e196358ca


AMBARI-22215 Refine cluster second aggregator by aligning sink publish times to 
1 minute boundaries. (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e196358c
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e196358c
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e196358c

Branch: refs/heads/branch-3.0-ams
Commit: e196358caa8b4c3387645238b8c48bb009cee311
Parents: a9c6054
Author: Dmytro Sen <d...@apache.org>
Authored: Thu Oct 12 12:49:22 2017 +0300
Committer: Dmytro Sen <d...@apache.org>
Committed: Thu Oct 12 12:49:22 2017 +0300

----------------------------------------------------------------------
 .../timeline/AbstractTimelineMetricsSink.java   |  95 +++++++-
 .../metrics2/sink/timeline/TimelineMetric.java  |   3 +
 .../AbstractTimelineMetricSinkTest.java         | 240 +++++++++++++++++++
 .../AbstractTimelineMetricSinkTest.java         | 113 ---------
 .../timeline/HadoopTimelineMetricsSink.java     |   2 +-
 .../timeline/HadoopTimelineMetricsSinkTest.java |   4 +-
 .../main/python/core/application_metric_map.py  |  52 +++-
 .../python/core/TestApplicationMetricMap.py     |  38 ++-
 .../timeline/TimelineMetricConfiguration.java   |   3 -
 .../timeline/TimelineMetricsIgniteCache.java    |  14 +-
 .../timeline/aggregators/AggregatorUtils.java   |   2 +-
 .../TimelineMetricAggregatorFactory.java        |   7 +-
 ...cClusterAggregatorSecondWithCacheSource.java |  38 +--
 .../TimelineMetricsIgniteCacheTest.java         |  56 -----
 ...sterAggregatorSecondWithCacheSourceTest.java |  65 +----
 15 files changed, 437 insertions(+), 295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index 3c06032..739e9dc 100644
--- 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.metrics2.sink.timeline;
 
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.reflect.TypeToken;
 import com.google.gson.Gson;
 import com.google.gson.JsonSyntaxException;
@@ -58,6 +60,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -130,6 +133,13 @@ public abstract class AbstractTimelineMetricsSink {
   private static final int COLLECTOR_HOST_CACHE_MAX_EXPIRATION_MINUTES = 75;
   private static final int COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES = 60;
 
+  // 10 seconds
+  protected int collectionPeriodMillis = 10000;
+
+  private int cacheExpireTimeMinutesDefault = 10;
+
+  private volatile Cache<String, TimelineMetric> metricsPostCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpireTimeMinutesDefault, TimeUnit.MINUTES).build();
+
   static {
     mapper = new ObjectMapper();
     AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
@@ -289,7 +299,21 @@ public abstract class AbstractTimelineMetricsSink {
     return collectorHost;
   }
 
+  /**
+   * @param metrics metrics to post; values are aligned to one-minute boundaries,
+   *                and the last incomplete minute is cached to be posted in a
+   *                future iteration
+   */
   protected boolean emitMetrics(TimelineMetrics metrics) {
+    return emitMetrics(metrics, false);
+  }
+
+  /**
+   * @param metrics metrics to post; if postAllCachedMetrics is false, values are
+   *                aligned to one-minute boundaries and the last incomplete minute
+   *                is cached and posted in a future iteration
+   * @param postAllCachedMetrics if set to true, all cached metrics are posted as
+   *                             well, ignoring the minute alignment
+   * @return true if the metrics were posted successfully
+   */
+  protected boolean emitMetrics(TimelineMetrics metrics, boolean postAllCachedMetrics) {
     String connectUrl;
     boolean validCollectorHost = true;
 
@@ -307,11 +331,20 @@ public abstract class AbstractTimelineMetricsSink {
       connectUrl = getCollectorUri(collectorHost);
     }
 
+    TimelineMetrics metricsToEmit = alignMetricsByMinuteMark(metrics);
+
+    if (postAllCachedMetrics) {
+      for (TimelineMetric timelineMetric : metricsPostCache.asMap().values()) {
+        metricsToEmit.addOrMergeTimelineMetric(timelineMetric);
+      }
+      metricsPostCache.invalidateAll();
+    }
+
     if (validCollectorHost) {
       String jsonData = null;
       LOG.debug("EmitMetrics connectUrl = "  + connectUrl);
       try {
-        jsonData = mapper.writeValueAsString(metrics);
+        jsonData = mapper.writeValueAsString(metricsToEmit);
       } catch (IOException e) {
         LOG.error("Unable to parse metrics", e);
       }
@@ -335,6 +368,61 @@ public abstract class AbstractTimelineMetricsSink {
   }
 
   /**
+   * Align metrics to one-minute boundaries so that only complete minutes are sent.
+   * Data points belonging to a not-yet-complete minute are cached and posted once
+   * that minute completes; cached metrics are merged with the metrics currently
+   * being posted.
+   * e.g.:
+   * first iteration:  metrics from 00m15s to 01m15s are processed;
+   *                   metrics from 00m15s to 00m59s are posted
+   *                   and those from 01m00s to 01m15s are cached
+   * second iteration: metrics from 01m25s to 02m55s are processed;
+   *                   cached metrics from the previous call are merged with the
+   *                   current ones, metrics from 01m00s to 02m55s are posted,
+   *                   and the cache is left empty
+   * @param metrics incoming metrics
+   * @return metrics ready to be posted
+   */
+  protected TimelineMetrics alignMetricsByMinuteMark(TimelineMetrics metrics) {
+    TimelineMetrics allMetricsToPost = new TimelineMetrics();
+
+    for (TimelineMetric metric : metrics.getMetrics()) {
+      TimelineMetric cachedMetric = metricsPostCache.getIfPresent(metric.getMetricName());
+      if (cachedMetric != null) {
+        metric.addMetricValues(cachedMetric.getMetricValues());
+        metricsPostCache.invalidate(metric.getMetricName());
+      }
+    }
+
+    for (TimelineMetric metric : metrics.getMetrics()) {
+      TreeMap<Long, Double> valuesToCache = new TreeMap<>();
+      TreeMap<Long, Double> valuesToPost = metric.getMetricValues();
+
+      // if no more datapoints can arrive within the last minute, just post the metrics;
+      // otherwise cut off the last incomplete minute and cache it
+      if (!(valuesToPost.lastKey() % 60000 > 60000 - collectionPeriodMillis)) {
+        Long lastMinute = valuesToPost.lastKey() / 60000;
+        while (!valuesToPost.isEmpty() && valuesToPost.lastKey() / 60000 == lastMinute) {
+          valuesToCache.put(valuesToPost.lastKey(), valuesToPost.get(valuesToPost.lastKey()));
+          valuesToPost.remove(valuesToPost.lastKey());
+        }
+      }
+
+      if (!valuesToCache.isEmpty()) {
+        TimelineMetric metricToCache = new TimelineMetric(metric);
+        metricToCache.setMetricValues(valuesToCache);
+        metricsPostCache.put(metricToCache.getMetricName(), metricToCache);
+      }
+
+      if (!valuesToPost.isEmpty()) {
+        TimelineMetric metricToPost = new TimelineMetric(metric);
+        metricToPost.setMetricValues(valuesToPost);
+        allMetricsToPost.addOrMergeTimelineMetric(metricToPost);
+      }
+    }
+
+    return allMetricsToPost;
+  }
+
+  /**
    * Cleans up and closes an input stream
    * see 
http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
    * @param is the InputStream to clean up
@@ -609,6 +697,11 @@ public abstract class AbstractTimelineMetricsSink {
       rand.nextInt(zookeeperMaxBackoffTimeMins - zookeeperMinBackoffTimeMins + 
1)) * 60*1000l;
   }
 
+  // for now, used only for testing
+  protected Cache<String, TimelineMetric> getMetricsPostCache() {
+    return metricsPostCache;
+  }
+
   /**
    * Get a pre-formatted URI for the collector
    */
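
A minimal standalone sketch of the cut-off rule applied by alignMetricsByMinuteMark
may help; the class and constant names below are illustrative, not part of the patch:

    import java.util.TreeMap;

    public class MinuteAlignmentSketch {
      static final long MINUTE_MS = 60_000L;
      static final long COLLECTION_PERIOD_MS = 10_000L; // sink collection period

      // Moves the last, still-open minute out of values into the returned map;
      // if no further datapoint can arrive in that minute, nothing is cached.
      static TreeMap<Long, Double> cutOffLastMinute(TreeMap<Long, Double> values) {
        TreeMap<Long, Double> toCache = new TreeMap<>();
        if (!values.isEmpty()
            && values.lastKey() % MINUTE_MS <= MINUTE_MS - COLLECTION_PERIOD_MS) {
          long lastMinute = values.lastKey() / MINUTE_MS;
          while (!values.isEmpty() && values.lastKey() / MINUTE_MS == lastMinute) {
            toCache.put(values.lastKey(), values.remove(values.lastKey()));
          }
        }
        return toCache;
      }

      public static void main(String[] args) {
        TreeMap<Long, Double> values = new TreeMap<>();
        values.put(4_000L, 1.0);  // 00m04s
        values.put(44_000L, 5.0); // 00m44s
        values.put(64_000L, 6.0); // 01m04s, minute 1 is still open
        TreeMap<Long, Double> cached = cutOffLastMinute(values);
        System.out.println("post = " + values + ", cache = " + cached);
        // post = {4000=1.0, 44000=5.0}, cache = {64000=6.0}
      }
    }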

http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
index 3dfcf4e..b376048 100644
--- 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
+++ 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
@@ -146,6 +146,9 @@ public class TimelineMetric implements 
Comparable<TimelineMetric>, Serializable
 
   public void addMetricValues(Map<Long, Double> metricValues) {
     this.metricValues.putAll(metricValues);
+    if (!this.metricValues.isEmpty()) {
+      this.setStartTime(this.metricValues.firstKey());
+    }
   }
 
   @XmlElement(name = "metadata")
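
Why the setter was added: when a cached minute is merged into freshly collected
values, the merged metric must report the earliest timestamp as its start time.
A hedged sketch of the effect, using a plain TreeMap in place of TimelineMetric:

    import java.util.TreeMap;

    public class StartTimeSketch {
      public static void main(String[] args) {
        TreeMap<Long, Double> current = new TreeMap<>();
        current.put(74_000L, 7.0);           // freshly collected datapoint
        TreeMap<Long, Double> cached = new TreeMap<>();
        cached.put(64_000L, 6.0);            // datapoint cached in the previous round

        current.putAll(cached);              // what addMetricValues does internally
        long startTime = current.firstKey(); // and then setStartTime(firstKey)
        System.out.println("startTime = " + startTime); // 64000, not 74000
      }
    }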

http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java
 
b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java
new file mode 100644
index 0000000..634d18c
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import junit.framework.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.TreeMap;
+
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.expect;
+import static org.powermock.api.easymock.PowerMock.expectNew;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({AbstractTimelineMetricsSink.class, HttpURLConnection.class})
+public class AbstractTimelineMetricSinkTest {
+
+  @Test
+  public void testParseHostsStringIntoCollection() {
+    AbstractTimelineMetricsSink sink = new TestTimelineMetricsSink();
+    Collection<String> hosts;
+
+    hosts = sink.parseHostsStringIntoCollection("");
+    Assert.assertTrue(hosts.isEmpty());
+
+    hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local");
+    Assert.assertTrue(hosts.size() == 1);
+    Assert.assertTrue(hosts.contains("test1.123.abc.def.local"));
+
+    hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local ");
+    Assert.assertTrue(hosts.size() == 1);
+    Assert.assertTrue(hosts.contains("test1.123.abc.def.local"));
+
+    hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local,test1.456.abc.def.local");
+    Assert.assertTrue(hosts.size() == 2);
+
+    hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local, test1.456.abc.def.local");
+    Assert.assertTrue(hosts.size() == 2);
+    Assert.assertTrue(hosts.contains("test1.123.abc.def.local"));
+    Assert.assertTrue(hosts.contains("test1.456.abc.def.local"));
+  }
+
+  @Test
+  @PrepareForTest({URL.class, OutputStream.class, AbstractTimelineMetricsSink.class, HttpURLConnection.class, TimelineMetric.class})
+  public void testEmitMetrics() throws Exception {
+    HttpURLConnection connection = PowerMock.createNiceMock(HttpURLConnection.class);
+    URL url = PowerMock.createNiceMock(URL.class);
+    expectNew(URL.class, anyString()).andReturn(url).anyTimes();
+    expect(url.openConnection()).andReturn(connection).anyTimes();
+    expect(connection.getResponseCode()).andReturn(200).anyTimes();
+    OutputStream os = PowerMock.createNiceMock(OutputStream.class);
+    expect(connection.getOutputStream()).andReturn(os).anyTimes();
+
+    TestTimelineMetricsSink sink = new TestTimelineMetricsSink();
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+    long startTime = System.currentTimeMillis() / 60000 * 60000;
+
+    long seconds = 1000;
+    TreeMap<Long, Double> metricValues = new TreeMap<>();
+    /*
+
+    0        +30s      +60s
+    |         |         |
+      (1)(2)(3) (4)(5)   (6)  m1
+
+    */
+    // (6) should be cached, the rest posted
+
+    metricValues.put(startTime + 4*seconds, 1.0);
+    metricValues.put(startTime + 14*seconds, 2.0);
+    metricValues.put(startTime + 24*seconds, 3.0);
+    metricValues.put(startTime + 34*seconds, 4.0);
+    metricValues.put(startTime + 44*seconds, 5.0);
+    metricValues.put(startTime + 64*seconds, 6.0);
+
+    TimelineMetric timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
+    timelineMetric.setStartTime(metricValues.firstKey());
+    timelineMetric.addMetricValues(metricValues);
+
+    timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+
+    replayAll();
+    sink.emitMetrics(timelineMetrics);
+    Assert.assertEquals(1, sink.getMetricsPostCache().size());
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 64*seconds, 6.0);
+    Assert.assertEquals(metricValues, sink.getMetricsPostCache().getIfPresent("metric1").getMetricValues());
+
+    timelineMetrics = new TimelineMetrics();
+    metricValues = new TreeMap<>();
+    /*
+
+    +60      +90s     +120s     +150s     +180s
+    |         |         |         |         |
+       (7)      (8)       (9)           (10)   (11)   m1
+
+    */
+    // (6) from previous post should be merged with current data
+    // (6),(7),(8),(9),(10) - should be posted, (11) - cached
+    metricValues.put(startTime + 74*seconds, 7.0);
+    metricValues.put(startTime + 94*seconds, 8.0);
+    metricValues.put(startTime + 124*seconds, 9.0);
+    metricValues.put(startTime + 154*seconds, 10.0);
+    metricValues.put(startTime + 184*seconds, 11.0);
+
+    timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
+    timelineMetric.setStartTime(metricValues.firstKey());
+    timelineMetric.addMetricValues(metricValues);
+
+    timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+    sink.emitMetrics(timelineMetrics);
+
+    Assert.assertEquals(1, sink.getMetricsPostCache().size());
+    metricValues = new TreeMap<>();
+    metricValues.put(startTime + 184*seconds, 11.0);
+    Assert.assertEquals(metricValues, sink.getMetricsPostCache().getIfPresent("metric1").getMetricValues());
+
+    timelineMetrics = new TimelineMetrics();
+
+    metricValues = new TreeMap<>();
+    /*
+
+    +180s   +210s   +240s
+    |         |       |
+       (12)        (13)
+
+    */
+    // (11) from previous post should be merged with current data
+    // (11),(12),(13) - should be posted, cache should be empty
+    metricValues.put(startTime + 194*seconds, 12.0);
+    metricValues.put(startTime + 239*seconds, 13.0);
+
+    timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
+    timelineMetric.setStartTime(metricValues.firstKey());
+    timelineMetric.addMetricValues(metricValues);
+
+    timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+    sink.emitMetrics(timelineMetrics);
+
+    Assert.assertEquals(0, sink.getMetricsPostCache().size());
+
+    metricValues = new TreeMap<>();
+    /*
+
+    +240s   +270s   +300s   +330s
+    |         |       |       |
+       (14)        (15)   (16)
+
+    */
+    // since postAllCachedMetrics is true in this emitMetrics call, (14),(15),(16) should be posted and the cache should be empty
+    metricValues.put(startTime + 245*seconds, 14.0);
+    metricValues.put(startTime + 294*seconds, 15.0);
+    metricValues.put(startTime + 315*seconds, 16.0);
+
+    timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1");
+    timelineMetric.setStartTime(metricValues.firstKey());
+    timelineMetric.addMetricValues(metricValues);
+
+    timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+    sink.emitMetrics(timelineMetrics, true);
+
+    Assert.assertEquals(0, sink.getMetricsPostCache().size());
+  }
+
+  private class TestTimelineMetricsSink extends AbstractTimelineMetricsSink {
+    @Override
+    protected String getCollectorUri(String host) {
+      return "";
+    }
+
+    @Override
+    protected String getCollectorProtocol() {
+      return "http";
+    }
+
+    @Override
+    protected String getCollectorPort() {
+      return "2181";
+    }
+
+    @Override
+    protected int getTimeoutSeconds() {
+      return 10;
+    }
+
+    @Override
+    protected String getZookeeperQuorum() {
+      return "localhost:2181";
+    }
+
+    @Override
+    protected Collection<String> getConfiguredCollectorHosts() {
+      return Arrays.asList("localhost");
+    }
+
+    @Override
+    protected String getHostname() {
+      return "h1";
+    }
+
+    @Override
+    protected boolean isHostInMemoryAggregationEnabled() {
+      return true;
+    }
+
+    @Override
+    protected int getHostInMemoryAggregationPort() {
+      return 61888;
+    }
+
+    @Override
+    protected String getHostInMemoryAggregationProtocol() {
+      return "http";
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
 
b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
deleted file mode 100644
index 396d08d..0000000
--- 
a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.sink.timeline.availability;
-
-import junit.framework.Assert;
-import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-public class AbstractTimelineMetricSinkTest {
-
-  @Test
-  public void testParseHostsStringIntoCollection() {
-    AbstractTimelineMetricsSink sink = new TestTimelineMetricsSink();
-    Collection<String> hosts;
-
-    hosts = sink.parseHostsStringIntoCollection("");
-    Assert.assertTrue(hosts.isEmpty());
-
-    hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local");
-    Assert.assertTrue(hosts.size() == 1);
-    Assert.assertTrue(hosts.contains("test1.123.abc.def.local"));
-
-    hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local ");
-    Assert.assertTrue(hosts.size() == 1);
-    Assert.assertTrue(hosts.contains("test1.123.abc.def.local"));
-
-    hosts = 
sink.parseHostsStringIntoCollection("test1.123.abc.def.local,test1.456.abc.def.local");
-    Assert.assertTrue(hosts.size() == 2);
-
-    hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local, 
test1.456.abc.def.local");
-    Assert.assertTrue(hosts.size() == 2);
-    Assert.assertTrue(hosts.contains("test1.123.abc.def.local"));
-    Assert.assertTrue(hosts.contains("test1.456.abc.def.local"));
-
-  }
-
-  private class TestTimelineMetricsSink extends AbstractTimelineMetricsSink {
-    @Override
-    protected String getCollectorUri(String host) {
-      return "";
-    }
-
-    @Override
-    protected String getCollectorProtocol() {
-      return "http";
-    }
-
-    @Override
-    protected String getCollectorPort() {
-      return "2181";
-    }
-
-    @Override
-    protected int getTimeoutSeconds() {
-      return 10;
-    }
-
-    @Override
-    protected String getZookeeperQuorum() {
-      return "localhost:2181";
-    }
-
-    @Override
-    protected Collection<String> getConfiguredCollectorHosts() {
-      return Arrays.asList("localhost");
-    }
-
-    @Override
-    protected String getHostname() {
-      return "h1";
-    }
-
-    @Override
-    protected boolean isHostInMemoryAggregationEnabled() {
-      return true;
-    }
-
-    @Override
-    protected int getHostInMemoryAggregationPort() {
-      return 61888;
-    }
-
-    @Override
-    protected String getHostInMemoryAggregationProtocol() {
-      return "http";
-    }
-
-    @Override
-    public boolean emitMetrics(TimelineMetrics metrics) {
-      super.init();
-      return super.emitMetrics(metrics);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
 
b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index f37c2be..f70d8ec 100644
--- 
a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ 
b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -508,7 +508,7 @@ public class HadoopTimelineMetricsSink extends 
AbstractTimelineMetricsSink imple
         LOG.debug("Closing HadoopTimelineMetricSink. Flushing metrics to 
collector...");
         TimelineMetrics metrics = metricsCache.getAllMetrics();
         if (metrics != null) {
-          emitMetrics(metrics);
+          emitMetrics(metrics, true);
         }
       }
     });
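
Passing true here matters on shutdown: the partially filled minute sitting in
metricsPostCache would otherwise be dropped. A hedged sketch of the draining
branch of emitMetrics(metrics, true), using the same Guava Cache API as the
patch (String stands in for TimelineMetric; DrainCacheSketch is illustrative):

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import java.util.concurrent.TimeUnit;

    public class DrainCacheSketch {
      public static void main(String[] args) {
        Cache<String, String> postCache = CacheBuilder.newBuilder()
            .expireAfterAccess(10, TimeUnit.MINUTES).build();
        postCache.put("metric1", "cached minute of datapoints");

        // what the postAllCachedMetrics branch does before posting:
        StringBuilder batch = new StringBuilder();
        for (String cached : postCache.asMap().values()) {
          batch.append(cached); // addOrMergeTimelineMetric in the real code
        }
        postCache.invalidateAll();
        System.out.println("posting [" + batch + "], cache size = " + postCache.size());
      }
    }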

http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
 
b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
index 8fde394..b194924 100644
--- 
a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
+++ 
b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
@@ -179,7 +179,7 @@ public class HadoopTimelineMetricsSinkTest {
       createMockBuilder(HadoopTimelineMetricsSink.class)
         .withConstructor().addMockedMethod("appendPrefix")
         .addMockedMethod("findLiveCollectorHostsFromKnownCollector")
-        .addMockedMethod("emitMetrics").createNiceMock();
+        .addMockedMethod("emitMetrics", TimelineMetrics.class).createNiceMock();
 
     SubsetConfiguration conf = createNiceMock(SubsetConfiguration.class);
     
expect(conf.getString("slave.host.name")).andReturn("localhost").anyTimes();
@@ -310,7 +310,7 @@ public class HadoopTimelineMetricsSinkTest {
       createMockBuilder(HadoopTimelineMetricsSink.class)
         .withConstructor().addMockedMethod("appendPrefix")
         .addMockedMethod("findLiveCollectorHostsFromKnownCollector")
-        .addMockedMethod("emitMetrics").createNiceMock();
+        .addMockedMethod("emitMetrics", TimelineMetrics.class).createNiceMock();
 
     SubsetConfiguration conf = createNiceMock(SubsetConfiguration.class);
     
expect(conf.getString("slave.host.name")).andReturn("localhost").anyTimes();

http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
 
b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
index 34a6787..bd957a0 100644
--- 
a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
+++ 
b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
@@ -41,6 +41,7 @@ class ApplicationMetricMap:
     self.ip_address = ip_address
     self.lock = RLock()
     self.app_metric_map = {}
+    self.cached_metric_map = {}
   pass
 
   def put_metric(self, application_id, metric_id_to_value_map, timestamp):
@@ -98,7 +99,7 @@ class ApplicationMetricMap:
             "appid" : "HOST",
             "instanceid" : result_instanceid,
             "starttime" : self.get_start_time(appId, metricId),
-            "metrics" : metricData
+            "metrics" : self.align_values_by_minute_mark(appId, metricId, 
metricData) if clear_once_flattened else metricData
           }
           timeline_metrics[ "metrics" ].append( timeline_metric )
         pass
@@ -114,6 +115,10 @@ class ApplicationMetricMap:
 
   def get_start_time(self, app_id, metric_id):
     with self.lock:
+      if self.cached_metric_map.has_key(app_id):
+        if self.cached_metric_map.get(app_id).has_key(metric_id):
+          metrics = self.cached_metric_map.get(app_id).get(metric_id)
+          return min(metrics.iterkeys())
       if self.app_metric_map.has_key(app_id):
         if self.app_metric_map.get(app_id).has_key(metric_id):
           metrics = self.app_metric_map.get(app_id).get(metric_id)
@@ -137,3 +142,48 @@ class ApplicationMetricMap:
     with self.lock:
       self.app_metric_map.clear()
   pass
+
+  # Align metrics to one-minute boundaries so that only complete minutes are sent.
+  # Data points belonging to a not-yet-complete minute are cached and posted once
+  # that minute completes; cached metrics are merged with the metrics currently
+  # being posted.
+  # e.g.:
+  # first iteration:  metrics from 00m15s to 01m15s are processed;
+  #                   metrics from 00m15s to 00m59s are posted
+  #                   and those from 01m00s to 01m15s are cached
+  # second iteration: metrics from 01m25s to 02m55s are processed;
+  #                   cached metrics from the previous call are merged with the
+  #                   current ones, metrics from 01m00s to 02m55s are posted,
+  #                   and the cache is left empty
+  def align_values_by_minute_mark(self, appId, metricId, metricData):
+    with self.lock:
+      # append with cached values
+      if self.cached_metric_map.get(appId) and self.cached_metric_map.get(appId).get(metricId):
+        metricData.update(self.cached_metric_map[appId][metricId])
+        self.cached_metric_map[appId].pop(metricId)
+
+      # check whether the last minute needs to be cached:
+      # if no more datapoints can arrive within it, just post the metrics;
+      # otherwise cut off the last incomplete minute and cache it
+      max_time = max(metricData.iterkeys())
+      if max_time % 60000 <= 60000 - 10000:
+        max_minute = max_time / 60000
+        metric_data_copy = metricData.copy()
+        for time,value in metric_data_copy.iteritems():
+          if time / 60000 == max_minute:
+            cached_metric_map = self.cached_metric_map.get(appId)
+            if not cached_metric_map:
+              cached_metric_map = { metricId : { time : value } }
+              self.cached_metric_map[ appId ] = cached_metric_map
+            else:
+              cached_metric_id_map = cached_metric_map.get(metricId)
+              if not cached_metric_id_map:
+                cached_metric_id_map = { time : value }
+                cached_metric_map[ metricId ] = cached_metric_id_map
+              else:
+                cached_metric_map[ metricId ].update( { time : value } )
+              pass
+            pass
+            metricData.pop(time)
+          pass
+        pass
+
+      return metricData
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestApplicationMetricMap.py
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestApplicationMetricMap.py
 
b/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestApplicationMetricMap.py
index a956a78..d9ea55d 100644
--- 
a/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestApplicationMetricMap.py
+++ 
b/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestApplicationMetricMap.py
@@ -50,7 +50,7 @@ class TestApplicationMetricMap(TestCase):
     self.assertEqual(p['metrics'][0]['metrics'][str(timestamp)], 'bv')
     
     self.assertEqual(application_metric_map.get_start_time(application_id, 
"b"), timestamp)
-    
+
     metrics = {}
     metrics.update({"b" : 'bv'})
     metrics.update({"a" : 'av'})
@@ -71,4 +71,38 @@ class TestApplicationMetricMap(TestCase):
     json_data = json.loads(application_metric_map.flatten('A1', True))
     self.assertEqual(len(json_data['metrics']), 1)
     self.assertTrue(json_data['metrics'][0]['metricname'] == 'a')
-    self.assertFalse(application_metric_map.app_metric_map)
\ No newline at end of file
+    self.assertFalse(application_metric_map.app_metric_map)
+
+  def test_flatten_and_align_values_by_minute_mark(self):
+    application_metric_map = ApplicationMetricMap("host", "10.10.10.10")
+    second = 1000
+    timestamp = int(round(1415390640.3806491 * second))
+    application_id = application_metric_map.format_app_id("A","1")
+    metrics = {}
+    metrics.update({"b" : 'bv'})
+
+    #   0s    60s   120s
+    #   (0) (1)   (2)    (3)
+    # (3) should be cut off and cached
+    application_metric_map.put_metric(application_id, metrics, timestamp)
+    application_metric_map.put_metric(application_id, metrics, timestamp + second*24)
+    application_metric_map.put_metric(application_id, metrics, timestamp + second*84)
+    application_metric_map.put_metric(application_id, metrics, timestamp + second*124)
+
+    json_data = json.loads(application_metric_map.flatten(application_id, True))
+    self.assertEqual(len(json_data['metrics'][0]['metrics']), 3)
+    self.assertEqual(len(application_metric_map.cached_metric_map.get(application_id).get("b")), 1)
+    self.assertEqual(application_metric_map.cached_metric_map.get(application_id).get("b"), {timestamp + second*124 : 'bv'})
+
+    #   120s    180s
+    #      (3)  (4)
+    # cached (3) should be added to the post;
+    # (4) should be posted as well because there can't be more data points in the minute
+    application_metric_map.put_metric(application_id, metrics, timestamp + second * 176)
+
+    json_data = json.loads(application_metric_map.flatten(application_id, True))
+    self.assertEqual(len(json_data['metrics'][0]['metrics']), 2)
+
+    # starttime should be set to (3)
+    self.assertEqual(json_data['metrics'][0]['starttime'], timestamp + second*124)
+    self.assertEqual(len(application_metric_map.cached_metric_map.get(application_id)), 0)

http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index d0e385b..026eaf5 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -121,9 +121,6 @@ public class TimelineMetricConfiguration {
   public static final String CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL =
     "timeline.metrics.cluster.aggregator.second.timeslice.interval";
 
-  public static final String CLUSTER_CACHE_AGGREGATOR_TIMESLICE_INTERVAL =
-    "timeline.metrics.cluster.cache.aggregator.second.timeslice.interval";
-
   public static final String AGGREGATOR_CHECKPOINT_DELAY =
     "timeline.metrics.service.checkpointDelay";
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCache.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCache.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCache.java
index aeaa4ba..6441c9c 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCache.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCache.java
@@ -50,7 +50,6 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -60,12 +59,11 @@ import java.util.concurrent.locks.Lock;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_CACHE_AGGREGATOR_TIMESLICE_INTERVAL;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_APP_ID;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_SINK_COLLECTION_PERIOD;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_SERVICE_HTTP_POLICY;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
@@ -77,7 +75,6 @@ public class TimelineMetricsIgniteCache implements 
TimelineMetricDistributedCach
       LogFactory.getLog(TimelineMetricsIgniteCache.class);
   private IgniteCache<TimelineClusterMetric, MetricClusterAggregate> 
igniteCache;
   private long cacheSliceIntervalMillis;
-  private int collectionPeriodMillis;
   private boolean interpolationEnabled;
   private List<String> skipAggrPatternStrings = new ArrayList<>();
   private List<String> appIdsToAggregate;
@@ -110,8 +107,7 @@ public class TimelineMetricsIgniteCache implements 
TimelineMetricDistributedCach
     //aggregation parameters
     appIdsToAggregate = 
timelineMetricConfiguration.getAppIdsForHostAggregation();
     interpolationEnabled = 
Boolean.parseBoolean(metricConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED,
 "true"));
-    collectionPeriodMillis = (int) 
SECONDS.toMillis(metricConf.getInt(TIMELINE_METRICS_SINK_COLLECTION_PERIOD, 
10));
-    cacheSliceIntervalMillis = 
SECONDS.toMillis(metricConf.getInt(CLUSTER_CACHE_AGGREGATOR_TIMESLICE_INTERVAL, 
30));
+    cacheSliceIntervalMillis = 
SECONDS.toMillis(metricConf.getInt(CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30));
     Long aggregationInterval = 
metricConf.getLong(CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120L);
 
     String filteredMetricPatterns = 
metricConf.get(TIMELINE_METRIC_AGGREGATION_SQL_FILTERS);
@@ -215,12 +211,6 @@ public class TimelineMetricsIgniteCache implements 
TimelineMetricDistributedCach
 
       if (slicedClusterMetrics != null) {
         for (Map.Entry<TimelineClusterMetric, Double> metricDoubleEntry : 
slicedClusterMetrics.entrySet()) {
-          if (metricDoubleEntry.getKey().getTimestamp() == 
timeSlices.get(timeSlices.size()-1)[1] && 
metricDoubleEntry.getKey().getTimestamp() - metric.getMetricValues().lastKey() 
> collectionPeriodMillis) {
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("Last skipped timestamp @ " + new 
Date(metric.getMetricValues().lastKey()) + " slice timestamp @ " + new 
Date(metricDoubleEntry.getKey().getTimestamp()));
-            }
-            continue;
-          }
           MetricClusterAggregate newMetricClusterAggregate  = new 
MetricClusterAggregate(
               metricDoubleEntry.getValue(), 1, null, 
metricDoubleEntry.getValue(), metricDoubleEntry.getValue());
           //put app metric into cache

http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
index b12cb86..b8338fb 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
@@ -223,7 +223,7 @@ public class AggregatorUtils {
    */
   public static Long getSliceTimeForMetric(List<Long[]> timeSlices, Long 
timestamp) {
     for (Long[] timeSlice : timeSlices) {
-      if (timestamp > timeSlice[0] && timestamp <= timeSlice[1]) {
+      if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) {
         return timeSlice[1];
       }
     }
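
The boundary flip above changes slice semantics from (start, end] to [start, end):
a datapoint exactly on a slice start now belongs to that slice, and one exactly on
the end rolls into the next. A small sketch (SliceSketch and the slice values are
illustrative, not from the patch):

    import java.util.Arrays;
    import java.util.List;

    public class SliceSketch {
      // mirrors the new getSliceTimeForMetric: a matching point is attributed
      // to the end time of its slice
      static Long sliceTime(List<Long[]> slices, long ts) {
        for (Long[] s : slices) {
          if (ts >= s[0] && ts < s[1]) {
            return s[1];
          }
        }
        return -1L;
      }

      public static void main(String[] args) {
        List<Long[]> slices = Arrays.asList(
            new Long[] {0L, 30_000L}, new Long[] {30_000L, 60_000L});
        System.out.println(sliceTime(slices, 0L));      // 30000 (was -1 with the old check)
        System.out.println(sliceTime(slices, 30_000L)); // 60000 (was 30000 with the old check)
      }
    }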

http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
index c27d712..9e493ea 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
@@ -41,7 +41,6 @@ import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_CACHE_AGGREGATOR_TIMESLICE_INTERVAL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_DISABLED;
@@ -273,9 +272,6 @@ public class TimelineMetricAggregatorFactory {
     long timeSliceIntervalMillis = SECONDS.toMillis(metricsConf.getInt
       (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30));
 
-    long cacheTimeSliceIntervalMillis = SECONDS.toMillis(metricsConf.getInt
-      (CLUSTER_CACHE_AGGREGATOR_TIMESLICE_INTERVAL, 30));
-
     int checkpointCutOffMultiplier =
       
metricsConf.getInt(CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
 
@@ -297,8 +293,7 @@ public class TimelineMetricAggregatorFactory {
         120000l,
         timeSliceIntervalMillis,
         haController,
-        distributedCache,
-        cacheTimeSliceIntervalMillis
+        distributedCache
       );
     }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java
index 0c030b6..888044a 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java
@@ -31,19 +31,16 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getSliceTimeForMetric;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getTimeSlices;
 
 public class TimelineMetricClusterAggregatorSecondWithCacheSource extends 
TimelineMetricClusterAggregatorSecond {
   private TimelineMetricDistributedCache distributedCache;
-  private Long cacheTimeSliceIntervalMillis;
   public 
TimelineMetricClusterAggregatorSecondWithCacheSource(AggregationTaskRunner.AGGREGATOR_NAME
 metricAggregateSecond, TimelineMetricMetadataManager metricMetadataManager, 
PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String 
checkpointLocation, long sleepIntervalMillis, int checkpointCutOffMultiplier, 
String aggregatorDisabledParam, String inputTableName, String outputTableName,
                                                               Long 
nativeTimeRangeDelay,
                                                               Long 
timeSliceInterval,
-                                                              
MetricCollectorHAController haController, TimelineMetricDistributedCache 
distributedCache, Long cacheTimeSliceIntervalMillis) {
+                                                              
MetricCollectorHAController haController, TimelineMetricDistributedCache 
distributedCache) {
     super(metricAggregateSecond, metricMetadataManager, hBaseAccessor, 
metricsConf, checkpointLocation, sleepIntervalMillis, 
checkpointCutOffMultiplier, aggregatorDisabledParam, inputTableName, 
outputTableName, nativeTimeRangeDelay, timeSliceInterval, haController);
     this.distributedCache = distributedCache;
-    this.cacheTimeSliceIntervalMillis = cacheTimeSliceIntervalMillis;
   }
 
   @Override
@@ -81,36 +78,11 @@ public class 
TimelineMetricClusterAggregatorSecondWithCacheSource extends Timeli
 
   //Slices in cache could be different from aggregate slices, so need to 
recalculate. Counts hosted apps
   Map<TimelineClusterMetric, MetricClusterAggregate> 
aggregateMetricsFromMetricClusterAggregates(Map<TimelineClusterMetric, 
MetricClusterAggregate> metricsFromCache, List<Long[]> timeSlices) {
-    Map<TimelineClusterMetric, MetricClusterAggregate> result = new 
HashMap<>();
-
-    //normalize if slices in cache are different from the aggregation slices
-    //TODO add basic interpolation, current implementation assumes that 
cacheTimeSliceIntervalMillis <= timeSliceIntervalMillis
-    if (cacheTimeSliceIntervalMillis.equals(timeSliceIntervalMillis)) {
-      result = metricsFromCache;
-    } else {
-      for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> 
clusterMetricAggregateEntry : metricsFromCache.entrySet()) {
-        Long timestamp = getSliceTimeForMetric(timeSlices, 
clusterMetricAggregateEntry.getKey().getTimestamp());
-        if (timestamp <= 0) {
-          LOG.warn("Entry doesn't match any slice. Slices : " + timeSlices + " 
metric timestamp : " + clusterMetricAggregateEntry.getKey().getTimestamp());
-          continue;
-        }
-        TimelineClusterMetric timelineClusterMetric = new 
TimelineClusterMetric(clusterMetricAggregateEntry.getKey().getMetricName(), 
clusterMetricAggregateEntry.getKey().getAppId(), 
clusterMetricAggregateEntry.getKey().getInstanceId(), timestamp);
-        if (result.containsKey(timelineClusterMetric)) {
-          MetricClusterAggregate metricClusterAggregate = 
result.get(timelineClusterMetric);
-          
metricClusterAggregate.updateMax(clusterMetricAggregateEntry.getValue().getMax());
-          
metricClusterAggregate.updateMin(clusterMetricAggregateEntry.getValue().getMin());
-          metricClusterAggregate.setSum((metricClusterAggregate.getSum() + 
clusterMetricAggregateEntry.getValue().getSum()) / 2D);
-          
metricClusterAggregate.setNumberOfHosts(Math.max(metricClusterAggregate.getNumberOfHosts(),
 clusterMetricAggregateEntry.getValue().getNumberOfHosts()));
-        } else {
-          result.put(timelineClusterMetric, 
clusterMetricAggregateEntry.getValue());
-        }
-      }
-    }
-
+    //TODO add basic interpolation
     //TODO investigate if needed, maybe add config to disable/enable
     //count hosted apps
     Map<String, MutableInt> hostedAppCounter = new HashMap<>();
-    for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> 
clusterMetricAggregateEntry : result.entrySet()) {
+    for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> 
clusterMetricAggregateEntry : metricsFromCache.entrySet()) {
       int numHosts = clusterMetricAggregateEntry.getValue().getNumberOfHosts();
       String appId = clusterMetricAggregateEntry.getKey().getAppId();
       if (!hostedAppCounter.containsKey(appId)) {
@@ -124,9 +96,9 @@ public class 
TimelineMetricClusterAggregatorSecondWithCacheSource extends Timeli
     }
 
     // Add liveHosts per AppId metrics.
-    processLiveAppCountMetrics(result, hostedAppCounter, 
timeSlices.get(timeSlices.size() - 1)[1]);
+    processLiveAppCountMetrics(metricsFromCache, hostedAppCounter, 
timeSlices.get(timeSlices.size() - 1)[1]);
 
-    return result;
+    return metricsFromCache;
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java
index d3c6061..2cb66ba 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java
@@ -167,62 +167,6 @@ public class TimelineMetricsIgniteCacheTest {
     metricValues.clear();
     timelineMetrics.clear();
 
-    /*
-
-    0      +30s    +60s    +90s
-    |       |       |       |
-     (1)      (2)                h1
-                (3)       (4)    h2
-                 (5)      (6)    h1
-
-    */
-    // Case 3 : merging host data points, ignore (2) for h1 as it will 
conflict with (5), two hosts.
-    metricValues = new TreeMap<>();
-    metricValues.put(startTime + 15*seconds, 1.0);
-    metricValues.put(startTime + 45*seconds, 2.0);
-    timelineMetric = new TimelineMetric("metric1", "host1", "app1", 
"instance1");
-    timelineMetric.setMetricValues(metricValues);
-    timelineMetrics.add(timelineMetric);
-
-    metricValues = new TreeMap<>();
-    metricValues.put(startTime + 45*seconds, 3.0);
-    metricValues.put(startTime + 85*seconds, 4.0);
-    timelineMetric = new TimelineMetric("metric1", "host2", "app1", 
"instance1");
-    timelineMetric.setMetricValues(metricValues);
-    timelineMetrics.add(timelineMetric);
-
-    metricValues = new TreeMap<>();
-    metricValues.put(startTime + 55*seconds, 5.0);
-    metricValues.put(startTime + 85*seconds, 6.0);
-    timelineMetric = new TimelineMetric("metric1", "host1", "app1", 
"instance1");
-    timelineMetric.setMetricValues(metricValues);
-    timelineMetrics.add(timelineMetric);
-
-    timelineMetricsIgniteCache.putMetrics(timelineMetrics, 
metricMetadataManagerMock);
-
-    aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, 
startTime + 120*seconds);
-
-    Assert.assertEquals(aggregateMap.size(), 3);
-    timelineClusterMetric = new 
TimelineClusterMetric(timelineMetric.getMetricName(),
-      timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 
30*seconds);
-
-    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
-    Assert.assertEquals(1.0, aggregateMap.get(timelineClusterMetric).getSum());
-    Assert.assertEquals(1, 
aggregateMap.get(timelineClusterMetric).getNumberOfHosts());
-
-    timelineClusterMetric.setTimestamp(startTime + 2*30*seconds);
-    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
-    Assert.assertEquals(8.0, aggregateMap.get(timelineClusterMetric).getSum());
-    Assert.assertEquals(2, 
aggregateMap.get(timelineClusterMetric).getNumberOfHosts());
-
-    timelineClusterMetric.setTimestamp(startTime + 3*30*seconds);
-    Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric));
-    Assert.assertEquals(10.0, 
aggregateMap.get(timelineClusterMetric).getSum());
-    Assert.assertEquals(2, 
aggregateMap.get(timelineClusterMetric).getNumberOfHosts());
-
-    metricValues.clear();
-    timelineMetrics.clear();
-
     Assert.assertEquals(0d, 
timelineMetricsIgniteCache.getPointInTimeCacheMetrics().get("Cluster_KeySize"));
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java
index 7cddb00..e8a9dc2 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java
@@ -79,7 +79,7 @@ public class 
TimelineMetricClusterAggregatorSecondWithCacheSourceTest {
     TimelineMetricClusterAggregatorSecondWithCacheSource secondAggregator = 
new TimelineMetricClusterAggregatorSecondWithCacheSource(
         METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, 
configuration, null,
         aggregatorInterval, 2, "false", "", "", aggregatorInterval,
-        sliceInterval, null, timelineMetricsIgniteCache, 30L);
+        sliceInterval, null, timelineMetricsIgniteCache);
 
     long now = System.currentTimeMillis();
     long startTime = now - 120000;
@@ -112,67 +112,4 @@ public class 
TimelineMetricClusterAggregatorSecondWithCacheSourceTest {
     Assert.assertEquals(2d, a1.getSum());
     Assert.assertEquals(5d, a2.getSum());
   }
-
-  @Test
-  public void testSlicesRecalculation() throws Exception {
-    long aggregatorInterval = 120000;
-    long sliceInterval = 30000;
-
-    Configuration configuration = new Configuration();
-
-    TimelineMetricMetadataManager metricMetadataManagerMock = 
createNiceMock(TimelineMetricMetadataManager.class);
-    
expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey)
 anyObject())).andReturn(null).anyTimes();
-    replay(metricMetadataManagerMock);
-
-    TimelineMetricClusterAggregatorSecondWithCacheSource secondAggregator = 
new TimelineMetricClusterAggregatorSecondWithCacheSource(
-        METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, 
configuration, null,
-        aggregatorInterval, 2, "false", "", "", aggregatorInterval,
-        sliceInterval, null, timelineMetricsIgniteCache, 30L);
-
-    long seconds = 1000;
-    long now = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), 
120*seconds);
-    long startTime = now - 120*seconds;
-
-    Map<TimelineClusterMetric, MetricClusterAggregate> metricsFromCache = new 
HashMap<>();
-    metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime 
+ 5 * seconds),
-        new MetricClusterAggregate(1.0, 2, 1.0, 1.0, 1.0));
-    metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime 
+ 25 * seconds),
-        new MetricClusterAggregate(2.0, 2, 1.0, 2.0, 2.0));
-    metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime 
+ 45 * seconds),
-        new MetricClusterAggregate(3.0, 2, 1.0, 1.0, 1.0));
-    metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime 
+ 65 * seconds),
-        new MetricClusterAggregate(4.0, 2, 1.0, 4.0, 4.0));
-    metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime 
+ 85 * seconds),
-        new MetricClusterAggregate(5.0, 2, 1.0, 5.0, 5.0));
-
-    List<Long[]> timeslices = getTimeSlices(startTime, startTime + 
120*seconds, 30*seconds);
-
-    Map<TimelineClusterMetric, MetricClusterAggregate> aggregates = 
secondAggregator.aggregateMetricsFromMetricClusterAggregates(metricsFromCache, 
timeslices);
-
-    Assert.assertNotNull(aggregates);
-    Assert.assertEquals(4, aggregates.size());
-
-    TimelineClusterMetric timelineClusterMetric = new 
TimelineClusterMetric("m1", "a1", "i1", startTime + 30*seconds);
-    MetricClusterAggregate metricClusterAggregate = 
aggregates.get(timelineClusterMetric);
-    Assert.assertNotNull(metricClusterAggregate);
-    Assert.assertEquals(1.5, metricClusterAggregate.getSum());
-    Assert.assertEquals(1d, metricClusterAggregate.getMin());
-    Assert.assertEquals(2d, metricClusterAggregate.getMax());
-    Assert.assertEquals(2, metricClusterAggregate.getNumberOfHosts());
-
-    timelineClusterMetric.setTimestamp(startTime + 60*seconds);
-    metricClusterAggregate = aggregates.get(timelineClusterMetric);
-    Assert.assertNotNull(metricClusterAggregate);
-    Assert.assertEquals(3d, metricClusterAggregate.getSum());
-
-    timelineClusterMetric.setTimestamp(startTime + 90*seconds);
-    metricClusterAggregate = aggregates.get(timelineClusterMetric);
-    Assert.assertNotNull(metricClusterAggregate);
-    Assert.assertEquals(4.5d, metricClusterAggregate.getSum());
-
-    timelineClusterMetric = new TimelineClusterMetric("live_hosts", "a1", 
null, startTime + 120*seconds);
-    metricClusterAggregate = aggregates.get(timelineClusterMetric);
-    Assert.assertNotNull(metricClusterAggregate);
-    Assert.assertEquals(2d, metricClusterAggregate.getSum());
-  }
 }
