YARN-5109. timestamps are stored unencoded, causing parse errors (Varun Saxena via sjlee)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e2229377
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e2229377
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e2229377

Branch: refs/heads/YARN-2928
Commit: e2229377b0a4bcc54cff1dd4adf4e5b5c0a27bc1
Parents: 10b26bb
Author: Sangjin Lee <sj...@apache.org>
Authored: Thu May 26 21:39:16 2016 -0700
Committer: Sangjin Lee <sj...@apache.org>
Committed: Thu May 26 21:39:16 2016 -0700

----------------------------------------------------------------------
 .../storage/TestHBaseTimelineStorage.java       | 145 ++++++---
 .../flow/TestHBaseStorageFlowActivity.java      |   8 +-
 .../reader/filter/TimelineFilterUtils.java      |  20 +-
 .../storage/HBaseTimelineWriterImpl.java        |  67 +++--
 .../application/ApplicationColumnPrefix.java    |  65 ++--
 .../storage/application/ApplicationRowKey.java  |  50 +---
 .../application/ApplicationRowKeyConverter.java | 130 ++++++++
 .../storage/apptoflow/AppToFlowRowKey.java      |  20 +-
 .../apptoflow/AppToFlowRowKeyConverter.java     |  96 ++++++
 .../storage/common/AppIdKeyConverter.java       | 101 +++++++
 .../storage/common/ColumnHelper.java            | 175 +++++------
 .../storage/common/ColumnPrefix.java            |  43 +--
 .../storage/common/EventColumnName.java         |  48 +++
 .../common/EventColumnNameConverter.java        | 105 +++++++
 .../storage/common/KeyConverter.java            |  41 +++
 .../storage/common/LongKeyConverter.java        |  68 +++++
 .../storage/common/Separator.java               | 198 ++++++++++++-
 .../storage/common/StringKeyConverter.java      |  59 ++++
 .../storage/common/TimelineStorageUtils.java    | 199 ++-----------
 .../storage/entity/EntityColumnPrefix.java      |  48 +--
 .../storage/entity/EntityRowKey.java            |  67 +----
 .../storage/entity/EntityRowKeyConverter.java   | 143 +++++++++
 .../storage/flow/FlowActivityColumnPrefix.java  |  38 +--
 .../storage/flow/FlowActivityRowKey.java        |  41 +--
 .../flow/FlowActivityRowKeyConverter.java       | 115 ++++++++
 .../storage/flow/FlowRunColumnPrefix.java       |  82 ++----
 .../storage/flow/FlowRunRowKey.java             |  41 +--
 .../storage/flow/FlowRunRowKeyConverter.java    | 120 ++++++++
 .../storage/flow/FlowScanner.java               |   9 +-
 .../reader/FlowActivityEntityReader.java        |  12 +-
 .../storage/reader/TimelineEntityReader.java    |   4 +-
 .../storage/common/TestKeyConverters.java       | 293 +++++++++++++++++++
 .../storage/common/TestSeparator.java           |  82 +++++-
 .../common/TestTimelineStorageUtils.java        |  56 ----
 34 files changed, 1988 insertions(+), 801 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index 68135a0..bcf2d2c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import 
org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
 import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@@ -50,25 +50,28 @@ import 
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import 
org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
 import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
 import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
 import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
 import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
 import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
 import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
 import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
 import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@@ -482,7 +485,6 @@ public class TestHBaseTimelineStorage {
     }
   }
 
