Repository: hadoop
Updated Branches:
  refs/heads/YARN-2928 c6f4c5136 -> bc698197c


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc698197/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
new file mode 100644
index 0000000..ace218b
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -0,0 +1,635 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+
+/**
+ * Tests the FlowRun and FlowActivity Tables
+ */
+public class TestHBaseStorageFlowRunCompaction {
+
+  private static HBaseTestingUtility util;
+
+  private final String metric1 = "MAP_SLOT_MILLIS";
+  private final String metric2 = "HDFS_BYTES_READ";
+
+  private final byte[] aRowKey = Bytes.toBytes("a");
+  private final byte[] aFamily = Bytes.toBytes("family");
+  private final byte[] aQualifier = Bytes.toBytes("qualifier");
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    util = new HBaseTestingUtility();
+    Configuration conf = util.getConfiguration();
+    conf.setInt("hfile.format.version", 3);
+    util.startMiniCluster();
+    createSchema();
+  }
+
+  private static void createSchema() throws IOException {
+    TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
+  }
+
+  @Test
+  public void testWriteFlowRunCompaction() throws Exception {
+    String cluster = "kompaction_cluster1";
+    String user = "kompaction_FlowRun__user1";
+    String flow = "kompaction_flowRun_flow_name";
+    String flowVersion = "AF1021C19F1351";
+    long runid = 1449526652000L;
+
+    int start = 10;
+    int count = 2000;
+    int appIdSuffix = 1;
+    HBaseTimelineWriterImpl hbi = null;
+    long insertTs = System.currentTimeMillis() - count;
+    Configuration c1 = util.getConfiguration();
+    TimelineEntities te1 = null;
+    TimelineEntity entityApp1 = null;
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      // now insert count * ( 100 + 100) metrics
+      // each call to getEntityMetricsApp1 brings back 100 values
+      // of metric1 and 100 of metric2
+      for (int i = start; i < start + count; i++) {
+        String appName = "application_10240000000000_" + appIdSuffix;
+        insertTs++;
+        te1 = new TimelineEntities();
+        entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(insertTs, c1);
+        te1.addEntity(entityApp1);
+        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+
+        appName = "application_2048000000000_7" + appIdSuffix;
+        insertTs++;
+        te1 = new TimelineEntities();
+        entityApp1 = TestFlowDataGenerator.getEntityMetricsApp2(insertTs);
+        te1.addEntity(entityApp1);
+        hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+      }
+    } finally {
+      String appName = "application_10240000000000_" + appIdSuffix;
+      te1 = new TimelineEntities();
+      entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1Complete(
+          insertTs + 1, c1);
+      te1.addEntity(entityApp1);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+      hbi.flush();
+      hbi.close();
+    }
+
+    // check in flow run table
+    HRegionServer server = util.getRSForFirstRegionInTable(TableName
+        .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+    List<HRegion> regions = server.getOnlineRegions(TableName
+        .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+    assertTrue("Didn't find any regions for primary table!", regions.size() > 
0);
+    // flush and compact all the regions of the primary table
+    for (HRegion region : regions) {
+       region.flushcache();
+      region.compactStores(true);
+    }
+
+    // check flow run for one flow many apps
+    checkFlowRunTable(cluster, user, flow, runid, c1, 3);
+  }
+
+
+  private void checkFlowRunTable(String cluster, String user, String flow,
+      long runid, Configuration c1, int valueCount) throws IOException {
+    Scan s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+    s.setStartRow(startRow);
+    String clusterStop = cluster + "1";
+    byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid);
+    s.setStopRow(stopRow);
+    Connection conn = ConnectionFactory.createConnection(c1);
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+    ResultScanner scanner = table1.getScanner(s);
+
+    int rowCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      Map<byte[], byte[]> values = result.getFamilyMap(FlowRunColumnFamily.INFO
+          .getBytes());
+      assertEquals(valueCount, values.size());
+
+      rowCount++;
+      // check metric1
+      byte[] q = ColumnHelper.getColumnQualifier(
+          FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1);
+      assertTrue(values.containsKey(q));
+      assertEquals(141, Bytes.toLong(values.get(q)));
+
+      // check metric2
+      q = ColumnHelper.getColumnQualifier(
+          FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2);
+      assertTrue(values.containsKey(q));
+      assertEquals(57, Bytes.toLong(values.get(q)));
+    }
+    assertEquals(1, rowCount);
+  }
+
+
+  private FlowScanner getFlowScannerForTestingCompaction() {
+    // create a FlowScanner object with the sole purpose of invoking a process
+    // summation;
+    CompactionRequest request = new CompactionRequest();
+    request.setIsMajor(true, true);
+    // okay to pass in nulls for the constructor arguments
+    // because all we want to do is invoke the process summation
+    FlowScanner fs = new FlowScanner(null, -1, null,
+        (request.isMajor() == true ? FlowScannerOperation.MAJOR_COMPACTION
+            : FlowScannerOperation.MINOR_COMPACTION));
+    assertNotNull(fs);
+    return fs;
+  }
+
+  @Test
+  public void checkProcessSummationMoreCellsSumFinal2()
+      throws IOException {
+    long cellValue1 = 1236L;
+    long cellValue2 = 28L;
+    long cellValue3 = 1236L;
+    long cellValue4 = 1236L;
+    FlowScanner fs = getFlowScannerForTestingCompaction();
+
+    // note down the current timestamp
+    long currentTimestamp = System.currentTimeMillis();
+    long cell1Ts = 1200120L;
+    long cell2Ts = TimestampGenerator.getSupplementedTimestamp(
+        System.currentTimeMillis(),"application_123746661110_11202");
+    long cell3Ts = 1277719L;
+    long cell4Ts = currentTimestamp - 10;
+
+    SortedSet<Cell> currentColumnCells = new 
TreeSet<Cell>(KeyValue.COMPARATOR);
+
+    List<Tag> tags = new ArrayList<>();
+    Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+        "application_1234588888_91188");
+    tags.add(t);
+    byte[] tagByteArray = Tag.fromList(tags);
+    // create a cell with a VERY old timestamp and attribute SUM_FINAL
+    Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        cell1Ts, Bytes.toBytes(cellValue1), tagByteArray);
+    currentColumnCells.add(c1);
+
+    tags = new ArrayList<>();
+    t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+        "application_12700000001_29102");
+    tags.add(t);
+    tagByteArray = Tag.fromList(tags);
+    // create a cell with a recent timestamp and attribute SUM_FINAL
+    Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        cell2Ts, Bytes.toBytes(cellValue2), tagByteArray);
+    currentColumnCells.add(c2);
+
+    tags = new ArrayList<>();
+    t = new Tag(AggregationOperation.SUM.getTagType(),
+        "application_191780000000001_8195");
+    tags.add(t);
+    tagByteArray = Tag.fromList(tags);
+    // create a cell with a VERY old timestamp but has attribute SUM
+    Cell c3 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        cell3Ts, Bytes.toBytes(cellValue3), tagByteArray);
+    currentColumnCells.add(c3);
+
+    tags = new ArrayList<>();
+    t = new Tag(AggregationOperation.SUM.getTagType(),
+        "application_191780000000001_98104");
+    tags.add(t);
+    tagByteArray = Tag.fromList(tags);
+    // create a cell with a VERY old timestamp but has attribute SUM
+    Cell c4 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
+    currentColumnCells.add(c4);
+
+    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+        LongConverter.getInstance(), currentTimestamp);
+    assertNotNull(cells);
+
+    // we should be getting back 4 cells
+    // one is the flow sum cell
+    // two are the cells with SUM attribute
+    // one cell with SUM_FINAL
+    assertEquals(4, cells.size());
+
+    for (int i = 0; i < cells.size(); i++) {
+      Cell returnedCell = cells.get(0);
+      assertNotNull(returnedCell);
+
+      long returnTs = returnedCell.getTimestamp();
+      long returnValue = Bytes.toLong(CellUtil
+          .cloneValue(returnedCell));
+      if (returnValue == cellValue2) {
+        assertTrue(returnTs == cell2Ts);
+      } else if (returnValue == cellValue3) {
+        assertTrue(returnTs == cell3Ts);
+      } else if (returnValue == cellValue4) {
+        assertTrue(returnTs == cell4Ts);
+      } else if (returnValue == cellValue1) {
+        assertTrue(returnTs != cell1Ts);
+        assertTrue(returnTs > cell1Ts);
+        assertTrue(returnTs >= currentTimestamp);
+      } else {
+        // raise a failure since we expect only these two values back
+        Assert.fail();
+      }
+    }
+  }
+
+  // tests with many cells
+  // of type SUM and SUM_FINAL
+  // all cells of SUM_FINAL will expire
+  @Test
+  public void checkProcessSummationMoreCellsSumFinalMany() throws IOException {
+    FlowScanner fs = getFlowScannerForTestingCompaction();
+    int count = 200000;
+
+    long cellValueFinal = 1000L;
+    long cellValueNotFinal = 28L;
+
+    // note down the current timestamp
+    long currentTimestamp = System.currentTimeMillis();
+    long cellTsFinalStart = 10001120L;
+    long cellTsFinal = cellTsFinalStart;
+    long cellTsNotFinalStart = currentTimestamp - 5;
+    long cellTsNotFinal = cellTsNotFinalStart;
+
+    SortedSet<Cell> currentColumnCells = new 
TreeSet<Cell>(KeyValue.COMPARATOR);
+    List<Tag> tags = null;
+    Tag t = null;
+    Cell c1 = null;
+
+    // insert SUM_FINAL cells
+    for (int i = 0; i < count; i++) {
+      tags = new ArrayList<>();
+      t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+          "application_123450000" + i + "01_19" + i);
+      tags.add(t);
+      byte[] tagByteArray = Tag.fromList(tags);
+      // create a cell with a VERY old timestamp and attribute SUM_FINAL
+      c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+          cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
+      currentColumnCells.add(c1);
+      cellTsFinal++;
+    }
+
+    // add SUM cells
+    for (int i = 0; i < count; i++) {
+      tags = new ArrayList<>();
+      t = new Tag(AggregationOperation.SUM.getTagType(),
+          "application_1987650000" + i + "83_911" + i);
+      tags.add(t);
+      byte[] tagByteArray = Tag.fromList(tags);
+      // create a cell with attribute SUM
+      c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+          cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
+      currentColumnCells.add(c1);
+      cellTsNotFinal++;
+    }
+
+    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+        LongConverter.getInstance(), currentTimestamp);
+    assertNotNull(cells);
+
+    // we should be getting back count + 1 cells
+    // one is the flow sum cell
+    // others are the cells with SUM attribute
+    assertEquals(count + 1, cells.size());
+
+    for (int i = 0; i < cells.size(); i++) {
+      Cell returnedCell = cells.get(0);
+      assertNotNull(returnedCell);
+
+      long returnTs = returnedCell.getTimestamp();
+      long returnValue = Bytes.toLong(CellUtil
+          .cloneValue(returnedCell));
+      if (returnValue == (count * cellValueFinal)) {
+        assertTrue(returnTs > (cellTsFinalStart + count));
+        assertTrue(returnTs >= currentTimestamp);
+      } else if ((returnValue >= cellValueNotFinal)
+          && (returnValue <= cellValueNotFinal * count)) {
+        assertTrue(returnTs >= cellTsNotFinalStart);
+        assertTrue(returnTs <= cellTsNotFinalStart * count);
+      } else {
+        // raise a failure since we expect only these values back
+        Assert.fail();
+      }
+    }
+  }
+
+  // tests with many cells
+  // of type SUM and SUM_FINAL
+  // NOT cells of SUM_FINAL will expire
+  @Test
+  public void checkProcessSummationMoreCellsSumFinalVariedTags() throws 
IOException {
+    FlowScanner fs = getFlowScannerForTestingCompaction();
+    int countFinal = 20100;
+    int countNotFinal = 1000;
+    int countFinalNotExpire = 7009;
+
+    long cellValueFinal = 1000L;
+    long cellValueNotFinal = 28L;
+
+    // note down the current timestamp
+    long currentTimestamp = System.currentTimeMillis();
+    long cellTsFinalStart = 10001120L;
+    long cellTsFinal = cellTsFinalStart;
+
+    long cellTsFinalStartNotExpire = 
TimestampGenerator.getSupplementedTimestamp(
+        System.currentTimeMillis(), "application_10266666661166_118821");
+    long cellTsFinalNotExpire = cellTsFinalStartNotExpire;
+
+    long cellTsNotFinalStart = currentTimestamp - 5;
+    long cellTsNotFinal = cellTsNotFinalStart;
+
+    SortedSet<Cell> currentColumnCells = new 
TreeSet<Cell>(KeyValue.COMPARATOR);
+    List<Tag> tags = null;
+    Tag t = null;
+    Cell c1 = null;
+
+    // insert SUM_FINAL cells which will expire
+    for (int i = 0; i < countFinal; i++) {
+      tags = new ArrayList<>();
+      t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+          "application_123450000" + i + "01_19" + i);
+      tags.add(t);
+      byte[] tagByteArray = Tag.fromList(tags);
+      // create a cell with a VERY old timestamp and attribute SUM_FINAL
+      c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+          cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
+      currentColumnCells.add(c1);
+      cellTsFinal++;
+    }
+
+    // insert SUM_FINAL cells which will NOT expire
+    for (int i = 0; i < countFinalNotExpire; i++) {
+      tags = new ArrayList<>();
+      t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+          "application_123450000" + i + "01_19" + i);
+      tags.add(t);
+      byte[] tagByteArray = Tag.fromList(tags);
+      // create a cell with a VERY old timestamp and attribute SUM_FINAL
+      c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+          cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray);
+      currentColumnCells.add(c1);
+      cellTsFinalNotExpire++;
+    }
+
+    // add SUM cells
+    for (int i = 0; i < countNotFinal; i++) {
+      tags = new ArrayList<>();
+      t = new Tag(AggregationOperation.SUM.getTagType(),
+          "application_1987650000" + i + "83_911" + i);
+      tags.add(t);
+      byte[] tagByteArray = Tag.fromList(tags);
+      // create a cell with attribute SUM
+      c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+          cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
+      currentColumnCells.add(c1);
+      cellTsNotFinal++;
+    }
+
+    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+        LongConverter.getInstance(), currentTimestamp);
+    assertNotNull(cells);
+
+    // we should be getting back
+    // countNotFinal + countFinalNotExpire + 1 cells
+    // one is the flow sum cell
+    // count = the cells with SUM attribute
+    // count = the cells with SUM_FINAL attribute but not expired
+    assertEquals(countFinalNotExpire + countNotFinal + 1, cells.size());
+
+    for (int i = 0; i < cells.size(); i++) {
+      Cell returnedCell = cells.get(0);
+      assertNotNull(returnedCell);
+
+      long returnTs = returnedCell.getTimestamp();
+      long returnValue = Bytes.toLong(CellUtil
+          .cloneValue(returnedCell));
+      if (returnValue == (countFinal * cellValueFinal)) {
+        assertTrue(returnTs > (cellTsFinalStart + countFinal));
+        assertTrue(returnTs >= currentTimestamp);
+      } else if (returnValue == cellValueNotFinal) {
+        assertTrue(returnTs >= cellTsNotFinalStart);
+        assertTrue(returnTs <= cellTsNotFinalStart + countNotFinal);
+      } else if (returnValue == cellValueFinal){
+        assertTrue(returnTs >= cellTsFinalStartNotExpire);
+        assertTrue(returnTs <= cellTsFinalStartNotExpire + 
countFinalNotExpire);
+      } else {
+        // raise a failure since we expect only these values back
+        Assert.fail();
+      }
+    }
+  }
+
+  @Test
+  public void testProcessSummationMoreCellsSumFinal() throws IOException {
+    FlowScanner fs = getFlowScannerForTestingCompaction();
+    // note down the current timestamp
+    long currentTimestamp = System.currentTimeMillis();
+    long cellValue1 = 1236L;
+    long cellValue2 = 28L;
+
+    List<Tag> tags = new ArrayList<>();
+    Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+        "application_1234588888_999888");
+    tags.add(t);
+    byte[] tagByteArray = Tag.fromList(tags);
+    SortedSet<Cell> currentColumnCells = new 
TreeSet<Cell>(KeyValue.COMPARATOR);
+
+    // create a cell with a VERY old timestamp and attribute SUM_FINAL
+    Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        120L, Bytes.toBytes(cellValue1), tagByteArray);
+    currentColumnCells.add(c1);
+
+    tags = new ArrayList<>();
+    t = new Tag(AggregationOperation.SUM.getTagType(),
+        "application_100000000001_119101");
+    tags.add(t);
+    tagByteArray = Tag.fromList(tags);
+
+    // create a cell with a VERY old timestamp but has attribute SUM
+    Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        130L, Bytes.toBytes(cellValue2), tagByteArray);
+    currentColumnCells.add(c2);
+    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+        LongConverter.getInstance(), currentTimestamp);
+    assertNotNull(cells);
+
+    // we should be getting back two cells
+    // one is the flow sum cell
+    // another is the cell with SUM attribute
+    assertEquals(2, cells.size());
+
+    Cell returnedCell = cells.get(0);
+    assertNotNull(returnedCell);
+    long inputTs1 = c1.getTimestamp();
+    long inputTs2 = c2.getTimestamp();
+
+    long returnTs = returnedCell.getTimestamp();
+    long returnValue = Bytes.toLong(CellUtil
+        .cloneValue(returnedCell));
+    // the returned Ts will be far greater than input ts as well as the noted
+    // current timestamp
+    if (returnValue == cellValue2) {
+      assertTrue(returnTs == inputTs2);
+    } else if (returnValue == cellValue1) {
+      assertTrue(returnTs >= currentTimestamp);
+      assertTrue(returnTs != inputTs1);
+    } else {
+      // raise a failure since we expect only these two values back
+      Assert.fail();
+    }
+  }
+
+  @Test
+  public void testProcessSummationOneCellSumFinal() throws IOException {
+    FlowScanner fs = getFlowScannerForTestingCompaction();
+
+    // note down the current timestamp
+    long currentTimestamp = System.currentTimeMillis();
+    List<Tag> tags = new ArrayList<>();
+    Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+        "application_123458888888_999888");
+    tags.add(t);
+    byte[] tagByteArray = Tag.fromList(tags);
+    SortedSet<Cell> currentColumnCells = new 
TreeSet<Cell>(KeyValue.COMPARATOR);
+
+    // create a cell with a VERY old timestamp
+    Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        120L, Bytes.toBytes(1110L), tagByteArray);
+    currentColumnCells.add(c1);
+
+    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+        LongConverter.getInstance(), currentTimestamp);
+    assertNotNull(cells);
+    // we should not get the same cell back
+    // but we get back the flow cell
+    assertEquals(1, cells.size());
+
+    Cell returnedCell = cells.get(0);
+    // it's NOT the same cell
+    assertNotEquals(c1, returnedCell);
+    long inputTs = c1.getTimestamp();
+    long returnTs = returnedCell.getTimestamp();
+    // the returned Ts will be far greater than input ts as well as the noted
+    // current timestamp
+    assertTrue(returnTs > inputTs);
+    assertTrue(returnTs >= currentTimestamp);
+  }
+
+  @Test
+  public void testProcessSummationOneCell() throws IOException {
+    FlowScanner fs = getFlowScannerForTestingCompaction();
+
+    // note down the current timestamp
+    long currentTimestamp = System.currentTimeMillis();
+
+    // try for 1 cell with tag SUM
+    List<Tag> tags = new ArrayList<>();
+    Tag t = new Tag(AggregationOperation.SUM.getTagType(),
+        "application_123458888888_999888");
+    tags.add(t);
+    byte[] tagByteArray = Tag.fromList(tags);
+
+    SortedSet<Cell> currentColumnCells = new 
TreeSet<Cell>(KeyValue.COMPARATOR);
+
+    Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        currentTimestamp, Bytes.toBytes(1110L), tagByteArray);
+    currentColumnCells.add(c1);
+    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+        LongConverter.getInstance(), currentTimestamp);
+    assertNotNull(cells);
+    // we expect the same cell back
+    assertEquals(1, cells.size());
+    Cell c2 = cells.get(0);
+    assertEquals(c1, c2);
+    assertEquals(currentTimestamp, c2.getTimestamp());
+  }
+
+  @Test
+  public void testProcessSummationEmpty() throws IOException {
+    FlowScanner fs = getFlowScannerForTestingCompaction();
+    long currentTimestamp = System.currentTimeMillis();
+
+    SortedSet<Cell> currentColumnCells = null;
+    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+        LongConverter.getInstance(), currentTimestamp);
+    assertNotNull(cells);
+    assertEquals(0, cells.size());
+
+    currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+    cells = fs.processSummationMajorCompaction(currentColumnCells,
+        LongConverter.getInstance(), currentTimestamp);
+    assertNotNull(cells);
+    assertEquals(0, cells.size());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+}

Reply via email to