http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
index 8e806bc..aa2bfda 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
@@ -46,8 +46,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 import com.google.common.base.Preconditions;
@@ -150,13 +153,13 @@ class ApplicationEntityReader extends GenericEntityReader {
     EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
     // If INFO field has to be retrieved, add a filter for fetching columns
     // with INFO column prefix.
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
+    if (hasField(fieldsToRetrieve, Field.INFO)) {
       infoFamilyColsFilter.addFilter(
           TimelineFilterUtils.createHBaseQualifierFilter(
               CompareOp.EQUAL, ApplicationColumnPrefix.INFO));
     }
     TimelineFilterList relatesTo = getFilters().getRelatesTo();
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+    if (hasField(fieldsToRetrieve, Field.RELATES_TO)) {
       // If RELATES_TO field has to be retrieved, add a filter for fetching
       // columns with RELATES_TO column prefix.
       infoFamilyColsFilter.addFilter(
@@ -169,12 +172,11 @@ class ApplicationEntityReader extends GenericEntityReader {
       // matched after fetching rows from HBase.
       Set<String> relatesToCols =
           TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo);
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createFiltersFromColumnQualifiers(
-              ApplicationColumnPrefix.RELATES_TO, relatesToCols));
+      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+          ApplicationColumnPrefix.RELATES_TO, relatesToCols));
     }
     TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo();
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+    if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
       // If IS_RELATED_TO field has to be retrieved, add a filter for fetching
       // columns with IS_RELATED_TO column prefix.
       infoFamilyColsFilter.addFilter(
@@ -187,12 +189,11 @@ class ApplicationEntityReader extends GenericEntityReader {
       // matched after fetching rows from HBase.
       Set<String> isRelatedToCols =
           TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo);
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createFiltersFromColumnQualifiers(
-              ApplicationColumnPrefix.IS_RELATED_TO, isRelatedToCols));
+      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+          ApplicationColumnPrefix.IS_RELATED_TO, isRelatedToCols));
     }
     TimelineFilterList eventFilters = getFilters().getEventFilters();
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
+    if (hasField(fieldsToRetrieve, Field.EVENTS)) {
       // If EVENTS field has to be retrieved, add a filter for fetching columns
       // with EVENT column prefix.
       infoFamilyColsFilter.addFilter(
@@ -205,9 +206,8 @@ class ApplicationEntityReader extends GenericEntityReader {
       // fetching rows from HBase.
       Set<String> eventCols =
           TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters);
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createFiltersFromColumnQualifiers(
-              ApplicationColumnPrefix.EVENT, eventCols));
+      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+          ApplicationColumnPrefix.EVENT, eventCols));
     }
     return infoFamilyColsFilter;
   }
@@ -222,25 +222,25 @@ class ApplicationEntityReader extends GenericEntityReader {
   private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
     EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
     // Events not required.
-    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
+    if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
       infoColFamilyList.addFilter(
           TimelineFilterUtils.createHBaseQualifierFilter(
               CompareOp.NOT_EQUAL, ApplicationColumnPrefix.EVENT));
     }
     // info not required.
-    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
+    if (!hasField(fieldsToRetrieve, Field.INFO)) {
       infoColFamilyList.addFilter(
           TimelineFilterUtils.createHBaseQualifierFilter(
               CompareOp.NOT_EQUAL, ApplicationColumnPrefix.INFO));
     }
     // is related to not required.
-    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+    if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
       infoColFamilyList.addFilter(
           TimelineFilterUtils.createHBaseQualifierFilter(
               CompareOp.NOT_EQUAL, ApplicationColumnPrefix.IS_RELATED_TO));
     }
     // relates to not required.
-    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+    if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
       infoColFamilyList.addFilter(
           TimelineFilterUtils.createHBaseQualifierFilter(
               CompareOp.NOT_EQUAL, ApplicationColumnPrefix.RELATES_TO));
@@ -308,9 +308,10 @@ class ApplicationEntityReader extends GenericEntityReader {
   protected Result getResult(Configuration hbaseConf, Connection conn,
       FilterList filterList) throws IOException {
     TimelineReaderContext context = getContext();
-    byte[] rowKey =
-        ApplicationRowKey.getRowKey(context.getClusterId(), context.getUserId(),
+    ApplicationRowKey applicationRowKey =
+        new ApplicationRowKey(context.getClusterId(), context.getUserId(),
             context.getFlowName(), context.getFlowRunId(), context.getAppId());
+    byte[] rowKey = applicationRowKey.getRowKey();
     Get get = new Get(rowKey);
     get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
     if (filterList != null && !filterList.getFilters().isEmpty()) {
@@ -345,10 +346,13 @@ class ApplicationEntityReader extends GenericEntityReader {
     TimelineReaderContext context = getContext();
     if (isSingleEntityRead()) {
       // Get flow context information from AppToFlow table.
-      if (context.getFlowName() == null || context.getFlowRunId() == null ||
-          context.getUserId() == null) {
-        FlowContext flowContext = lookupFlowContext(
-            context.getClusterId(), context.getAppId(), hbaseConf, conn);
+      if (context.getFlowName() == null || context.getFlowRunId() == null
+          || context.getUserId() == null) {
+        AppToFlowRowKey appToFlowRowKey =
+            new AppToFlowRowKey(context.getClusterId(), context.getAppId());
+        FlowContext flowContext =
+            lookupFlowContext(appToFlowRowKey,
+                hbaseConf, conn);
         context.setFlowName(flowContext.getFlowName());
         context.setFlowRunId(flowContext.getFlowRunId());
         context.setUserId(flowContext.getUserId());
@@ -367,15 +371,13 @@ class ApplicationEntityReader extends GenericEntityReader {
       Connection conn, FilterList filterList) throws IOException {
     Scan scan = new Scan();
     TimelineReaderContext context = getContext();
-    if (context.getFlowRunId() != null) {
-      scan.setRowPrefixFilter(ApplicationRowKey.
-          getRowKeyPrefix(context.getClusterId(), context.getUserId(),
-              context.getFlowName(), context.getFlowRunId()));
-    } else {
-      scan.setRowPrefixFilter(ApplicationRowKey.
-          getRowKeyPrefix(context.getClusterId(), context.getUserId(),
-              context.getFlowName()));
-    }
+    // Whether or not flowRunID is null doesn't matter, the
+    // ApplicationRowKeyPrefix will do the right thing.
+    RowKeyPrefix<ApplicationRowKey> applicationRowKeyPrefix =
+        new ApplicationRowKeyPrefix(context.getClusterId(),
+            context.getUserId(), context.getFlowName(),
+            context.getFlowRunId());
+    scan.setRowPrefixFilter(applicationRowKeyPrefix.getRowKeyPrefix());
     FilterList newList = new FilterList();
     newList.addFilter(new PageFilter(getFilters().getLimit()));
     if (filterList != null && !filterList.getFilters().isEmpty()) {
@@ -409,15 +411,14 @@ class ApplicationEntityReader extends GenericEntityReader {
     boolean checkIsRelatedTo =
         !isSingleEntityRead() && filters.getIsRelatedTo() != null &&
         filters.getIsRelatedTo().getFilterList().size() > 0;
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO) ||
-        checkIsRelatedTo) {
-      TimelineStorageUtils.readRelationship(
-          entity, result, ApplicationColumnPrefix.IS_RELATED_TO, true);
+    if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) {
+      readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
+          true);
       if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity,
           filters.getIsRelatedTo())) {
         return null;
       }
-      if (!TimelineStorageUtils.hasField(fieldsToRetrieve,
+      if (!hasField(fieldsToRetrieve,
           Field.IS_RELATED_TO)) {
         entity.getIsRelatedToEntities().clear();
       }
@@ -430,29 +431,27 @@ class ApplicationEntityReader extends GenericEntityReader {
     boolean checkRelatesTo =
         !isSingleEntityRead() && filters.getRelatesTo() != null &&
         filters.getRelatesTo().getFilterList().size() > 0;
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO) ||
+    if (hasField(fieldsToRetrieve, Field.RELATES_TO) ||
         checkRelatesTo) {
-      TimelineStorageUtils.readRelationship(
-          entity, result, ApplicationColumnPrefix.RELATES_TO, false);
+      readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
+          false);
       if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity,
           filters.getRelatesTo())) {
         return null;
       }
-      if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+      if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
         entity.getRelatesToEntities().clear();
       }
     }
 
     // fetch info if fieldsToRetrieve contains INFO or ALL.
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
-      TimelineStorageUtils.readKeyValuePairs(
-          entity, result, ApplicationColumnPrefix.INFO, false);
+    if (hasField(fieldsToRetrieve, Field.INFO)) {
+      readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
     }
 
     // fetch configs if fieldsToRetrieve contains CONFIGS or ALL.
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.CONFIGS)) {
-      TimelineStorageUtils.readKeyValuePairs(
-          entity, result, ApplicationColumnPrefix.CONFIG, true);
+    if (hasField(fieldsToRetrieve, Field.CONFIGS)) {
+      readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
     }
 
     // fetch events and match event filters if they exist. If event filters do
@@ -462,21 +461,19 @@ class ApplicationEntityReader extends GenericEntityReader {
     boolean checkEvents =
         !isSingleEntityRead() && filters.getEventFilters() != null &&
         filters.getEventFilters().getFilterList().size() > 0;
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS) ||
-        checkEvents) {
-      TimelineStorageUtils.readEvents(
-          entity, result, ApplicationColumnPrefix.EVENT);
+    if (hasField(fieldsToRetrieve, Field.EVENTS) || checkEvents) {
+      readEvents(entity, result, ApplicationColumnPrefix.EVENT);
       if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity,
           filters.getEventFilters())) {
         return null;
       }
-      if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
+      if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
         entity.getEvents().clear();
       }
     }
 
     // fetch metrics if fieldsToRetrieve contains METRICS or ALL.
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.METRICS)) {
+    if (hasField(fieldsToRetrieve, Field.METRICS)) {
       readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
     }
     return entity;

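For reference, the row-key calling convention this diff introduces, as a minimal self-contained sketch. It assumes only the constructors and methods visible in the hunks above (ApplicationRowKey, ApplicationRowKeyPrefix, AppToFlowRowKey, RowKeyPrefix); the wrapper class and all literal values are illustrative placeholders, not taken from the commit.

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;

public class ApplicationRowKeySketch {
  public static void main(String[] args) {
    // Single-entity read: build the full row key from context components,
    // then ask the key object for its byte encoding (as in getResult()).
    ApplicationRowKey applicationRowKey =
        new ApplicationRowKey("cluster1", "user1", "flow_name", 1002345678919L,
            "application_1111111111_2222");
    Get get = new Get(applicationRowKey.getRowKey());

    // Multi-entity scan: per the comment added in getResults(), the prefix
    // object handles a null flowRunId itself, so the caller no longer
    // branches on it.
    RowKeyPrefix<ApplicationRowKey> rowKeyPrefix =
        new ApplicationRowKeyPrefix("cluster1", "user1", "flow_name", null);
    Scan scan = new Scan();
    scan.setRowPrefixFilter(rowKeyPrefix.getRowKeyPrefix());

    // Flow-context lookup: cluster id plus app id identify the AppToFlow row
    // handed to lookupFlowContext() above.
    AppToFlowRowKey appToFlowRowKey =
        new AppToFlowRowKey("cluster1", "application_1111111111_2222");
    byte[] appToFlowKey = appToFlowRowKey.getRowKey();
  }
}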
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
index faecd14..9ba5e38 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
@@ -35,9 +35,11 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrie
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
 
 import com.google.common.base.Preconditions;
@@ -50,6 +52,12 @@ class FlowActivityEntityReader extends TimelineEntityReader {
   private static final FlowActivityTable FLOW_ACTIVITY_TABLE =
       new FlowActivityTable();
 
+  /**
+   * Used to convert Long key components to and from storage format.
+   */
+  private final KeyConverter<Long> longKeyConverter = new LongKeyConverter();
+
+
   public FlowActivityEntityReader(TimelineReaderContext ctxt,
       TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
     super(ctxt, entityFilters, toRetrieve, true);
@@ -105,15 +113,14 @@ class FlowActivityEntityReader extends TimelineEntityReader {
     if (getFilters().getCreatedTimeBegin() == 0L &&
         getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) {
        // All records have to be chosen.
-      scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId));
+      scan.setRowPrefixFilter(new FlowActivityRowKeyPrefix(clusterId)
+          .getRowKeyPrefix());
     } else {
-      scan.setStartRow(
-          FlowActivityRowKey.getRowKeyPrefix(clusterId,
-              getFilters().getCreatedTimeEnd()));
-      scan.setStopRow(
-          FlowActivityRowKey.getRowKeyPrefix(clusterId,
-              (getFilters().getCreatedTimeBegin() <= 0 ? 0 :
-              (getFilters().getCreatedTimeBegin() - 1))));
+      scan.setStartRow(new FlowActivityRowKeyPrefix(clusterId, getFilters()
+          .getCreatedTimeEnd()).getRowKeyPrefix());
+      scan.setStopRow(new FlowActivityRowKeyPrefix(clusterId, (getFilters()
+          .getCreatedTimeBegin() <= 0 ? 0
+          : (getFilters().getCreatedTimeBegin() - 1))).getRowKeyPrefix());
     }
     // use the page filter to limit the result to the page size
     // the scanner may still return more than the limit; therefore we need to
@@ -137,8 +144,7 @@ class FlowActivityEntityReader extends TimelineEntityReader {
     // get the list of run ids along with the version that are associated with
     // this flow on this day
     Map<Long, Object> runIdsMap =
-        FlowActivityColumnPrefix.RUN_ID.readResults(result,
-            LongKeyConverter.getInstance());
+        FlowActivityColumnPrefix.RUN_ID.readResults(result, longKeyConverter);
     for (Map.Entry<Long, Object> e : runIdsMap.entrySet()) {
       Long runId = e.getKey();
       String version = (String)e.getValue();

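The time-window scan set up in getResults() above, extracted as a sketch. Swapping the begin and end bounds reflects that flow activity row keys appear to store inverted timestamps so newer days sort first; treat that ordering as an inference from the hunk, and the cluster id and millisecond values below as placeholders.

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix;

public class FlowActivityScanSketch {
  public static void main(String[] args) {
    String clusterId = "cluster1";
    long createdTimeBegin = 1425016501000L; // placeholder millis
    long createdTimeEnd = 1425016502000L;   // placeholder millis

    Scan scan = new Scan();
    // The scan starts at the end of the time window...
    scan.setStartRow(new FlowActivityRowKeyPrefix(clusterId, createdTimeEnd)
        .getRowKeyPrefix());
    // ...and stops just past the beginning (begin - 1, floored at 0),
    // mirroring the bounds computed in the hunk above.
    scan.setStopRow(new FlowActivityRowKeyPrefix(clusterId,
        createdTimeBegin <= 0 ? 0 : createdTimeBegin - 1).getRowKeyPrefix());
  }
}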
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
index e1695ef..986a28f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
@@ -28,12 +28,12 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.FamilyFilter;
 import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
 import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
@@ -43,11 +43,12 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKeyPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
 import org.apache.hadoop.yarn.webapp.BadRequestException;
 
@@ -81,8 +82,8 @@ class FlowRunEntityReader extends TimelineEntityReader {
   @Override
   protected void validateParams() {
     Preconditions.checkNotNull(getContext(), "context shouldn't be null");
-    Preconditions.checkNotNull(
-        getDataToRetrieve(), "data to retrieve shouldn't be null");
+    Preconditions.checkNotNull(getDataToRetrieve(),
+        "data to retrieve shouldn't be null");
     Preconditions.checkNotNull(getContext().getClusterId(),
         "clusterId shouldn't be null");
     Preconditions.checkNotNull(getContext().getUserId(),
@@ -97,8 +98,8 @@ class FlowRunEntityReader extends TimelineEntityReader {
     if (!isSingleEntityRead() && fieldsToRetrieve != null) {
       for (Field field : fieldsToRetrieve) {
         if (field != Field.ALL && field != Field.METRICS) {
-          throw new BadRequestException("Invalid field " + field +
-              " specified while querying flow runs.");
+          throw new BadRequestException("Invalid field " + field
+              + " specified while querying flow runs.");
         }
       }
     }
@@ -119,23 +120,22 @@ class FlowRunEntityReader extends TimelineEntityReader {
     Long createdTimeBegin = getFilters().getCreatedTimeBegin();
     Long createdTimeEnd = getFilters().getCreatedTimeEnd();
     if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
-      listBasedOnFilters.addFilter(
-          TimelineFilterUtils.createSingleColValueFiltersByRange(
-          FlowRunColumn.MIN_START_TIME, createdTimeBegin, createdTimeEnd));
+      listBasedOnFilters.addFilter(TimelineFilterUtils
+          .createSingleColValueFiltersByRange(FlowRunColumn.MIN_START_TIME,
+              createdTimeBegin, createdTimeEnd));
     }
     // Filter based on metric filters.
     TimelineFilterList metricFilters = getFilters().getMetricFilters();
     if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
-      listBasedOnFilters.addFilter(
-          TimelineFilterUtils.createHBaseFilterList(
-              FlowRunColumnPrefix.METRIC, metricFilters));
+      listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
+          FlowRunColumnPrefix.METRIC, metricFilters));
     }
     return listBasedOnFilters;
   }
 
   /**
-   * Add {@link QualifierFilter} filters to filter list for each column of
-   * flow run table.
+   * Add {@link QualifierFilter} filters to filter list for each column of flow
+   * run table.
    *
    * @return filter list to which qualifier filters have been added.
    */
@@ -153,20 +153,19 @@ class FlowRunEntityReader extends TimelineEntityReader {
     FilterList list = new FilterList(Operator.MUST_PASS_ONE);
     // By default fetch everything in INFO column family.
     FamilyFilter infoColumnFamily =
-        new FamilyFilter(CompareOp.EQUAL,
-           new BinaryComparator(FlowRunColumnFamily.INFO.getBytes()));
+        new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(
+            FlowRunColumnFamily.INFO.getBytes()));
     TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
     // If multiple entities have to be retrieved, check if metrics have to be
     // retrieved and if not, add a filter so that metrics can be excluded.
     // Metrics are always returned if we are reading a single entity.
-    if (!isSingleEntityRead() && !TimelineStorageUtils.hasField(
-        dataToRetrieve.getFieldsToRetrieve(), Field.METRICS)) {
+    if (!isSingleEntityRead()
+        && !hasField(dataToRetrieve.getFieldsToRetrieve(), Field.METRICS)) {
       FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE);
       infoColFamilyList.addFilter(infoColumnFamily);
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(
-              FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(""))));
+      infoColFamilyList.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL,
+          new BinaryPrefixComparator(FlowRunColumnPrefix.METRIC
+              .getColumnPrefixBytes(""))));
       list.addFilter(infoColFamilyList);
     } else {
       // Check if metricsToRetrieve are specified and if they are, create a
@@ -176,14 +175,13 @@ class FlowRunEntityReader extends TimelineEntityReader {
       // (in augmentParams()).
       TimelineFilterList metricsToRetrieve =
           dataToRetrieve.getMetricsToRetrieve();
-      if (metricsToRetrieve != null &&
-          !metricsToRetrieve.getFilterList().isEmpty()) {
+      if (metricsToRetrieve != null
+          && !metricsToRetrieve.getFilterList().isEmpty()) {
         FilterList infoColFamilyList = new FilterList();
         infoColFamilyList.addFilter(infoColumnFamily);
         FilterList columnsList = updateFixedColumns();
-        columnsList.addFilter(
-            TimelineFilterUtils.createHBaseFilterList(
-                FlowRunColumnPrefix.METRIC, metricsToRetrieve));
+        columnsList.addFilter(TimelineFilterUtils.createHBaseFilterList(
+            FlowRunColumnPrefix.METRIC, metricsToRetrieve));
         infoColFamilyList.addFilter(columnsList);
         list.addFilter(infoColFamilyList);
       }
@@ -195,9 +193,10 @@ class FlowRunEntityReader extends TimelineEntityReader {
   protected Result getResult(Configuration hbaseConf, Connection conn,
       FilterList filterList) throws IOException {
     TimelineReaderContext context = getContext();
-    byte[] rowKey =
-        FlowRunRowKey.getRowKey(context.getClusterId(), context.getUserId(),
+    FlowRunRowKey flowRunRowKey =
+        new FlowRunRowKey(context.getClusterId(), context.getUserId(),
             context.getFlowName(), context.getFlowRunId());
+    byte[] rowKey = flowRunRowKey.getRowKey();
     Get get = new Get(rowKey);
     get.setMaxVersions(Integer.MAX_VALUE);
     if (filterList != null && !filterList.getFilters().isEmpty()) {
@@ -207,13 +206,14 @@ class FlowRunEntityReader extends TimelineEntityReader {
   }
 
   @Override
-  protected ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException {
+  protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
+      FilterList filterList) throws IOException {
     Scan scan = new Scan();
     TimelineReaderContext context = getContext();
-    scan.setRowPrefixFilter(
-        FlowRunRowKey.getRowKeyPrefix(context.getClusterId(),
-            context.getUserId(), context.getFlowName()));
+    RowKeyPrefix<FlowRunRowKey> flowRunRowKeyPrefix =
+        new FlowRunRowKeyPrefix(context.getClusterId(), context.getUserId(),
+            context.getFlowName());
+    scan.setRowPrefixFilter(flowRunRowKeyPrefix.getRowKeyPrefix());
     FilterList newList = new FilterList();
     newList.addFilter(new PageFilter(getFilters().getLimit()));
     if (filterList != null && !filterList.getFilters().isEmpty()) {
@@ -238,27 +238,27 @@ class FlowRunEntityReader extends TimelineEntityReader {
     }
 
     // read the start time
-    Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result);
+    Long startTime = (Long) FlowRunColumn.MIN_START_TIME.readResult(result);
     if (startTime != null) {
       flowRun.setStartTime(startTime.longValue());
     }
 
     // read the end time if available
-    Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result);
+    Long endTime = (Long) FlowRunColumn.MAX_END_TIME.readResult(result);
     if (endTime != null) {
       flowRun.setMaxEndTime(endTime.longValue());
     }
 
     // read the flow version
-    String version = (String)FlowRunColumn.FLOW_VERSION.readResult(result);
+    String version = (String) FlowRunColumn.FLOW_VERSION.readResult(result);
     if (version != null) {
       flowRun.setVersion(version);
     }
 
     // read metrics if its a single entity query or if METRICS are part of
     // fieldsToRetrieve.
-    if (isSingleEntityRead() || TimelineStorageUtils.hasField(
-        getDataToRetrieve().getFieldsToRetrieve(), Field.METRICS)) {
+    if (isSingleEntityRead()
+        || hasField(getDataToRetrieve().getFieldsToRetrieve(), Field.METRICS)) {
       readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC);
     }
 

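The same conversion applies to the flow run table: a Get keyed by a full FlowRunRowKey for single-run reads, and a FlowRunRowKeyPrefix scan over every run of a flow. A minimal sketch under the signatures shown in the hunks above; the wrapper class and values are illustrative placeholders.

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKeyPrefix;

public class FlowRunKeySketch {
  public static void main(String[] args) {
    // Single flow run: all four key components are known.
    FlowRunRowKey flowRunRowKey =
        new FlowRunRowKey("cluster1", "user1", "flow_name", 1002345678919L);
    Get get = new Get(flowRunRowKey.getRowKey());
    // Metric cells keep many versions, as in getResult() above.
    get.setMaxVersions(Integer.MAX_VALUE);

    // All runs of one flow: the prefix simply omits the run id.
    RowKeyPrefix<FlowRunRowKey> flowRunRowKeyPrefix =
        new FlowRunRowKeyPrefix("cluster1", "user1", "flow_name");
    Scan scan = new Scan();
    scan.setRowPrefixFilter(flowRunRowKeyPrefix.getRowKeyPrefix());
  }
}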
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
index 22583b5..4e1ab8a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
 
 import java.io.IOException;
 import java.util.EnumSet;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
@@ -28,11 +29,11 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.FamilyFilter;
 import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
@@ -44,11 +45,16 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
 
@@ -66,6 +72,12 @@ class GenericEntityReader extends TimelineEntityReader {
    */
   private final AppToFlowTable appToFlowTable = new AppToFlowTable();
 
+  /**
+   * Used to convert strings key components to and from storage format.
+   */
+  private final KeyConverter<String> stringKeyConverter =
+      new StringKeyConverter();
+
   public GenericEntityReader(TimelineReaderContext ctxt,
       TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve,
       boolean sortedKeys) {
@@ -95,32 +107,29 @@ class GenericEntityReader extends TimelineEntityReader {
     long createdTimeBegin = filters.getCreatedTimeBegin();
     long createdTimeEnd = filters.getCreatedTimeEnd();
     if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
-      listBasedOnFilters.addFilter(
-          TimelineFilterUtils.createSingleColValueFiltersByRange(
-              EntityColumn.CREATED_TIME, createdTimeBegin, createdTimeEnd));
+      listBasedOnFilters.addFilter(TimelineFilterUtils
+          .createSingleColValueFiltersByRange(EntityColumn.CREATED_TIME,
+              createdTimeBegin, createdTimeEnd));
     }
     // Create filter list based on metric filters and add it to
     // listBasedOnFilters.
     TimelineFilterList metricFilters = filters.getMetricFilters();
     if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
-      listBasedOnFilters.addFilter(
-          TimelineFilterUtils.createHBaseFilterList(
-              EntityColumnPrefix.METRIC, metricFilters));
+      listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
+          EntityColumnPrefix.METRIC, metricFilters));
     }
     // Create filter list based on config filters and add it to
     // listBasedOnFilters.
     TimelineFilterList configFilters = filters.getConfigFilters();
     if (configFilters != null && !configFilters.getFilterList().isEmpty()) {
-      listBasedOnFilters.addFilter(
-          TimelineFilterUtils.createHBaseFilterList(
-              EntityColumnPrefix.CONFIG, configFilters));
+      listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
+          EntityColumnPrefix.CONFIG, configFilters));
     }
     // Create filter list based on info filters and add it to listBasedOnFilters
     TimelineFilterList infoFilters = filters.getInfoFilters();
     if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) {
-      listBasedOnFilters.addFilter(
-          TimelineFilterUtils.createHBaseFilterList(
-              EntityColumnPrefix.INFO, infoFilters));
+      listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
+          EntityColumnPrefix.INFO, infoFilters));
     }
     return listBasedOnFilters;
   }
@@ -130,10 +139,10 @@ class GenericEntityReader extends TimelineEntityReader {
    *
    * @return true if we need to fetch some of the columns, false otherwise.
    */
-  private static boolean fetchPartialEventCols(TimelineFilterList eventFilters,
+  private boolean fetchPartialEventCols(TimelineFilterList eventFilters,
       EnumSet<Field> fieldsToRetrieve) {
     return (eventFilters != null && !eventFilters.getFilterList().isEmpty() &&
-        !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS));
+        !hasField(fieldsToRetrieve, Field.EVENTS));
   }
 
   /**
@@ -141,10 +150,10 @@ class GenericEntityReader extends TimelineEntityReader {
    *
    * @return true if we need to fetch some of the columns, false otherwise.
    */
-  private static boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo,
+  private boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo,
       EnumSet<Field> fieldsToRetrieve) {
     return (relatesTo != null && !relatesTo.getFilterList().isEmpty() &&
-        !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO));
+        !hasField(fieldsToRetrieve, Field.RELATES_TO));
   }
 
   /**
@@ -152,10 +161,10 @@ class GenericEntityReader extends TimelineEntityReader {
    *
    * @return true if we need to fetch some of the columns, false otherwise.
    */
-  private static boolean fetchPartialIsRelatedToCols(
-      TimelineFilterList isRelatedTo, EnumSet<Field> fieldsToRetrieve) {
+  private boolean fetchPartialIsRelatedToCols(TimelineFilterList isRelatedTo,
+      EnumSet<Field> fieldsToRetrieve) {
     return (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty() &&
-        !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO));
+        !hasField(fieldsToRetrieve, Field.IS_RELATED_TO));
   }
 
   /**
@@ -163,19 +172,20 @@ class GenericEntityReader extends TimelineEntityReader {
    * relatesto and isrelatedto from info family.
    *
    * @return true, if we need to fetch only some of the columns, false if we
-   *     need to fetch all the columns under info column family.
+   *         need to fetch all the columns under info column family.
    */
   protected boolean fetchPartialColsFromInfoFamily() {
     EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
     TimelineEntityFilters filters = getFilters();
-    return fetchPartialEventCols(filters.getEventFilters(), fieldsToRetrieve) ||
-        fetchPartialRelatesToCols(filters.getRelatesTo(), fieldsToRetrieve) ||
-        fetchPartialIsRelatedToCols(filters.getIsRelatedTo(), fieldsToRetrieve);
+    return fetchPartialEventCols(filters.getEventFilters(), fieldsToRetrieve)
+        || fetchPartialRelatesToCols(filters.getRelatesTo(), fieldsToRetrieve)
+        || fetchPartialIsRelatedToCols(filters.getIsRelatedTo(),
+            fieldsToRetrieve);
   }
 
   /**
-   * Check if we need to create filter list based on fields. We need to create
-   * a filter list iff all fields need not be retrieved or we have some specific
+   * Check if we need to create filter list based on fields. We need to create a
+   * filter list iff all fields need not be retrieved or we have some specific
    * fields or metrics to retrieve. We also need to create a filter list if we
    * have relationships(relatesTo/isRelatedTo) and event filters specified for
    * the query.
@@ -188,22 +198,24 @@ class GenericEntityReader extends TimelineEntityReader {
     // be retrieved, also check if we have some metrics or configs to
     // retrieve specified for the query because then a filter list will have
     // to be created.
-    boolean flag = !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) ||
-        (dataToRetrieve.getConfsToRetrieve() != null &&
-        !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) ||
-        (dataToRetrieve.getMetricsToRetrieve() != null &&
-        !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty());
+    boolean flag =
+        !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL)
+            || (dataToRetrieve.getConfsToRetrieve() != null && !dataToRetrieve
+                .getConfsToRetrieve().getFilterList().isEmpty())
+            || (dataToRetrieve.getMetricsToRetrieve() != null && !dataToRetrieve
+                .getMetricsToRetrieve().getFilterList().isEmpty());
     // Filters need to be checked only if we are reading multiple entities. If
     // condition above is false, we check if there are relationships(relatesTo/
     // isRelatedTo) and event filters specified for the query.
     if (!flag && !isSingleEntityRead()) {
       TimelineEntityFilters filters = getFilters();
-      flag = (filters.getEventFilters() != null &&
-          !filters.getEventFilters().getFilterList().isEmpty()) ||
-          (filters.getIsRelatedTo() != null &&
-          !filters.getIsRelatedTo().getFilterList().isEmpty()) ||
-          (filters.getRelatesTo() != null &&
-          !filters.getRelatesTo().getFilterList().isEmpty());
+      flag =
+          (filters.getEventFilters() != null && !filters.getEventFilters()
+              .getFilterList().isEmpty())
+              || (filters.getIsRelatedTo() != null && !filters.getIsRelatedTo()
+                  .getFilterList().isEmpty())
+              || (filters.getRelatesTo() != null && !filters.getRelatesTo()
+                  .getFilterList().isEmpty());
     }
     return flag;
   }
@@ -216,8 +228,8 @@ class GenericEntityReader extends TimelineEntityReader {
    */
   protected void updateFixedColumns(FilterList list) {
     for (EntityColumn column : EntityColumn.values()) {
-      list.addFilter(new QualifierFilter(CompareOp.EQUAL,
-          new BinaryComparator(column.getColumnQualifierBytes())));
+      list.addFilter(new QualifierFilter(CompareOp.EQUAL, new BinaryComparator(
+          column.getColumnQualifierBytes())));
     }
   }
 
@@ -226,30 +238,29 @@ class GenericEntityReader extends TimelineEntityReader {
    * qualifiers in the info column family will be returned in result.
    *
    * @param isApplication If true, it means operations are to be performed for
-   *     application table, otherwise for entity table.
+   *          application table, otherwise for entity table.
    * @return filter list.
    * @throws IOException if any problem occurs while creating filter list.
    */
-  private FilterList createFilterListForColsOfInfoFamily()
-      throws IOException {
+  private FilterList createFilterListForColsOfInfoFamily() throws IOException {
     FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE);
     // Add filters for each column in entity table.
     updateFixedColumns(infoFamilyColsFilter);
     EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
     // If INFO field has to be retrieved, add a filter for fetching columns
     // with INFO column prefix.
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
+    if (hasField(fieldsToRetrieve, Field.INFO)) {
+      infoFamilyColsFilter
+          .addFilter(TimelineFilterUtils.createHBaseQualifierFilter(
               CompareOp.EQUAL, EntityColumnPrefix.INFO));
     }
     TimelineFilterList relatesTo = getFilters().getRelatesTo();
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+    if (hasField(fieldsToRetrieve, Field.RELATES_TO)) {
       // If RELATES_TO field has to be retrieved, add a filter for fetching
       // columns with RELATES_TO column prefix.
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.EQUAL, EntityColumnPrefix.RELATES_TO));
+      infoFamilyColsFilter.addFilter(TimelineFilterUtils
+          .createHBaseQualifierFilter(CompareOp.EQUAL,
+              EntityColumnPrefix.RELATES_TO));
     } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) {
       // Even if fields to retrieve does not contain RELATES_TO, we still
       // need to have a filter to fetch some of the column qualifiers if
@@ -257,17 +268,16 @@ class GenericEntityReader extends TimelineEntityReader {
       // matched after fetching rows from HBase.
       Set<String> relatesToCols =
           TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo);
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createFiltersFromColumnQualifiers(
-              EntityColumnPrefix.RELATES_TO, relatesToCols));
+      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+          EntityColumnPrefix.RELATES_TO, relatesToCols));
     }
     TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo();
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+    if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
       // If IS_RELATED_TO field has to be retrieved, add a filter for fetching
       // columns with IS_RELATED_TO column prefix.
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.EQUAL, EntityColumnPrefix.IS_RELATED_TO));
+      infoFamilyColsFilter.addFilter(TimelineFilterUtils
+          .createHBaseQualifierFilter(CompareOp.EQUAL,
+              EntityColumnPrefix.IS_RELATED_TO));
     } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) {
       // Even if fields to retrieve does not contain IS_RELATED_TO, we still
       // need to have a filter to fetch some of the column qualifiers if
@@ -275,27 +285,26 @@ class GenericEntityReader extends TimelineEntityReader {
       // matched after fetching rows from HBase.
       Set<String> isRelatedToCols =
           TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo);
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createFiltersFromColumnQualifiers(
-              EntityColumnPrefix.IS_RELATED_TO, isRelatedToCols));
+      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+          EntityColumnPrefix.IS_RELATED_TO, isRelatedToCols));
     }
     TimelineFilterList eventFilters = getFilters().getEventFilters();
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
+    if (hasField(fieldsToRetrieve, Field.EVENTS)) {
       // If EVENTS field has to be retrieved, add a filter for fetching columns
       // with EVENT column prefix.
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
+      infoFamilyColsFilter
+          .addFilter(TimelineFilterUtils.createHBaseQualifierFilter(
               CompareOp.EQUAL, EntityColumnPrefix.EVENT));
-    } else if (eventFilters != null && !eventFilters.getFilterList().isEmpty()){
+    } else if (eventFilters != null &&
+        !eventFilters.getFilterList().isEmpty()) {
       // Even if fields to retrieve does not contain EVENTS, we still need to
       // have a filter to fetch some of the column qualifiers on the basis of
       // event filters specified. Event filters will then be matched after
       // fetching rows from HBase.
       Set<String> eventCols =
           TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters);
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createFiltersFromColumnQualifiers(
-              EntityColumnPrefix.EVENT, eventCols));
+      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+          EntityColumnPrefix.EVENT, eventCols));
     }
     return infoFamilyColsFilter;
   }
@@ -310,28 +319,28 @@ class GenericEntityReader extends TimelineEntityReader {
   private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
     EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
     // Events not required.
-    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
-      infoColFamilyList.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.NOT_EQUAL, EntityColumnPrefix.EVENT));
+    if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
+      infoColFamilyList.addFilter(TimelineFilterUtils
+          .createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
+              EntityColumnPrefix.EVENT));
     }
     // info not required.
-    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
-      infoColFamilyList.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.NOT_EQUAL, EntityColumnPrefix.INFO));
+    if (!hasField(fieldsToRetrieve, Field.INFO)) {
+      infoColFamilyList.addFilter(TimelineFilterUtils
+          .createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
+              EntityColumnPrefix.INFO));
     }
     // is related to not required.
-    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
-      infoColFamilyList.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.NOT_EQUAL, EntityColumnPrefix.IS_RELATED_TO));
+    if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+      infoColFamilyList.addFilter(TimelineFilterUtils
+          .createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
+              EntityColumnPrefix.IS_RELATED_TO));
     }
     // relates to not required.
-    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
-      infoColFamilyList.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.NOT_EQUAL, EntityColumnPrefix.RELATES_TO));
+    if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+      infoColFamilyList.addFilter(TimelineFilterUtils
+          .createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
+              EntityColumnPrefix.RELATES_TO));
     }
   }
 
@@ -348,18 +357,18 @@ class GenericEntityReader extends TimelineEntityReader {
     // CONFS to fields to retrieve in augmentParams() even if not specified.
     if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) {
       // Create a filter list for configs.
-      listBasedOnFields.addFilter(TimelineFilterUtils.
-          createFilterForConfsOrMetricsToRetrieve(
-              dataToRetrieve.getConfsToRetrieve(),
-              EntityColumnFamily.CONFIGS, EntityColumnPrefix.CONFIG));
+      listBasedOnFields.addFilter(TimelineFilterUtils
+          .createFilterForConfsOrMetricsToRetrieve(
+              dataToRetrieve.getConfsToRetrieve(), EntityColumnFamily.CONFIGS,
+              EntityColumnPrefix.CONFIG));
     }
 
     // Please note that if metricsToRetrieve is specified, we would have added
     // METRICS to fields to retrieve in augmentParams() even if not specified.
     if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) {
       // Create a filter list for metrics.
-      listBasedOnFields.addFilter(TimelineFilterUtils.
-          createFilterForConfsOrMetricsToRetrieve(
+      listBasedOnFields.addFilter(TimelineFilterUtils
+          .createFilterForConfsOrMetricsToRetrieve(
               dataToRetrieve.getMetricsToRetrieve(),
               EntityColumnFamily.METRICS, EntityColumnPrefix.METRIC));
     }
@@ -375,8 +384,8 @@ class GenericEntityReader extends TimelineEntityReader {
     FilterList infoColFamilyList = new FilterList();
     // By default fetch everything in INFO column family.
     FamilyFilter infoColumnFamily =
-        new FamilyFilter(CompareOp.EQUAL,
-           new BinaryComparator(EntityColumnFamily.INFO.getBytes()));
+        new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(
+            EntityColumnFamily.INFO.getBytes()));
     infoColFamilyList.addFilter(infoColumnFamily);
     if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) {
       // We can fetch only some of the columns from info family.
@@ -394,27 +403,27 @@ class GenericEntityReader extends TimelineEntityReader {
   /**
    * Looks up flow context from AppToFlow table.
    *
-   * @param clusterId Cluster Id.
-   * @param appId App Id.
+   * @param appToFlowRowKey to identify Cluster and App Ids.
    * @param hbaseConf HBase configuration.
    * @param conn HBase Connection.
    * @return flow context information.
    * @throws IOException if any problem occurs while fetching flow information.
    */
-  protected FlowContext lookupFlowContext(String clusterId, String appId,
+  protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey,
       Configuration hbaseConf, Connection conn) throws IOException {
-    byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
+    byte[] rowKey = appToFlowRowKey.getRowKey();
     Get get = new Get(rowKey);
     Result result = appToFlowTable.getResult(hbaseConf, conn, get);
     if (result != null && !result.isEmpty()) {
-      return new FlowContext(
-          AppToFlowColumn.USER_ID.readResult(result).toString(),
-          AppToFlowColumn.FLOW_ID.readResult(result).toString(),
-          ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
+      return new FlowContext(AppToFlowColumn.USER_ID.readResult(result)
+          .toString(), AppToFlowColumn.FLOW_ID.readResult(result).toString(),
+          ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result))
+          .longValue());
     } else {
       throw new NotFoundException(
-          "Unable to find the context flow ID and flow run ID for clusterId=" +
-          clusterId + ", appId=" + appId);
+          "Unable to find the context flow ID and flow run ID for clusterId="
+              + appToFlowRowKey.getClusterId() + ", appId="
+              + appToFlowRowKey.getAppId());
     }
   }
 
@@ -425,17 +434,21 @@ class GenericEntityReader extends TimelineEntityReader {
     private final String userId;
     private final String flowName;
     private final Long flowRunId;
+
     public FlowContext(String user, String flowName, Long flowRunId) {
       this.userId = user;
       this.flowName = flowName;
       this.flowRunId = flowRunId;
     }
+
     protected String getUserId() {
       return userId;
     }
+
     protected String getFlowName() {
       return flowName;
     }
+
     protected Long getFlowRunId() {
       return flowRunId;
     }
@@ -444,8 +457,8 @@ class GenericEntityReader extends TimelineEntityReader {
   @Override
   protected void validateParams() {
     Preconditions.checkNotNull(getContext(), "context shouldn't be null");
-    Preconditions.checkNotNull(
-        getDataToRetrieve(), "data to retrieve shouldn't be null");
+    Preconditions.checkNotNull(getDataToRetrieve(),
+        "data to retrieve shouldn't be null");
     Preconditions.checkNotNull(getContext().getClusterId(),
         "clusterId shouldn't be null");
     Preconditions.checkNotNull(getContext().getAppId(),
@@ -463,11 +476,13 @@ class GenericEntityReader extends TimelineEntityReader {
       throws IOException {
     TimelineReaderContext context = getContext();
     // In reality all three should be null or neither should be null
-    if (context.getFlowName() == null || context.getFlowRunId() == null ||
-        context.getUserId() == null) {
+    if (context.getFlowName() == null || context.getFlowRunId() == null
+        || context.getUserId() == null) {
       // Get flow context information from AppToFlow table.
-      FlowContext flowContext = lookupFlowContext(
-          context.getClusterId(), context.getAppId(), hbaseConf, conn);
+      AppToFlowRowKey appToFlowRowKey =
+          new AppToFlowRowKey(context.getClusterId(), context.getAppId());
+      FlowContext flowContext =
+          lookupFlowContext(appToFlowRowKey, hbaseConf, conn);
       context.setFlowName(flowContext.flowName);
       context.setFlowRunId(flowContext.flowRunId);
       context.setUserId(flowContext.userId);
@@ -485,9 +500,9 @@ class GenericEntityReader extends TimelineEntityReader {
       FilterList filterList) throws IOException {
     TimelineReaderContext context = getContext();
     byte[] rowKey =
-        EntityRowKey.getRowKey(context.getClusterId(), context.getUserId(),
+        new EntityRowKey(context.getClusterId(), context.getUserId(),
             context.getFlowName(), context.getFlowRunId(), context.getAppId(),
-            context.getEntityType(), context.getEntityId());
+            context.getEntityType(), context.getEntityId()).getRowKey();
     Get get = new Get(rowKey);
     get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
     if (filterList != null && !filterList.getFilters().isEmpty()) {
@@ -497,15 +512,17 @@ class GenericEntityReader extends TimelineEntityReader {
   }
 
   @Override
-  protected ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException {
+  protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
+      FilterList filterList) throws IOException {
     // Scan through part of the table to find the entities belonging to one
     // app and one type
     Scan scan = new Scan();
     TimelineReaderContext context = getContext();
-    scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
-        context.getClusterId(), context.getUserId(), context.getFlowName(),
-        context.getFlowRunId(), context.getAppId(), context.getEntityType()));
+    RowKeyPrefix<EntityRowKey> entityRowKeyPrefix =
+        new EntityRowKeyPrefix(context.getClusterId(), context.getUserId(),
+            context.getFlowName(), context.getFlowRunId(), context.getAppId(),
+            context.getEntityType());
+    scan.setRowPrefixFilter(entityRowKeyPrefix.getRowKeyPrefix());
     scan.setMaxVersions(getDataToRetrieve().getMetricsLimit());
     if (filterList != null && !filterList.getFilters().isEmpty()) {
       scan.setFilter(filterList);
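
The RowKeyPrefix interface separates full row key construction from scan
prefix construction. A minimal sketch of the new pattern, with literal values
standing in for a real TimelineReaderContext and the scanner plumbing assumed:

    RowKeyPrefix<EntityRowKey> prefix = new EntityRowKeyPrefix(
        "cluster1", "user1", "flow_name", 1002345678919L,
        "application_1465246237936_0001", "YARN_CONTAINER");
    Scan scan = new Scan();
    scan.setRowPrefixFilter(prefix.getRowKeyPrefix());
    // Only rows whose keys start with the encoded prefix are scanned, e.g.
    // ResultScanner scanner = table.getResultScanner(hbaseConf, conn, scan);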
@@ -535,18 +552,16 @@ class GenericEntityReader extends TimelineEntityReader {
     // locally as relevant HBase filters to filter out rows on the basis of
     // isRelatedTo are not set in HBase scan.
     boolean checkIsRelatedTo =
-        !isSingleEntityRead() && filters.getIsRelatedTo() != null &&
-        filters.getIsRelatedTo().getFilterList().size() > 0;
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO) ||
-        checkIsRelatedTo) {
-      TimelineStorageUtils.readRelationship(
-          entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
-      if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity,
-          filters.getIsRelatedTo())) {
+        !isSingleEntityRead() && filters.getIsRelatedTo() != null
+            && filters.getIsRelatedTo().getFilterList().size() > 0;
+    if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) {
+      readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
+      if (checkIsRelatedTo
+          && !TimelineStorageUtils.matchIsRelatedTo(entity,
+              filters.getIsRelatedTo())) {
         return null;
       }
-      if (!TimelineStorageUtils.hasField(fieldsToRetrieve,
-          Field.IS_RELATED_TO)) {
+      if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
         entity.getIsRelatedToEntities().clear();
       }
     }
@@ -556,31 +571,29 @@ class GenericEntityReader extends TimelineEntityReader {
     // locally as relevant HBase filters to filter out rows on the basis of
     // relatesTo are not set in HBase scan.
     boolean checkRelatesTo =
-        !isSingleEntityRead() && filters.getRelatesTo() != null &&
-        filters.getRelatesTo().getFilterList().size() > 0;
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO) ||
-        checkRelatesTo) {
-      TimelineStorageUtils.readRelationship(
-          entity, result, EntityColumnPrefix.RELATES_TO, false);
-      if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity,
-          filters.getRelatesTo())) {
+        !isSingleEntityRead() && filters.getRelatesTo() != null
+            && filters.getRelatesTo().getFilterList().size() > 0;
+    if (hasField(fieldsToRetrieve, Field.RELATES_TO) || checkRelatesTo) {
+      readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
+      if (checkRelatesTo
+          && !TimelineStorageUtils.matchRelatesTo(entity,
+              filters.getRelatesTo())) {
         return null;
       }
-      if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+      if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
         entity.getRelatesToEntities().clear();
       }
     }
 
     // fetch info if fieldsToRetrieve contains INFO or ALL.
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
-      TimelineStorageUtils.readKeyValuePairs(
-          entity, result, EntityColumnPrefix.INFO, false);
+    if (hasField(fieldsToRetrieve, Field.INFO)) {
+      readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
     }
 
     // fetch configs if fieldsToRetrieve contains CONFIGS or ALL.
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.CONFIGS)) {
-      TimelineStorageUtils.readKeyValuePairs(
-          entity, result, EntityColumnPrefix.CONFIG, true);
+    if (hasField(fieldsToRetrieve, Field.CONFIGS)) {
+      readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
     }
 
     // fetch events and match event filters if they exist. If event filters do
@@ -588,24 +601,48 @@ class GenericEntityReader extends TimelineEntityReader {
     // as relevant HBase filters to filter out rows on the basis of events
     // are not set in HBase scan.
     boolean checkEvents =
-        !isSingleEntityRead() && filters.getEventFilters() != null &&
-        filters.getEventFilters().getFilterList().size() > 0;
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS) ||
-        checkEvents) {
-      TimelineStorageUtils.readEvents(entity, result, EntityColumnPrefix.EVENT);
-      if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity,
-          filters.getEventFilters())) {
+        !isSingleEntityRead() && filters.getEventFilters() != null
+            && filters.getEventFilters().getFilterList().size() > 0;
+    if (hasField(fieldsToRetrieve, Field.EVENTS) || checkEvents) {
+      readEvents(entity, result, EntityColumnPrefix.EVENT);
+      if (checkEvents
+          && !TimelineStorageUtils.matchEventFilters(entity,
+              filters.getEventFilters())) {
         return null;
       }
-      if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
+      if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
         entity.getEvents().clear();
       }
     }
 
     // fetch metrics if fieldsToRetrieve contains METRICS or ALL.
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.METRICS)) {
+    if (hasField(fieldsToRetrieve, Field.METRICS)) {
       readMetrics(entity, result, EntityColumnPrefix.METRIC);
     }
     return entity;
   }
+
+  /**
+   * Helper method for reading key-value pairs for either info or config.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param entity entity to fill.
+   * @param result result from HBase.
+   * @param prefix column prefix.
+   * @param isConfig if true, configs are read; otherwise, info is read.
+   * @throws IOException if any problem is encountered while reading result.
+   */
+  protected <T> void readKeyValuePairs(TimelineEntity entity, Result result,
+      ColumnPrefix<T> prefix, boolean isConfig) throws IOException {
+    // info and configuration are of type Map<String, Object or String>
+    Map<String, Object> columns =
+        prefix.readResults(result, stringKeyConverter);
+    if (isConfig) {
+      for (Map.Entry<String, Object> column : columns.entrySet()) {
+        entity.addConfig(column.getKey(), column.getValue().toString());
+      }
+    } else {
+      entity.addInfo(columns);
+    }
+  }
 }
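
A note on readKeyValuePairs: configs are coerced to String values while info
entries keep their deserialized type. Illustrative behavior, with hypothetical
column data:

    // Columns {"rpc.port" -> 1234} read under EntityColumnPrefix.CONFIG:
    //   entity.getConfigs().get("rpc.port") -> "1234" (a String)
    // The same columns read under EntityColumnPrefix.INFO:
    //   entity.getInfo().get("rpc.port")    -> 1234 (the raw Object)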

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
index 852834e..7b294a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
@@ -18,6 +18,9 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
 
 import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
@@ -30,15 +33,27 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
 
 /**
  * The base class for reading and deserializing timeline entities from the
@@ -68,6 +83,12 @@ public abstract class TimelineEntityReader {
   private boolean sortedKeys = false;
 
   /**
+   * Used to convert string key components to and from storage format.
+   */
+  private final KeyConverter<String> stringKeyConverter =
+      new StringKeyConverter();
+
+  /**
    * Instantiates a reader for multiple-entity reads.
    *
   * @param ctxt Reader context which defines the scope in which query has to be
@@ -331,7 +352,7 @@ public abstract class TimelineEntityReader {
       ColumnPrefix<?> columnPrefix) throws IOException {
     NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
         columnPrefix.readResultsWithTimestamps(
-            result, StringKeyConverter.getInstance());
+            result, stringKeyConverter);
     for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
         metricsResult.entrySet()) {
       TimelineMetric metric = new TimelineMetric();
@@ -359,4 +380,117 @@ public abstract class TimelineEntityReader {
   protected void setTable(BaseTable<?> baseTable) {
     this.table = baseTable;
   }
+
+  /**
+   * Check if a certain field is amongst the fields to retrieve. This method
+   * also checks against {@link Field#ALL}, since its presence implies that
+   * every field, including the one passed, has to be retrieved.
+   *
+   * @param fieldsToRetrieve fields to be retrieved.
+   * @param requiredField field to be checked for in fieldsToRetrieve.
+   * @return true if fieldsToRetrieve contains the required field, false
+   *         otherwise.
+   */
+  protected boolean hasField(EnumSet<Field> fieldsToRetrieve,
+      Field requiredField) {
+    return fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(requiredField);
+  }
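
A quick illustration of the hasField semantics (hypothetical call sites):

    // hasField(EnumSet.of(Field.ALL), Field.METRICS)              -> true
    // hasField(EnumSet.of(Field.INFO, Field.CONFIGS), Field.INFO) -> true
    // hasField(EnumSet.of(Field.INFO), Field.METRICS)             -> false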
+
+  /**
+   * Create a filter list of qualifier filters based on passed set of columns.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param colPrefix Column Prefix.
+   * @param columns set of column qualifiers.
+   * @return filter list.
+   */
+  protected <T> FilterList createFiltersFromColumnQualifiers(
+      ColumnPrefix<T> colPrefix, Set<String> columns) {
+    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
+    for (String column : columns) {
+      // For columns which have compound column qualifiers (e.g. events), we
+      // need to include the required separator.
+      byte[] compoundColQual = createColQualifierPrefix(colPrefix, column);
+      list.addFilter(new QualifierFilter(CompareOp.EQUAL,
+          new BinaryPrefixComparator(colPrefix
+              .getColumnPrefixBytes(compoundColQual))));
+    }
+    return list;
+  }
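
Operator.MUST_PASS_ONE gives OR semantics, so the list matches any column
whose qualifier starts with one of the encoded prefixes. For a hypothetical
set {"conf1", "conf2"} on the config prefix, the constructed list behaves
like the following (assuming the String overload of getColumnPrefixBytes):

    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
    list.addFilter(new QualifierFilter(CompareOp.EQUAL,
        new BinaryPrefixComparator(
            EntityColumnPrefix.CONFIG.getColumnPrefixBytes("conf1"))));
    list.addFilter(new QualifierFilter(CompareOp.EQUAL,
        new BinaryPrefixComparator(
            EntityColumnPrefix.CONFIG.getColumnPrefixBytes("conf2"))));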
+
+  protected <T> byte[] createColQualifierPrefix(ColumnPrefix<T> colPrefix,
+      String column) {
+    if (colPrefix == ApplicationColumnPrefix.EVENT
+        || colPrefix == EntityColumnPrefix.EVENT) {
+      return new EventColumnName(column, null, null).getColumnQualifier();
+    } else {
+      return stringKeyConverter.encode(column);
+    }
+  }
+
+  /**
+   * Helper method for reading relationship.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param entity entity to fill.
+   * @param result result from HBase.
+   * @param prefix column prefix.
+   * @param isRelatedTo if true, the relationship is added to isRelatedTo;
+   *          otherwise it is added to relatesTo.
+   * @throws IOException if any problem is encountered while reading result.
+   */
+  protected <T> void readRelationship(TimelineEntity entity, Result result,
+      ColumnPrefix<T> prefix, boolean isRelatedTo) throws IOException {
+    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
+    Map<String, Object> columns =
+        prefix.readResults(result, stringKeyConverter);
+    for (Map.Entry<String, Object> column : columns.entrySet()) {
+      for (String id : Separator.VALUES.splitEncoded(column.getValue()
+          .toString())) {
+        if (isRelatedTo) {
+          entity.addIsRelatedToEntity(column.getKey(), id);
+        } else {
+          entity.addRelatesToEntity(column.getKey(), id);
+        }
+      }
+    }
+  }
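
The value encoding assumed by readRelationship: each cell stores multiple
entity ids joined by Separator.VALUES. Illustratively:

    // A RELATES_TO cell with qualifier "container" whose value encodes the
    // ids "id1" and "id2" (joined by Separator.VALUES) produces:
    //   entity.getRelatesToEntities() -> {"container" -> ["id1", "id2"]}
    // With isRelatedTo == true the same pair is added to
    //   entity.getIsRelatedToEntities() instead.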
+
+  /**
+   * Read events from the entity table or the application table. The column
+   * name is of the form "eventId=timestamp=infoKey" where "infoKey" may be
+   * omitted if there is no info associated with the event.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param entity entity to fill.
+   * @param result HBase Result.
+   * @param prefix column prefix.
+   * @throws IOException if any problem is encountered while reading result.
+   */
+  protected static <T> void readEvents(TimelineEntity entity, Result result,
+      ColumnPrefix<T> prefix) throws IOException {
+    Map<String, TimelineEvent> eventsMap = new HashMap<>();
+    Map<EventColumnName, Object> eventsResult =
+        prefix.readResults(result, new EventColumnNameConverter());
+    for (Map.Entry<EventColumnName, Object>
+             eventResult : eventsResult.entrySet()) {
+      EventColumnName eventColumnName = eventResult.getKey();
+      String key = eventColumnName.getId() +
+          Long.toString(eventColumnName.getTimestamp());
+      // Retrieve previously seen event to add to it
+      TimelineEvent event = eventsMap.get(key);
+      if (event == null) {
+        // First time we're seeing this event, add it to the eventsMap
+        event = new TimelineEvent();
+        event.setId(eventColumnName.getId());
+        event.setTimestamp(eventColumnName.getTimestamp());
+        eventsMap.put(key, event);
+      }
+      if (eventColumnName.getInfoKey() != null) {
+        event.addInfo(eventColumnName.getInfoKey(), eventResult.getValue());
+      }
+    }
+    Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
+    entity.addEvents(eventsSet);
+  }
 }
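
The compound event qualifier round-trips through EventColumnName and
EventColumnNameConverter, which the rewritten TestKeyConverters below also
exercises. A minimal sketch (the timestamp literal is arbitrary):

    EventColumnName name =
        new EventColumnName("start_event", 1465246237936L, "nodeId");
    byte[] qualifier = name.getColumnQualifier(); // eventId=timestamp=infoKey
    EventColumnName decoded =
        new EventColumnNameConverter().decode(qualifier);
    // decoded.getId() -> "start_event"; decoded.getInfoKey() -> "nodeId".
    // An event with no info omits the trailing infoKey component.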

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
index 74e4b5d..58df970 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
@@ -24,220 +24,13 @@ import static org.junit.Assert.assertTrue;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKeyConverter;
 import org.junit.Test;
 
 public class TestKeyConverters {
-  private final static String QUALIFIER_SEP = Separator.QUALIFIERS.getValue();
-  private final static byte[] QUALIFIER_SEP_BYTES =
-      Bytes.toBytes(QUALIFIER_SEP);
-  private final static String CLUSTER = "cl" + QUALIFIER_SEP + "uster";
-  private final static String USER = QUALIFIER_SEP + "user";
-  private final static String FLOW_NAME =
-      "dummy_" + QUALIFIER_SEP + "flow" + QUALIFIER_SEP;
-  private final static Long FLOW_RUN_ID;
-  private final static String APPLICATION_ID;
-  static {
-    long runid = Long.MAX_VALUE - 900L;
-    byte[] longMaxByteArr = Bytes.toBytes(Long.MAX_VALUE);
-    byte[] byteArr = Bytes.toBytes(runid);
-    int sepByteLen = QUALIFIER_SEP_BYTES.length;
-    if (sepByteLen <= byteArr.length) {
-      for (int i = 0; i < sepByteLen; i++) {
-        byteArr[i] = (byte)(longMaxByteArr[i] - QUALIFIER_SEP_BYTES[i]);
-      }
-    }
-    FLOW_RUN_ID = Bytes.toLong(byteArr);
-    long clusterTs = System.currentTimeMillis();
-    byteArr = Bytes.toBytes(clusterTs);
-    if (sepByteLen <= byteArr.length) {
-      for (int i = 0; i < sepByteLen; i++) {
-        byteArr[byteArr.length - sepByteLen + i] =
-            (byte)(longMaxByteArr[byteArr.length - sepByteLen + i] -
-                QUALIFIER_SEP_BYTES[i]);
-      }
-    }
-    clusterTs = Bytes.toLong(byteArr);
-    int seqId = 222;
-    APPLICATION_ID = ApplicationId.newInstance(clusterTs, seqId).toString();
-  }
-
-  private static void verifyRowPrefixBytes(byte[] byteRowKeyPrefix) {
-    int sepLen = QUALIFIER_SEP_BYTES.length;
-    for (int i = 0; i < sepLen; i++) {
-      assertTrue("Row key prefix not encoded properly.",
-        byteRowKeyPrefix[byteRowKeyPrefix.length - sepLen  + i] ==
-            QUALIFIER_SEP_BYTES[i]);
-    }
-  }
-
-  @Test
-  public void testFlowActivityRowKeyConverter() {
-    Long ts = TimelineStorageUtils.getTopOfTheDayTimestamp(1459900830000L);
-    byte[] byteRowKey = FlowActivityRowKeyConverter.getInstance().encode(
-        new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME));
-    FlowActivityRowKey rowKey =
-        FlowActivityRowKeyConverter.getInstance().decode(byteRowKey);
-    assertEquals(CLUSTER, rowKey.getClusterId());
-    assertEquals(ts, rowKey.getDayTimestamp());
-    assertEquals(USER, rowKey.getUserId());
-    assertEquals(FLOW_NAME, rowKey.getFlowName());
-
-    byte[] byteRowKeyPrefix = FlowActivityRowKeyConverter.getInstance().encode(
-        new FlowActivityRowKey(CLUSTER, null, null, null));
-    byte[][] splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
-        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
-    assertEquals(2, splits.length);
-    assertEquals(0, splits[1].length);
-    assertEquals(CLUSTER,
-        Separator.QUALIFIERS.decode(Bytes.toString(splits[0])));
-    verifyRowPrefixBytes(byteRowKeyPrefix);
-
-    byteRowKeyPrefix = FlowActivityRowKeyConverter.getInstance().encode(
-        new FlowActivityRowKey(CLUSTER, ts, null, null));
-    splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
-        Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE });
-    assertEquals(3, splits.length);
-    assertEquals(0, splits[2].length);
-    assertEquals(CLUSTER,
-        Separator.QUALIFIERS.decode(Bytes.toString(splits[0])));
-    assertEquals(ts, (Long) TimelineStorageUtils.invertLong(
-        Bytes.toLong(splits[1])));
-    verifyRowPrefixBytes(byteRowKeyPrefix);
-  }
-
-  @Test
-  public void testFlowRunRowKeyConverter() {
-    byte[] byteRowKey = FlowRunRowKeyConverter.getInstance().encode(
-        new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID));
-    FlowRunRowKey rowKey =
-        FlowRunRowKeyConverter.getInstance().decode(byteRowKey);
-    assertEquals(CLUSTER, rowKey.getClusterId());
-    assertEquals(USER, rowKey.getUserId());
-    assertEquals(FLOW_NAME, rowKey.getFlowName());
-    assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
-
-    byte[] byteRowKeyPrefix = FlowRunRowKeyConverter.getInstance().encode(
-        new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, null));
-    byte[][] splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
-        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
-    assertEquals(4, splits.length);
-    assertEquals(0, splits[3].length);
-    assertEquals(FLOW_NAME,
-        Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
-    verifyRowPrefixBytes(byteRowKeyPrefix);
-  }
-
-  @Test
-  public void testApplicationRowKeyConverter() {
-    byte[] byteRowKey = ApplicationRowKeyConverter.getInstance().encode(
-        new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
-            APPLICATION_ID));
-    ApplicationRowKey rowKey =
-        ApplicationRowKeyConverter.getInstance().decode(byteRowKey);
-    assertEquals(CLUSTER, rowKey.getClusterId());
-    assertEquals(USER, rowKey.getUserId());
-    assertEquals(FLOW_NAME, rowKey.getFlowName());
-    assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
-    assertEquals(APPLICATION_ID, rowKey.getAppId());
-
-    byte[] byteRowKeyPrefix = ApplicationRowKeyConverter.getInstance().encode(
-        new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, null));
-    byte[][] splits =
-        Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
-            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-            Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
-            Separator.VARIABLE_SIZE });
-    assertEquals(5, splits.length);
-    assertEquals(0, splits[4].length);
-    assertEquals(FLOW_NAME,
-        Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
-    assertEquals(FLOW_RUN_ID, (Long)TimelineStorageUtils.invertLong(
-        Bytes.toLong(splits[3])));
-    verifyRowPrefixBytes(byteRowKeyPrefix);
-
-    byteRowKeyPrefix = ApplicationRowKeyConverter.getInstance().encode(
-        new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, null, null));
-    splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
-        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
-    assertEquals(4, splits.length);
-    assertEquals(0, splits[3].length);
-    assertEquals(FLOW_NAME,
-        Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
-    verifyRowPrefixBytes(byteRowKeyPrefix);
-  }
-
-  @Test
-  public void testEntityRowKeyConverter() {
-    String entityId = "!ent!ity!!id!";
-    String entityType = "entity!Type";
-    byte[] byteRowKey = EntityRowKeyConverter.getInstance().encode(
-        new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
-            entityType, entityId));
-    EntityRowKey rowKey =
-        EntityRowKeyConverter.getInstance().decode(byteRowKey);
-    assertEquals(CLUSTER, rowKey.getClusterId());
-    assertEquals(USER, rowKey.getUserId());
-    assertEquals(FLOW_NAME, rowKey.getFlowName());
-    assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
-    assertEquals(APPLICATION_ID, rowKey.getAppId());
-    assertEquals(entityType, rowKey.getEntityType());
-    assertEquals(entityId, rowKey.getEntityId());
-
-    byte[] byteRowKeyPrefix = EntityRowKeyConverter.getInstance().encode(
-        new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
-            entityType, null));
-    byte[][] splits =
-        Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
-            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-            Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
-            AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
-            Separator.VARIABLE_SIZE });
-    assertEquals(7, splits.length);
-    assertEquals(0, splits[6].length);
-    assertEquals(APPLICATION_ID,
-        AppIdKeyConverter.getInstance().decode(splits[4]));
-    assertEquals(entityType, Separator.QUALIFIERS.decode(
-        Bytes.toString(splits[5])));
-    verifyRowPrefixBytes(byteRowKeyPrefix);
-
-    byteRowKeyPrefix = EntityRowKeyConverter.getInstance().encode(
-        new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
-        null, null));
-    splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
-        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-        Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
-        AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE });
-    assertEquals(6, splits.length);
-    assertEquals(0, splits[5].length);
-    assertEquals(APPLICATION_ID,
-        AppIdKeyConverter.getInstance().decode(splits[4]));
-    verifyRowPrefixBytes(byteRowKeyPrefix);
-  }
-
-  @Test
-  public void testAppToFlowRowKeyConverter() {
-    byte[] byteRowKey = AppToFlowRowKeyConverter.getInstance().encode(
-        new AppToFlowRowKey(CLUSTER, APPLICATION_ID));
-    AppToFlowRowKey rowKey =
-        AppToFlowRowKeyConverter.getInstance().decode(byteRowKey);
-    assertEquals(CLUSTER, rowKey.getClusterId());
-    assertEquals(APPLICATION_ID, rowKey.getAppId());
-  }
 
   @Test
   public void testAppIdKeyConverter() {
+    AppIdKeyConverter appIdKeyConverter = new AppIdKeyConverter();
     long currentTs = System.currentTimeMillis();
     ApplicationId appId1 = ApplicationId.newInstance(currentTs, 1);
     ApplicationId appId2 = ApplicationId.newInstance(currentTs, 2);
@@ -245,18 +38,19 @@ public class TestKeyConverters {
     String appIdStr1 = appId1.toString();
     String appIdStr2 = appId2.toString();
     String appIdStr3 = appId3.toString();
-    byte[] appIdBytes1 = AppIdKeyConverter.getInstance().encode(appIdStr1);
-    byte[] appIdBytes2 = AppIdKeyConverter.getInstance().encode(appIdStr2);
-    byte[] appIdBytes3 = AppIdKeyConverter.getInstance().encode(appIdStr3);
+    byte[] appIdBytes1 = appIdKeyConverter.encode(appIdStr1);
+    byte[] appIdBytes2 = appIdKeyConverter.encode(appIdStr2);
+    byte[] appIdBytes3 = appIdKeyConverter.encode(appIdStr3);
     // App ids should be encoded in a manner wherein descending order
     // is maintained.
-    assertTrue("Ordering of app ids' is incorrect",
-        Bytes.compareTo(appIdBytes1, appIdBytes2) > 0 &&
-        Bytes.compareTo(appIdBytes1, appIdBytes3) > 0 &&
-        Bytes.compareTo(appIdBytes2, appIdBytes3) > 0);
-    String decodedAppId1 = AppIdKeyConverter.getInstance().decode(appIdBytes1);
-    String decodedAppId2 = AppIdKeyConverter.getInstance().decode(appIdBytes2);
-    String decodedAppId3 = AppIdKeyConverter.getInstance().decode(appIdBytes3);
+    assertTrue(
+        "Ordering of app ids' is incorrect",
+        Bytes.compareTo(appIdBytes1, appIdBytes2) > 0
+            && Bytes.compareTo(appIdBytes1, appIdBytes3) > 0
+            && Bytes.compareTo(appIdBytes2, appIdBytes3) > 0);
+    String decodedAppId1 = appIdKeyConverter.decode(appIdBytes1);
+    String decodedAppId2 = appIdKeyConverter.decode(appIdBytes2);
+    String decodedAppId3 = appIdKeyConverter.decode(appIdBytes3);
     assertTrue("Decoded app id is not same as the app id encoded",
         appIdStr1.equals(decodedAppId1));
     assertTrue("Decoded app id is not same as the app id encoded",
@@ -273,21 +67,64 @@ public class TestKeyConverters {
         Bytes.createMaxByteArray(Bytes.SIZEOF_LONG - valSepBytes.length);
     byte[] ts = Bytes.add(valSepBytes, maxByteArr);
     Long eventTs = Bytes.toLong(ts);
-    byte[] byteEventColName = EventColumnNameConverter.getInstance().encode(
-        new EventColumnName(eventId, eventTs, null));
+    byte[] byteEventColName =
+        new EventColumnName(eventId, eventTs, null).getColumnQualifier();
+    KeyConverter<EventColumnName> eventColumnNameConverter =
+        new EventColumnNameConverter();
     EventColumnName eventColName =
-        EventColumnNameConverter.getInstance().decode(byteEventColName);
+        eventColumnNameConverter.decode(byteEventColName);
     assertEquals(eventId, eventColName.getId());
     assertEquals(eventTs, eventColName.getTimestamp());
     assertNull(eventColName.getInfoKey());
 
     String infoKey = "f=oo_event_in=fo=_key";
-    byteEventColName = EventColumnNameConverter.getInstance().encode(
-        new EventColumnName(eventId, eventTs, infoKey));
-    eventColName =
-        EventColumnNameConverter.getInstance().decode(byteEventColName);
+    byteEventColName =
+        new EventColumnName(eventId, eventTs, infoKey).getColumnQualifier();
+    eventColName = eventColumnNameConverter.decode(byteEventColName);
     assertEquals(eventId, eventColName.getId());
     assertEquals(eventTs, eventColName.getTimestamp());
     assertEquals(infoKey, eventColName.getInfoKey());
   }
+
+  @Test
+  public void testLongKeyConverter() {
+    LongKeyConverter longKeyConverter = new LongKeyConverter();
+    confirmLongKeyConverter(longKeyConverter, Long.MIN_VALUE);
+    confirmLongKeyConverter(longKeyConverter, -1234567890L);
+    confirmLongKeyConverter(longKeyConverter, -128L);
+    confirmLongKeyConverter(longKeyConverter, -127L);
+    confirmLongKeyConverter(longKeyConverter, -1L);
+    confirmLongKeyConverter(longKeyConverter, 0L);
+    confirmLongKeyConverter(longKeyConverter, 1L);
+    confirmLongKeyConverter(longKeyConverter, 127L);
+    confirmLongKeyConverter(longKeyConverter, 128L);
+    confirmLongKeyConverter(longKeyConverter, 1234567890L);
+    confirmLongKeyConverter(longKeyConverter, Long.MAX_VALUE);
+  }
+
+  private void confirmLongKeyConverter(LongKeyConverter longKeyConverter,
+      Long testValue) {
+    Long decoded = longKeyConverter.decode(longKeyConverter.encode(testValue));
+    assertEquals(testValue, decoded);
+  }
+
+  @Test
+  public void testStringKeyConverter() {
+    StringKeyConverter stringKeyConverter = new StringKeyConverter();
+    String phrase = "QuackAttack now!";
+
+    for (int i = 0; i < phrase.length(); i++) {
+      String sub = phrase.substring(i, phrase.length());
+      confirmStringKeyConverter(stringKeyConverter, sub);
+      confirmStringKeyConverter(stringKeyConverter, sub + sub);
+    }
+  }
+
+  private void confirmStringKeyConverter(StringKeyConverter stringKeyConverter,
+      String testValue) {
+    String decoded =
+        stringKeyConverter.decode(stringKeyConverter.encode(testValue));
+    assertEquals(testValue, decoded);
+  }
+
 }