-
   @Test
   public void testWriteNullApplicationToHBase() throws Exception {
     TimelineEntities te = new TimelineEntities();
@@ -494,7 +496,7 @@ public class TestHBaseTimelineStorage {
 
     // add the info map in Timeline Entity
     Map<String, Object> infoMap = new HashMap<String, Object>();
-    infoMap.put("infoMapKey1", "infoMapValue1");
+    infoMap.put("in fo M apK  ey1", "infoMapValue1");
     infoMap.put("infoMapKey2", 10);
     entity.addInfo(infoMap);
 
@@ -517,6 +519,7 @@ public class TestHBaseTimelineStorage {
       // retrieve the row
       Scan scan = new Scan();
       scan.setStartRow(Bytes.toBytes(cluster));
+      scan.setStopRow(Bytes.toBytes(cluster + "1"));
       Connection conn = ConnectionFactory.createConnection(c1);
       ResultScanner resultScanner = new ApplicationTable()
           .getResultScanner(c1, conn, scan);
@@ -626,7 +629,7 @@ public class TestHBaseTimelineStorage {
       hbi.start();
       String cluster = "cluster_test_write_app";
       String user = "user1";
-      String flow = "some_flow_name";
+      String flow = "s!ome_f\tlow  _n am!e";
       String flowVersion = "AB7822C10F1111";
       long runid = 1002345678919L;
       hbi.write(cluster, user, flow, flowVersion, runid, appId, te);
@@ -670,7 +673,8 @@ public class TestHBaseTimelineStorage {
       assertEquals(cTime, cTime1);
 
       Map<String, Object> infoColumns =
-          ApplicationColumnPrefix.INFO.readResults(result);
+          ApplicationColumnPrefix.INFO.readResults(result,
+              StringKeyConverter.getInstance());
       assertEquals(infoMap, infoColumns);
 
       // Remember isRelatedTo is of type Map<String, Set<String>>
@@ -706,11 +710,13 @@ public class TestHBaseTimelineStorage {
 
       // Configuration
       Map<String, Object> configColumns =
-          ApplicationColumnPrefix.CONFIG.readResults(result);
+          ApplicationColumnPrefix.CONFIG.readResults(result,
+              StringKeyConverter.getInstance());
       assertEquals(conf, configColumns);
 
       NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
-          ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);
+          ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(
+              result, StringKeyConverter.getInstance());
 
       NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
       matchMetrics(metricValues, metricMap);
@@ -868,7 +874,8 @@ public class TestHBaseTimelineStorage {
           assertEquals(cTime1, cTime);
 
           Map<String, Object> infoColumns =
-              EntityColumnPrefix.INFO.readResults(result);
+              EntityColumnPrefix.INFO.readResults(result,
+                  StringKeyConverter.getInstance());
           assertEquals(infoMap, infoColumns);
 
           // Remember isRelatedTo is of type Map<String, Set<String>>
@@ -906,11 +913,12 @@ public class TestHBaseTimelineStorage {
 
           // Configuration
           Map<String, Object> configColumns =
-              EntityColumnPrefix.CONFIG.readResults(result);
+              EntityColumnPrefix.CONFIG.readResults(result, 
StringKeyConverter.getInstance());
           assertEquals(conf, configColumns);
 
           NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
-              EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
+              EntityColumnPrefix.METRIC.readResultsWithTimestamps(
+                  result, StringKeyConverter.getInstance());
 
           NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
           matchMetrics(metricValues, metricMap);
@@ -963,7 +971,7 @@ public class TestHBaseTimelineStorage {
   }
 
   private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
-      String flow, long runid, String appName, TimelineEntity te) {
+      String flow, Long runid, String appName, TimelineEntity te) {
 
     EntityRowKey key = EntityRowKey.parseRowKey(rowKey);
 
@@ -978,7 +986,7 @@ public class TestHBaseTimelineStorage {
   }
 
   private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster,
-      String user, String flow, long runid, String appName) {
+      String user, String flow, Long runid, String appName) {
 
     ApplicationRowKey key = ApplicationRowKey.parseRowKey(rowKey);
 
@@ -995,7 +1003,7 @@ public class TestHBaseTimelineStorage {
     TimelineEvent event = new TimelineEvent();
     String eventId = ApplicationMetricsConstants.CREATED_EVENT_TYPE;
     event.setId(eventId);
-    long expTs = 1436512802000L;
+    Long expTs = 1436512802000L;
     event.setTimestamp(expTs);
     String expKey = "foo_event";
     Object expVal = "test";
@@ -1038,20 +1046,18 @@ public class TestHBaseTimelineStorage {
       assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
           appName));
 
-      Map<?, Object> eventsResult =
-          ApplicationColumnPrefix.EVENT.
-              readResultsHavingCompoundColumnQualifiers(result);
+      Map<EventColumnName, Object> eventsResult =
+          ApplicationColumnPrefix.EVENT.readResults(result,
+              EventColumnNameConverter.getInstance());
       // there should be only one event
       assertEquals(1, eventsResult.size());
-      for (Map.Entry<?, Object> e : eventsResult.entrySet()) {
+      for (Map.Entry<EventColumnName, Object> e : eventsResult.entrySet()) {
+        EventColumnName eventColumnName = e.getKey();
         // the qualifier is a compound key
         // hence match individual values
-        byte[][] karr = (byte[][])e.getKey();
-        assertEquals(3, karr.length);
-        assertEquals(eventId, Bytes.toString(karr[0]));
-        assertEquals(
-            TimelineStorageUtils.invertLong(expTs), Bytes.toLong(karr[1]));
-        assertEquals(expKey, Bytes.toString(karr[2]));
+        assertEquals(eventId, eventColumnName.getId());
+        assertEquals(expTs, eventColumnName.getTimestamp());
+        assertEquals(expKey, eventColumnName.getInfoKey());
         Object value = e.getValue();
         // there should be only one timestamp and value
         assertEquals(expVal, value.toString());
@@ -1076,7 +1082,7 @@ public class TestHBaseTimelineStorage {
       assertEquals(1, events.size());
       for (TimelineEvent e : events) {
         assertEquals(eventId, e.getId());
-        assertEquals(expTs, e.getTimestamp());
+        assertEquals(expTs, Long.valueOf(e.getTimestamp()));
         Map<String,Object> info = e.getInfo();
         assertEquals(1, info.size());
         for (Map.Entry<String, Object> infoEntry : info.entrySet()) {
@@ -1095,9 +1101,9 @@ public class TestHBaseTimelineStorage {
   @Test
   public void testEventsWithEmptyInfo() throws IOException {
     TimelineEvent event = new TimelineEvent();
-    String eventId = "foo_event_id";
+    String eventId = "foo_ev e  nt_id";
     event.setId(eventId);
-    long expTs = 1436512802000L;
+    Long expTs = 1436512802000L;
     event.setTimestamp(expTs);
 
     final TimelineEntity entity = new TimelineEntity();
@@ -1142,21 +1148,19 @@ public class TestHBaseTimelineStorage {
           assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
               entity));
 
-          Map<?, Object> eventsResult =
-              EntityColumnPrefix.EVENT.
-                  readResultsHavingCompoundColumnQualifiers(result);
+          Map<EventColumnName, Object> eventsResult =
+              EntityColumnPrefix.EVENT.readResults(result,
+                  EventColumnNameConverter.getInstance());
           // there should be only one event
           assertEquals(1, eventsResult.size());
-          for (Map.Entry<?, Object> e : eventsResult.entrySet()) {
+          for (Map.Entry<EventColumnName, Object> e : eventsResult.entrySet()) 
{
+            EventColumnName eventColumnName = e.getKey();
             // the qualifier is a compound key
             // hence match individual values
-            byte[][] karr = (byte[][])e.getKey();
-            assertEquals(3, karr.length);
-            assertEquals(eventId, Bytes.toString(karr[0]));
-            assertEquals(TimelineStorageUtils.invertLong(expTs),
-                Bytes.toLong(karr[1]));
+            assertEquals(eventId, eventColumnName.getId());
+            assertEquals(expTs,eventColumnName.getTimestamp());
             // key must be empty
-            assertEquals(0, karr[2].length);
+            assertNull(eventColumnName.getInfoKey());
             Object value = e.getValue();
             // value should be empty
             assertEquals("", value.toString());
@@ -1184,7 +1188,7 @@ public class TestHBaseTimelineStorage {
       assertEquals(1, events.size());
       for (TimelineEvent e : events) {
         assertEquals(eventId, e.getId());
-        assertEquals(expTs, e.getTimestamp());
+        assertEquals(expTs, Long.valueOf(e.getTimestamp()));
         Map<String,Object> info = e.getInfo();
         assertTrue(info == null || info.isEmpty());
       }
@@ -1195,6 +1199,67 @@ public class TestHBaseTimelineStorage {
   }
 
   @Test
+  public void testEventsEscapeTs() throws IOException {
+    TimelineEvent event = new TimelineEvent();
+    String eventId = ApplicationMetricsConstants.CREATED_EVENT_TYPE;
+    event.setId(eventId);
+    long expTs = 1463567041056L;
+    event.setTimestamp(expTs);
+    String expKey = "f==o o_e ve\tnt";
+    Object expVal = "test";
+    event.addInfo(expKey, expVal);
+
+    final TimelineEntity entity = new ApplicationEntity();
+    entity.setId(ApplicationId.newInstance(0, 1).toString());
+    entity.addEvent(event);
+
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntity(entity);
+
+    HBaseTimelineWriterImpl hbi = null;
+    try {
+      Configuration c1 = util.getConfiguration();
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      hbi.start();
+      String cluster = "clus!ter_\ttest_ev  ents";
+      String user = "user2";
+      String flow = "other_flow_name";
+      String flowVersion = "1111F01C2287BA";
+      long runid = 1009876543218L;
+      String appName = "application_123465899910_2001";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
+      hbi.stop();
+
+      // read the timeline entity using the reader this time
+      TimelineEntity e1 = reader.getEntity(
+          new TimelineReaderContext(cluster, user, flow, runid, appName,
+          entity.getType(), entity.getId()),
+          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+      assertNotNull(e1);
+      // check the events
+      NavigableSet<TimelineEvent> events = e1.getEvents();
+      // there should be only one event
+      assertEquals(1, events.size());
+      for (TimelineEvent e : events) {
+        assertEquals(eventId, e.getId());
+        assertEquals(expTs, e.getTimestamp());
+        Map<String,Object> info = e.getInfo();
+        assertEquals(1, info.size());
+        for (Map.Entry<String, Object> infoEntry : info.entrySet()) {
+          assertEquals(expKey, infoEntry.getKey());
+          assertEquals(expVal, infoEntry.getValue());
+        }
+      }
+    } finally {
+      if (hbi != null) {
+        hbi.stop();
+        hbi.close();
+      }
+    }
+  }
+
+  @Test
   public void testNonIntegralMetricValues() throws IOException {
     TimelineEntities teApp = new TimelineEntities();
     ApplicationEntity entityApp = new ApplicationEntity();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
index 6b23b6c..072332d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -170,7 +170,7 @@ public class TestHBaseStorageFlowActivity {
     assertEquals(cluster, flowActivityRowKey.getClusterId());
     assertEquals(user, flowActivityRowKey.getUserId());
     assertEquals(flow, flowActivityRowKey.getFlowName());
-    long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs);
+    Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs);
     assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
     assertEquals(1, values.size());
     checkFlowActivityRunId(runid, flowVersion, values);
@@ -194,7 +194,7 @@ public class TestHBaseStorageFlowActivity {
         assertEquals(cluster, flowActivity.getCluster());
         assertEquals(user, flowActivity.getUser());
         assertEquals(flow, flowActivity.getFlowName());
-        assertEquals(dayTs, flowActivity.getDate().getTime());
+        assertEquals(dayTs, Long.valueOf(flowActivity.getDate().getTime()));
         Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
         assertEquals(1, flowRuns.size());
       }
@@ -294,7 +294,7 @@ public class TestHBaseStorageFlowActivity {
       assertEquals(cluster, flowActivityRowKey.getClusterId());
       assertEquals(user, flowActivityRowKey.getUserId());
       assertEquals(flow, flowActivityRowKey.getFlowName());
-      long dayTs = 
TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
+      Long dayTs = 
TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
       assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
       assertEquals(1, values.size());
       checkFlowActivityRunId(runid, flowVersion, values);
@@ -429,7 +429,7 @@ public class TestHBaseStorageFlowActivity {
       assertEquals(cluster, flowActivityRowKey.getClusterId());
       assertEquals(user, flowActivityRowKey.getUserId());
       assertEquals(flow, flowActivityRowKey.getFlowName());
-      long dayTs = 
TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
+      Long dayTs = 
TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
       assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
 
       Map<byte[], byte[]> values = result

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
index 8cae410..036746b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
@@ -31,10 +31,14 @@ import 
org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
 import org.apache.hadoop.hbase.filter.QualifierFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 
@@ -205,6 +209,17 @@ public final class TimelineFilterUtils {
     return singleColValFilter;
   }
 
+  private static <T> byte[] createColQualifierPrefix(ColumnPrefix<T> colPrefix,
+      String column) {
+    if (colPrefix == ApplicationColumnPrefix.EVENT ||
+        colPrefix == EntityColumnPrefix.EVENT) {
+      return EventColumnNameConverter.getInstance().encode(
+          new EventColumnName(column, null, null));
+    } else {
+      return StringKeyConverter.getInstance().encode(column);
+    }
+  }
+
   /**
    * Create a filter list of qualifier filters based on passed set of columns.
    *
@@ -219,8 +234,7 @@ public final class TimelineFilterUtils {
     for (String column : columns) {
       // For columns which have compound column qualifiers (eg. events), we 
need
       // to include the required separator.
-      byte[] compoundColQual =
-          colPrefix.getCompoundColQualBytes(column, (byte[])null);
+      byte[] compoundColQual = createColQualifierPrefix(colPrefix, column);
       list.addFilter(new QualifierFilter(CompareOp.EQUAL,
           new BinaryPrefixComparator(
               colPrefix.getColumnPrefixBytes(compoundColQual))));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index fe4671f..f8b5a65 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@@ -37,7 +36,6 @@ import 
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import 
org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
-import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
@@ -46,7 +44,11 @@ import 
org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
@@ -194,7 +196,7 @@ public class HBaseTimelineWriterImpl extends 
AbstractService implements
       long activityTimeStamp) throws IOException {
     byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, activityTimeStamp,
         userId, flowName);
-    byte[] qualifier = GenericObjectMapper.write(flowRunId);
+    byte[] qualifier = LongKeyConverter.getInstance().encode(flowRunId);
     FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
         null, flowVersion,
         AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
@@ -278,7 +280,8 @@ public class HBaseTimelineWriterImpl extends 
AbstractService implements
   private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
       Attribute... attributes) throws IOException {
     for (TimelineMetric metric : metrics) {
-      String metricColumnQualifier = metric.getId();
+      byte[] metricColumnQualifier =
+          StringKeyConverter.getInstance().encode(metric.getId());
       Map<Long, Number> timeseries = metric.getValues();
       for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
         Long timestamp = timeseriesEntry.getKey();
@@ -316,8 +319,9 @@ public class HBaseTimelineWriterImpl extends 
AbstractService implements
       // id3?id4?id5
       String compoundValue =
           Separator.VALUES.joinEncoded(connectedEntity.getValue());
-      columnPrefix.store(rowKey, table, connectedEntity.getKey(), null,
-          compoundValue);
+      columnPrefix.store(rowKey, table,
+          StringKeyConverter.getInstance().encode(connectedEntity.getKey()),
+          null, compoundValue);
     }
   }
 
@@ -337,7 +341,8 @@ public class HBaseTimelineWriterImpl extends 
AbstractService implements
       if (info != null) {
         for (Map.Entry<String, Object> entry : info.entrySet()) {
           ApplicationColumnPrefix.INFO.store(rowKey, applicationTable,
-              entry.getKey(), null, entry.getValue());
+              StringKeyConverter.getInstance().encode(entry.getKey()), null,
+              entry.getValue());
         }
       }
     } else {
@@ -349,8 +354,9 @@ public class HBaseTimelineWriterImpl extends 
AbstractService implements
       Map<String, Object> info = te.getInfo();
       if (info != null) {
         for (Map.Entry<String, Object> entry : info.entrySet()) {
-          EntityColumnPrefix.INFO.store(rowKey, entityTable, entry.getKey(),
-              null, entry.getValue());
+          EntityColumnPrefix.INFO.store(rowKey, entityTable,
+              StringKeyConverter.getInstance().encode(entry.getKey()), null,
+              entry.getValue());
         }
       }
     }
@@ -365,11 +371,13 @@ public class HBaseTimelineWriterImpl extends 
AbstractService implements
       return;
     }
     for (Map.Entry<String, String> entry : config.entrySet()) {
+      byte[] configKey =
+          StringKeyConverter.getInstance().encode(entry.getKey());
       if (isApplication) {
         ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable,
-            entry.getKey(), null, entry.getValue());
+            configKey, null, entry.getValue());
       } else {
-        EntityColumnPrefix.CONFIG.store(rowKey, entityTable, entry.getKey(),
+        EntityColumnPrefix.CONFIG.store(rowKey, entityTable, configKey,
             null, entry.getValue());
       }
     }
@@ -383,7 +391,8 @@ public class HBaseTimelineWriterImpl extends 
AbstractService implements
       boolean isApplication) throws IOException {
     if (metrics != null) {
       for (TimelineMetric metric : metrics) {
-        String metricColumnQualifier = metric.getId();
+        byte[] metricColumnQualifier =
+            StringKeyConverter.getInstance().encode(metric.getId());
         Map<Long, Number> timeseries = metric.getValues();
         for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
           Long timestamp = timeseriesEntry.getKey();
@@ -416,41 +425,31 @@ public class HBaseTimelineWriterImpl extends 
AbstractService implements
                   "! Using the current timestamp");
               eventTimestamp = System.currentTimeMillis();
             }
-            byte[] eventTs =
-                Bytes.toBytes(TimelineStorageUtils.invertLong(eventTimestamp));
+            EventColumnNameConverter converter =
+                EventColumnNameConverter.getInstance();
             Map<String, Object> eventInfo = event.getInfo();
             if ((eventInfo == null) || (eventInfo.size() == 0)) {
+              byte[] columnQualifierBytes = converter.encode(
+                  new EventColumnName(eventId, eventTimestamp, null));
               if (isApplication) {
-                byte[] compoundColumnQualifierBytes =
-                    ApplicationColumnPrefix.EVENT.
-                        getCompoundColQualBytes(eventId, eventTs, null);
                 ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
-                    compoundColumnQualifierBytes, null,
-                    TimelineStorageUtils.EMPTY_BYTES);
+                    columnQualifierBytes, null, Separator.EMPTY_BYTES);
               } else {
-                byte[] compoundColumnQualifierBytes =
-                    EntityColumnPrefix.EVENT.
-                        getCompoundColQualBytes(eventId, eventTs, null);
                 EntityColumnPrefix.EVENT.store(rowKey, entityTable,
-                    compoundColumnQualifierBytes, null,
-                    TimelineStorageUtils.EMPTY_BYTES);
+                    columnQualifierBytes, null, Separator.EMPTY_BYTES);
               }
             } else {
               for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
-                // eventId?infoKey
-                byte[] infoKey = Bytes.toBytes(info.getKey());
+                // eventId=infoKey
+                byte[] columnQualifierBytes = converter.encode(
+                    new EventColumnName(eventId, eventTimestamp,
+                        info.getKey()));
                 if (isApplication) {
-                  byte[] compoundColumnQualifierBytes =
-                      ApplicationColumnPrefix.EVENT.
-                          getCompoundColQualBytes(eventId, eventTs, infoKey);
                   ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
-                      compoundColumnQualifierBytes, null, info.getValue());
+                      columnQualifierBytes, null, info.getValue());
                 } else {
-                  byte[] compoundColumnQualifierBytes =
-                      EntityColumnPrefix.EVENT.
-                          getCompoundColQualBytes(eventId, eventTs, infoKey);
                   EntityColumnPrefix.EVENT.store(rowKey, entityTable,
-                      compoundColumnQualifierBytes, null, info.getValue());
+                      columnQualifierBytes, null, info.getValue());
                 }
               } // for info: eventInfo
             }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
index 1dfc4db..0febc67 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
@@ -27,9 +27,10 @@ import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 
@@ -56,7 +57,7 @@ public enum ApplicationColumnPrefix implements 
ColumnPrefix<ApplicationTable> {
   /**
    * Lifecycle events for an application.
    */
-  EVENT(ApplicationColumnFamily.INFO, "e", true),
+  EVENT(ApplicationColumnFamily.INFO, "e"),
 
   /**
    * Config column stores configuration with config key as the column name.
@@ -78,7 +79,6 @@ public enum ApplicationColumnPrefix implements 
ColumnPrefix<ApplicationTable> {
    */
   private final String columnPrefix;
   private final byte[] columnPrefixBytes;
-  private final boolean compoundColQual;
 
   /**
    * Private constructor, meant to be used by the enum definition.
@@ -88,18 +88,7 @@ public enum ApplicationColumnPrefix implements 
ColumnPrefix<ApplicationTable> {
    */
   private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily,
       String columnPrefix) {
-    this(columnFamily, columnPrefix, false, GenericConverter.getInstance());
-  }
-
-  private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily,
-      String columnPrefix, boolean compoundColQual) {
-    this(columnFamily, columnPrefix, compoundColQual,
-        GenericConverter.getInstance());
-  }
-
-  private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily,
-      String columnPrefix, ValueConverter converter) {
-    this(columnFamily, columnPrefix, false, converter);
+    this(columnFamily, columnPrefix, GenericConverter.getInstance());
   }
 
   /**
@@ -111,7 +100,7 @@ public enum ApplicationColumnPrefix implements 
ColumnPrefix<ApplicationTable> {
    * this column prefix.
    */
   private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily,
-      String columnPrefix, boolean compoundColQual, ValueConverter converter) {
+      String columnPrefix, ValueConverter converter) {
     column = new ColumnHelper<ApplicationTable>(columnFamily, converter);
     this.columnFamily = columnFamily;
     this.columnPrefix = columnPrefix;
@@ -122,7 +111,6 @@ public enum ApplicationColumnPrefix implements 
ColumnPrefix<ApplicationTable> {
       this.columnPrefixBytes =
           Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
     }
-    this.compoundColQual = compoundColQual;
   }
 
   /**
@@ -149,15 +137,6 @@ public enum ApplicationColumnPrefix implements 
ColumnPrefix<ApplicationTable> {
     return columnFamily.getBytes();
   }
 
-  @Override
-  public byte[] getCompoundColQualBytes(String qualifier,
-      byte[]...components) {
-    if (!compoundColQual) {
-      return ColumnHelper.getColumnQualifier(null, qualifier);
-    }
-    return ColumnHelper.getCompoundColumnQualifierBytes(qualifier, components);
-  }
-
   /*
    * (non-Javadoc)
    *
@@ -232,25 +211,12 @@ public enum ApplicationColumnPrefix implements 
ColumnPrefix<ApplicationTable> {
    *
    * @see
    * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
-   * #readResults(org.apache.hadoop.hbase.client.Result)
-   */
-  public Map<String, Object> readResults(Result result) throws IOException {
-    return column.readResults(result, columnPrefixBytes);
-  }
-
-  /**
-   * @param result from which to read columns
-   * @return the latest values of columns in the column family. The column
-   *         qualifier is returned as a list of parts, each part a byte[]. This
-   *         is to facilitate returning byte arrays of values that were not
-   *         Strings. If they can be treated as Strings, you should use
-   *         {@link #readResults(Result)} instead.
-   * @throws IOException if any problem occurs while reading results.
+   * #readResults(org.apache.hadoop.hbase.client.Result,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
    */
-  public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result 
result)
-      throws IOException {
-    return column.readResultsHavingCompoundColumnQualifiers(result,
-        columnPrefixBytes);
+  public <K> Map<K, Object> readResults(Result result,
+      KeyConverter<K> keyConverter) throws IOException {
+    return column.readResults(result, columnPrefixBytes, keyConverter);
   }
 
   /*
@@ -258,11 +224,14 @@ public enum ApplicationColumnPrefix implements 
ColumnPrefix<ApplicationTable> {
    *
    * @see
    * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
-   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
+   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
    */
-  public <V> NavigableMap<String, NavigableMap<Long, V>>
-      readResultsWithTimestamps(Result result) throws IOException {
-    return column.readResultsWithTimestamps(result, columnPrefixBytes);
+  public <K, V> NavigableMap<K, NavigableMap<Long, V>>
+      readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter)
+      throws IOException {
+    return column.readResultsWithTimestamps(result, columnPrefixBytes,
+        keyConverter);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
index ad2aa7a..e476b21 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
@@ -15,11 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.timelineservice.storage.application;
 
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
 
 /**
  * Represents a rowkey for the application table.
@@ -28,11 +25,11 @@ public class ApplicationRowKey {
   private final String clusterId;
   private final String userId;
   private final String flowName;
-  private final long flowRunId;
+  private final Long flowRunId;
   private final String appId;
 
   public ApplicationRowKey(String clusterId, String userId, String flowName,
-      long flowRunId, String appId) {
+      Long flowRunId, String appId) {
     this.clusterId = clusterId;
     this.userId = userId;
     this.flowName = flowName;
@@ -52,7 +49,7 @@ public class ApplicationRowKey {
     return flowName;
   }
 
-  public long getFlowRunId() {
+  public Long getFlowRunId() {
     return flowRunId;
   }
 
@@ -71,9 +68,8 @@ public class ApplicationRowKey {
    */
   public static byte[] getRowKeyPrefix(String clusterId, String userId,
       String flowName) {
-    byte[] first = Bytes.toBytes(
-        Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowName));
-    return Separator.QUALIFIERS.join(first, new byte[0]);
+    return ApplicationRowKeyConverter.getInstance().encode(
+        new ApplicationRowKey(clusterId, userId, flowName, null, null));
   }
 
   /**
@@ -88,10 +84,8 @@ public class ApplicationRowKey {
    */
   public static byte[] getRowKeyPrefix(String clusterId, String userId,
       String flowName, Long flowRunId) {
-    byte[] first = Bytes.toBytes(
-        Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowName));
-    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
-    return Separator.QUALIFIERS.join(first, second, new byte[0]);
+    return ApplicationRowKeyConverter.getInstance().encode(
+        new ApplicationRowKey(clusterId, userId, flowName, flowRunId, null));
   }
 
   /**
@@ -107,14 +101,8 @@ public class ApplicationRowKey {
    */
   public static byte[] getRowKey(String clusterId, String userId,
       String flowName, Long flowRunId, String appId) {
-    byte[] first =
-        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, userId,
-            flowName));
-    // Note that flowRunId is a long, so we can't encode them all at the same
-    // time.
-    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
-    byte[] third = TimelineStorageUtils.encodeAppId(appId);
-    return Separator.QUALIFIERS.join(first, second, third);
+    return ApplicationRowKeyConverter.getInstance().encode(
+        new ApplicationRowKey(clusterId, userId, flowName, flowRunId, appId));
   }
 
   /**
@@ -124,22 +112,6 @@ public class ApplicationRowKey {
    * @return An <cite>ApplicationRowKey</cite> object.
    */
   public static ApplicationRowKey parseRowKey(byte[] rowKey) {
-    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
-
-    if (rowKeyComponents.length < 5) {
-      throw new IllegalArgumentException("the row key is not valid for " +
-          "an application");
-    }
-
-    String clusterId =
-        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
-    String userId =
-        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
-    String flowName =
-        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
-    long flowRunId =
-        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
-    String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[4]);
-    return new ApplicationRowKey(clusterId, userId, flowName, flowRunId, 
appId);
+    return ApplicationRowKeyConverter.getInstance().decode(rowKey);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java
new file mode 100644
index 0000000..3b054a5
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+
+/**
+ * Encodes and decodes row key for application table.
+ * The row key is of the form : clusterId!userName!flowName!flowRunId!appId.
+ * flowRunId is a long, appId is encoded and decoded using
+ * {@link AppIdKeyConverter} and rest are strings.
+ */
+public final class ApplicationRowKeyConverter implements
+    KeyConverter<ApplicationRowKey> {
+  private static final ApplicationRowKeyConverter INSTANCE =
+      new ApplicationRowKeyConverter();
+
+  public static ApplicationRowKeyConverter getInstance() {
+    return INSTANCE;
+  }
+
+  private ApplicationRowKeyConverter() {
+  }
+
+  // Application row key is of the form
+  // clusterId!userName!flowName!flowRunId!appId with each segment separated
+  // by !. The sizes below indicate sizes of each one of these segments in
+  // sequence. clusterId, userName and flowName are strings. flowrunId is a 
long
+  // hence 8 bytes in size. app id is represented as 12 bytes with cluster
+  // timestamp part of appid being 8 bytes(long) and seq id being 4 bytes(int).
+  // Strings are variable in size (i.e. end whenever separator is encountered).
+  // This is used while decoding and helps in determining where to split.
+  private static final int[] SEGMENT_SIZES = {
+      Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, 
Separator.VARIABLE_SIZE,
+      Bytes.SIZEOF_LONG, AppIdKeyConverter.getKeySize() };
+
+  /*
+   * (non-Javadoc)
+   *
+   * Encodes ApplicationRowKey object into a byte array with each
+   * component/field in ApplicationRowKey separated by Separator#QUALIFIERS.
+   * This leads to an application table row key of the form
+   * clusterId!userName!flowName!flowRunId!appId
+   * If flowRunId in passed ApplicationRowKey object is null (and the fields
+   * preceding it i.e. clusterId, userId and flowName are not null), this
+   * returns a row key prefix of the form clusterId!userName!flowName! and if
+   * appId in ApplicationRowKey is null (other 4 components are not null), this
+   * returns a row key prefix of the form 
clusterId!userName!flowName!flowRunId!
+   * flowRunId is inverted while encoding as it helps maintain a descending
+   * order for row keys in application table.
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #encode(java.lang.Object)
+   */
+  @Override
+  public byte[] encode(ApplicationRowKey rowKey) {
+    byte[] cluster = Separator.encode(rowKey.getClusterId(),
+        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
+    byte[] user = Separator.encode(rowKey.getUserId(),
+        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
+    byte[] flow = Separator.encode(rowKey.getFlowName(),
+        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
+    byte[] first = Separator.QUALIFIERS.join(cluster, user, flow);
+    // Note that flowRunId is a long, so we can't encode them all at the same
+    // time.
+    if (rowKey.getFlowRunId() == null) {
+      return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES);
+    }
+    byte[] second = Bytes.toBytes(
+        TimelineStorageUtils.invertLong(rowKey.getFlowRunId()));
+    if (rowKey.getAppId() == null || rowKey.getAppId().isEmpty()) {
+      return Separator.QUALIFIERS.join(first, second, Separator.EMPTY_BYTES);
+    }
+    byte[] third = AppIdKeyConverter.getInstance().encode(rowKey.getAppId());
+    return Separator.QUALIFIERS.join(first, second, third);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Decodes an application row key of the form
+   * clusterId!userName!flowName!flowRunId!appId represented in byte format and
+   * converts it into an ApplicationRowKey object. flowRunId is inverted while
+   * decoding as it was inverted while encoding.
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #decode(byte[])
+   */
+  @Override
+  public ApplicationRowKey decode(byte[] rowKey) {
+    byte[][] rowKeyComponents =
+        Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+    if (rowKeyComponents.length != 5) {
+      throw new IllegalArgumentException("the row key is not valid for " +
+          "an application");
+    }
+    String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
+        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+    String userId = Separator.decode(Bytes.toString(rowKeyComponents[1]),
+        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+    String flowName = Separator.decode(Bytes.toString(rowKeyComponents[2]),
+        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+    Long flowRunId =
+        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
+    String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[4]);
+    return new ApplicationRowKey(clusterId, userId, flowName, flowRunId, 
appId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
index 3085bb1..6a38e32 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
@@ -17,10 +17,6 @@
  */
 package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
 
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-
 /**
  * Represents a rowkey for the app_flow table.
  */
@@ -50,9 +46,8 @@ public class AppToFlowRowKey {
    * @return byte array with the row key
    */
   public static byte[] getRowKey(String clusterId, String appId) {
-    byte[] first = Bytes.toBytes(clusterId);
-    byte[] second = TimelineStorageUtils.encodeAppId(appId);
-    return Separator.QUALIFIERS.join(first, second);
+    return AppToFlowRowKeyConverter.getInstance().encode(
+        new AppToFlowRowKey(clusterId, appId));
   }
 
   /**
@@ -62,15 +57,6 @@ public class AppToFlowRowKey {
    * @return an <cite>AppToFlowRowKey</cite> object.
    */
   public static AppToFlowRowKey parseRowKey(byte[] rowKey) {
-    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
-
-    if (rowKeyComponents.length < 2) {
-      throw new IllegalArgumentException("the row key is not valid for " +
-          "the app-to-flow table");
-    }
-
-    String clusterId = Bytes.toString(rowKeyComponents[0]);
-    String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[1]);
-    return new AppToFlowRowKey(clusterId, appId);
+    return AppToFlowRowKeyConverter.getInstance().decode(rowKey);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java
new file mode 100644
index 0000000..0f0b879d
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Encodes and decodes row key for app_flow table.
+ * The row key is of the form : clusterId!appId.
+ * clusterId is a string and appId is encoded/decoded using
+ * {@link AppIdKeyConverter}.
+ */
+public final class AppToFlowRowKeyConverter
+    implements KeyConverter<AppToFlowRowKey> {
+  private static final AppToFlowRowKeyConverter INSTANCE =
+      new AppToFlowRowKeyConverter();
+
+  public static AppToFlowRowKeyConverter getInstance() {
+    return INSTANCE;
+  }
+
+  private AppToFlowRowKeyConverter() {
+  }
+
+  // App to flow row key is of the form clusterId!appId with the 2 segments
+  // separated by !. The sizes below indicate sizes of both of these segments
+  // in sequence. clusterId is a string. appId is represented as 12 bytes with
+  // cluster Timestamp part of appid being 8 bytes(long) and seq id being 4
+  // bytes(int).
+  // Strings are variable in size (i.e. end whenever separator is encountered).
+  // This is used while decoding and helps in determining where to split.
+  private static final int[] SEGMENT_SIZES = {
+      Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT };
+
+  /*
+   * (non-Javadoc)
+   *
+   * Encodes AppToFlowRowKey object into a byte array with each component/field
+   * in AppToFlowRowKey separated by Separator#QUALIFIERS. This leads to an
+   * app to flow table row key of the form clusterId!appId
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #encode(java.lang.Object)
+   */
+  @Override
+  public byte[] encode(AppToFlowRowKey rowKey) {
+    byte[] first = Separator.encode(rowKey.getClusterId(),
+        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
+    byte[] second = AppIdKeyConverter.getInstance().encode(rowKey.getAppId());
+    return Separator.QUALIFIERS.join(first, second);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Decodes an app to flow row key of the form clusterId!appId represented in
+   * byte format and converts it into an AppToFlowRowKey object.
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #decode(byte[])
+   */
+  @Override
+  public AppToFlowRowKey decode(byte[] rowKey) {
+    byte[][] rowKeyComponents =
+        Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+    if (rowKeyComponents.length != 2) {
+      throw new IllegalArgumentException("the row key is not valid for " +
+          "the app-to-flow table");
+    }
+    String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
+        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+    String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[1]);
+    return new AppToFlowRowKey(clusterId, appId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
new file mode 100644
index 0000000..a173b0f
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+/**
+ * Encodes and decodes {@link ApplicationId} for row keys.
+ * App ID is stored in row key as 12 bytes, cluster timestamp section of app id
+ * (long - 8 bytes) followed by sequence id section of app id (int - 4 bytes).
+ */
+public final class AppIdKeyConverter implements KeyConverter<String> {
+  private static final AppIdKeyConverter INSTANCE = new AppIdKeyConverter();
+
+  public static AppIdKeyConverter getInstance() {
+    return INSTANCE;
+  }
+
+  private AppIdKeyConverter() {
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Converts/encodes a string app Id into a byte representation for (row) keys.
+   * For conversion, we extract cluster timestamp and sequence id from the
+   * string app id (calls ConverterUtils#toApplicationId(String) for
+   * conversion) and then store it in a byte array of length 12 (8 bytes (long)
+   * for cluster timestamp followed by 4 bytes (int) for sequence id). Both
+   * cluster timestamp and sequence id are inverted so that the most recent
+   * cluster timestamp and highest sequence id appear first in the table (i.e.
+   * application id appears in a descending order).
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #encode(java.lang.Object)
+   */
+  @Override
+  public byte[] encode(String appIdStr) {
+    ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
+    byte[] appIdBytes = new byte[getKeySize()];
+    byte[] clusterTs = Bytes.toBytes(
+        TimelineStorageUtils.invertLong(appId.getClusterTimestamp()));
+    System.arraycopy(clusterTs, 0, appIdBytes, 0, Bytes.SIZEOF_LONG);
+    byte[] seqId = 
Bytes.toBytes(TimelineStorageUtils.invertInt(appId.getId()));
+    System.arraycopy(seqId, 0, appIdBytes, Bytes.SIZEOF_LONG, 
Bytes.SIZEOF_INT);
+    return appIdBytes;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Converts/decodes a 12 byte representation of app id for (row) keys to an
+   * app id in string format which can be returned back to client.
+   * For decoding, 12 bytes are interpreted as 8 bytes of inverted cluster
+   * timestamp(long) followed by 4 bytes of inverted sequence id(int). Calls
+   * ApplicationId#toString to generate string representation of app id.
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #decode(byte[])
+   */
+  @Override
+  public String decode(byte[] appIdBytes) {
+    if (appIdBytes.length != getKeySize()) {
+      throw new IllegalArgumentException("Invalid app id in byte format");
+    }
+    long clusterTs = TimelineStorageUtils.invertLong(
+        Bytes.toLong(appIdBytes, 0, Bytes.SIZEOF_LONG));
+    int seqId = TimelineStorageUtils.invertInt(
+        Bytes.toInt(appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT));
+    return ApplicationId.newInstance(clusterTs, seqId).toString();
+  }
+
+  /**
+   * Returns the size of app id after encoding.
+   *
+   * @return size of app id after encoding.
+   */
+  public static int getKeySize() {
+    return Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
index 759bf27..be55db5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
@@ -166,19 +166,22 @@ public class ColumnHelper<T> {
    * @param result from which to reads data with timestamps
    * @param columnPrefixBytes optional prefix to limit columns. If null all
    *          columns are returned.
+   * @param <K> identifies the type of column name (indicated by type of key
+   *     converter).
    * @param <V> the type of the values. The values will be cast into that type.
+   * @param keyConverter used to convert column bytes to the appropriate key
+   *     type.
    * @return the cell values at each respective time in for form
    *         {@literal {idA={timestamp1->value1}, idA={timestamp2->value2},
    *         idB={timestamp3->value3}, idC={timestamp1->value4}}}
    * @throws IOException if any problem occurs while reading results.
    */
   @SuppressWarnings("unchecked")
-  public <V> NavigableMap<String, NavigableMap<Long, V>>
-      readResultsWithTimestamps(Result result, byte[] columnPrefixBytes)
-          throws IOException {
+  public <K, V> NavigableMap<K, NavigableMap<Long, V>>
+      readResultsWithTimestamps(Result result, byte[] columnPrefixBytes,
+          KeyConverter<K> keyConverter) throws IOException {
 
-    NavigableMap<String, NavigableMap<Long, V>> results =
-        new TreeMap<String, NavigableMap<Long, V>>();
+    NavigableMap<K, NavigableMap<Long, V>> results = new TreeMap<>();
 
     if (result != null) {
       NavigableMap<
@@ -192,13 +195,17 @@ public class ColumnHelper<T> {
       if (columnCellMap != null) {
         for (Entry<byte[], NavigableMap<Long, byte[]>> entry : columnCellMap
             .entrySet()) {
-          String columnName = null;
+          K converterColumnKey = null;
           if (columnPrefixBytes == null) {
             if (LOG.isDebugEnabled()) {
               LOG.debug("null prefix was specified; returning all columns");
             }
-            // Decode the spaces we encoded in the column name.
-            columnName = Separator.decode(entry.getKey(), Separator.SPACE);
+            try {
+              converterColumnKey = keyConverter.decode(entry.getKey());
+            } catch (IllegalArgumentException iae) {
+              LOG.error("Illegal column found, skipping this column.", iae);
+              continue;
+            }
           } else {
             // A non-null prefix means columns are actually of the form
             // prefix!columnNameRemainder
@@ -207,13 +214,18 @@ public class ColumnHelper<T> {
             byte[] actualColumnPrefixBytes = columnNameParts[0];
             if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
                 && columnNameParts.length == 2) {
-              // This is the prefix that we want
-              columnName = Separator.decode(columnNameParts[1]);
+              try {
+                // This is the prefix that we want
+                converterColumnKey = keyConverter.decode(columnNameParts[1]);
+              } catch (IllegalArgumentException iae) {
+                LOG.error("Illegal column found, skipping this column.", iae);
+                continue;
+              }
             }
           }
 
           // If this column has the prefix we want
-          if (columnName != null) {
+          if (converterColumnKey != null) {
             NavigableMap<Long, V> cellResults =
                 new TreeMap<Long, V>();
             NavigableMap<Long, byte[]> cells = entry.getValue();
@@ -226,7 +238,7 @@ public class ColumnHelper<T> {
                     value);
               }
             }
-            results.put(columnName, cellResults);
+            results.put(converterColumnKey, cellResults);
           }
         } // for entry : columnCellMap
       } // if columnCellMap != null
@@ -235,20 +247,24 @@ public class ColumnHelper<T> {
   }
 
   /**
+   * @param <K> identifies the type of column name (indicated by type of key
+   *     converter).
    * @param result from which to read columns
    * @param columnPrefixBytes optional prefix to limit columns. If null all
    *        columns are returned.
-   * @return the latest values of columns in the column family. This assumes
-   *         that the column name parts are all Strings by default. If the
-   *         column name parts should be treated natively and not be converted
-   *         back and forth from Strings, you should use
-   *         {@link #readResultsHavingCompoundColumnQualifiers(Result, byte[])}
-   *         instead.
+   * @param keyConverter used to convert column bytes to the appropriate key
+   *          type.
+   * @return the latest values of columns in the column family, keyed by the
+   *         column qualifier as decoded by {@code keyConverter}. If the column
+   *         prefix is non-null, only columns carrying that prefix are returned
+   *         and the prefix is stripped before decoding. Columns whose
+   *         qualifier cannot be decoded are logged and skipped.
    * @throws IOException if any problem occurs while reading results.
    */
-  public Map<String, Object> readResults(Result result,
-      byte[] columnPrefixBytes) throws IOException {
-    Map<String, Object> results = new HashMap<String, Object>();
+  public <K> Map<K, Object> readResults(Result result,
+      byte[] columnPrefixBytes, KeyConverter<K> keyConverter)
+      throws IOException {
+    Map<K, Object> results = new HashMap<K, Object>();
 
     if (result != null) {
       Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes);
@@ -256,84 +272,40 @@ public class ColumnHelper<T> {
         byte[] columnKey = entry.getKey();
         if (columnKey != null && columnKey.length > 0) {
 
-          String columnName = null;
+          K converterColumnKey = null;
           if (columnPrefixBytes == null) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("null prefix was specified; returning all columns");
+            try {
+              converterColumnKey = keyConverter.decode(columnKey);
+            } catch (IllegalArgumentException iae) {
+              LOG.error("Illegal column found, skipping this column.", iae);
+              continue;
             }
-            // Decode the spaces we encoded in the column name.
-            columnName = Separator.decode(columnKey, Separator.SPACE);
           } else {
             // A non-null prefix means columns are actually of the form
             // prefix!columnNameRemainder
-            byte[][] columnNameParts =
-                Separator.QUALIFIERS.split(columnKey, 2);
-            byte[] actualColumnPrefixBytes = columnNameParts[0];
-            if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
-                && columnNameParts.length == 2) {
-              // This is the prefix that we want
-              // if the column name is a compound qualifier
-              // with non string datatypes, the following decode will not
-              // work correctly since it considers all components to be String
-              // invoke the readResultsHavingCompoundColumnQualifiers function
-              columnName = Separator.decode(columnNameParts[1]);
+            byte[][] columnNameParts = Separator.QUALIFIERS.split(columnKey, 
2);
+            if (columnNameParts.length > 0) {
+              byte[] actualColumnPrefixBytes = columnNameParts[0];
+              // If this is the prefix that we want
+              if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
+                  && columnNameParts.length == 2) {
+                try {
+                  converterColumnKey = keyConverter.decode(columnNameParts[1]);
+                } catch (IllegalArgumentException iae) {
+                  LOG.error("Illegal column found, skipping this column.", 
iae);
+                  continue;
+                }
+              }
             }
-          }
+          } // if-else
 
-          // If this column has the prefix we want
-          if (columnName != null) {
+          // If the columnPrefix is null (we want all columns), or the actual
+          // prefix matches the given prefix we want this column
+          if (converterColumnKey != null) {
             Object value = converter.decodeValue(entry.getValue());
-            results.put(columnName, value);
-          }
-        }
-      } // for entry
-    }
-    return results;
-  }
-
-  /**
-   * @param result from which to read columns
-   * @param columnPrefixBytes optional prefix to limit columns. If null all
-   *        columns are returned.
-   * @return the latest values of columns in the column family. If the column
-   *         prefix is null, the column qualifier is returned as Strings. For a
-   *         non-null column prefix bytes, the column qualifier is returned as
-   *         a list of parts, each part a byte[]. This is to facilitate
-   *         returning byte arrays of values that were not Strings.
-   * @throws IOException if any problem occurs while reading results.
-   */
-  public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result 
result,
-      byte[] columnPrefixBytes) throws IOException {
-    // handle the case where the column prefix is null
-    // it is the same as readResults() so simply delegate to that 
implementation
-    if (columnPrefixBytes == null) {
-      return readResults(result, null);
-    }
-
-    Map<byte[][], Object> results = new HashMap<byte[][], Object>();
-
-    if (result != null) {
-      Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes);
-      for (Entry<byte[], byte[]> entry : columns.entrySet()) {
-        byte[] columnKey = entry.getKey();
-        if (columnKey != null && columnKey.length > 0) {
-          // A non-null prefix means columns are actually of the form
-          // prefix!columnNameRemainder
-          // with a compound column qualifier, we are presuming existence of a
-          // prefix
-          byte[][] columnNameParts = Separator.QUALIFIERS.split(columnKey, 2);
-          if (columnNameParts.length > 0) {
-            byte[] actualColumnPrefixBytes = columnNameParts[0];
-            if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
-                && columnNameParts.length == 2) {
-              // This is the prefix that we want
-              byte[][] columnQualifierParts =
-                  Separator.VALUES.split(columnNameParts[1]);
-              Object value = converter.decodeValue(entry.getValue());
-              // we return the columnQualifier in parts since we don't know
-              // which part is of which data type
-              results.put(columnQualifierParts, value);
-            }
+            // The qualifier is keyed in the form produced by keyConverter, so
+            // callers receive it already decoded into type K.
+            results.put(converterColumnKey, value);
           }
         }
       } // for entry
@@ -353,8 +325,9 @@ public class ColumnHelper<T> {
   public static byte[] getColumnQualifier(byte[] columnPrefixBytes,
       String qualifier) {
 
-    // We don't want column names to have spaces
-    byte[] encodedQualifier = Bytes.toBytes(Separator.SPACE.encode(qualifier));
+    // We don't want column names to have spaces / tabs.
+    byte[] encodedQualifier =
+        Separator.encode(qualifier, Separator.SPACE, Separator.TAB);
     if (columnPrefixBytes == null) {
       return encodedQualifier;
     }
@@ -367,22 +340,6 @@ public class ColumnHelper<T> {
   }
 
   /**
-   * Create a compound column qualifier by combining qualifier and components.
-   *
-   * @param qualifier Column QUalifier.
-   * @param components Other components.
-   * @return a byte array representing compound column qualifier.
-   */
-  public static byte[] getCompoundColumnQualifierBytes(String qualifier,
-      byte[]...components) {
-    byte[] colQualBytes = Bytes.toBytes(Separator.VALUES.encode(qualifier));
-    for (int i = 0; i < components.length; i++) {
-      colQualBytes = Separator.VALUES.join(colQualBytes, components[i]);
-    }
-    return colQualBytes;
-  }
-
-  /**
    * @param columnPrefixBytes The byte representation for the column prefix.
    *          Should not contain {@link Separator#QUALIFIERS}.
    * @param qualifier for the remainder of the column.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
index e4b7f16..89aa013 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
@@ -91,37 +91,33 @@ public interface ColumnPrefix<T> {
   Object readResult(Result result, String qualifier) throws IOException;
 
   /**
-   * @param result from which to read columns
+   *
+   * @param <K> identifies the type of key converter.
+   * @param result from which to read columns.
+   * @param keyConverter used to convert column bytes to the appropriate key
+   *          type
    * @return the latest values of columns in the column family with this prefix
    *         (or all of them if the prefix value is null).
    * @throws IOException if there is any exception encountered while reading
-   *     results.
+   *           results.
    */
-  Map<String, Object> readResults(Result result) throws IOException;
+  <K> Map<K, Object> readResults(Result result, KeyConverter<K> keyConverter)
+      throws IOException;
 
   /**
-   * @param result from which to reads data with timestamps
+   * @param result from which to read data with timestamps.
+   * @param <K> identifies the type of key converter.
    * @param <V> the type of the values. The values will be cast into that type.
+   * @param keyConverter used to convert column bytes to the appropriate key
+   *     type.
    * @return the cell values at each respective time in for form
    *         {@literal {idA={timestamp1->value1}, idA={timestamp2->value2},
    *         idB={timestamp3->value3}, idC={timestamp1->value4}}}
    * @throws IOException if there is any exception encountered while reading
    *     result.
    */
-  <V> NavigableMap<String, NavigableMap<Long, V>>
-      readResultsWithTimestamps(Result result) throws IOException;
-
-  /**
-   * @param result from which to read columns
-   * @return the latest values of columns in the column family. The column
-   *         qualifier is returned as a list of parts, each part a byte[]. This
-   *         is to facilitate returning byte arrays of values that were not
-   *         Strings. If they can be treated as Strings, you should use
-   *         {@link #readResults(Result)} instead.
-   * @throws IOException if any problem occurs while reading results.
-   */
-  Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
-      throws IOException;
+  <K, V> NavigableMap<K, NavigableMap<Long, V>> readResultsWithTimestamps(
+      Result result, KeyConverter<K> keyConverter) throws IOException;
 
   /**
    * @param qualifierPrefix Column qualifier or prefix of qualifier.
@@ -146,15 +142,4 @@ public interface ColumnPrefix<T> {
    * @return a {@link ValueConverter} implementation.
    */
   ValueConverter getValueConverter();
-
-  /**
-   * Get compound column qualifier bytes if the column qualifier is a compound
-   * qualifier. Returns the qualifier passed as bytes if the column is not a
-   * compound column qualifier.
-   *
-   * @param qualifier Column Qualifier.
-   * @param components Other components.
-   * @return byte array representing compound column qualifier.
-   */
-  byte[] getCompoundColQualBytes(String qualifier, byte[]...components);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java
new file mode 100644
index 0000000..6018f86
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+/**
+ * Encapsulates information about Event column names for application and entity
+ * tables. Used while encoding/decoding event column names.
+ */
+public class EventColumnName {
+
+  private final String id;
+  private final Long timestamp;
+  private final String infoKey;
+
+  public EventColumnName(String id, Long timestamp, String infoKey) {
+    this.id = id;
+    this.timestamp = timestamp;
+    this.infoKey = infoKey;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public Long getTimestamp() {
+    return timestamp;
+  }
+
+  public String getInfoKey() {
+    return infoKey;
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to