http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
new file mode 100644
index 0000000..b58bbe3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
@@ -0,0 +1,804 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestFileSystemTimelineReaderImpl {
+
+  private static final String ROOT_DIR =
+      FileSystemTimelineReaderImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
+  private FileSystemTimelineReaderImpl reader;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    loadEntityData();
+    // Create app flow mapping file.
+    CSVFormat format =
+        CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN");
+    String appFlowMappingFile = ROOT_DIR + "/entities/cluster1/" +
+        FileSystemTimelineReaderImpl.APP_FLOW_MAPPING_FILE;
+    try (PrintWriter out =
+        new PrintWriter(new BufferedWriter(
+            new FileWriter(appFlowMappingFile, true)));
+        CSVPrinter printer = new CSVPrinter(out, format)) {
+      printer.printRecord("app1", "user1", "flow1", 1);
+      printer.printRecord("app2", "user1", "flow1,flow", 1);
+    }
+    (new File(ROOT_DIR)).deleteOnExit();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    FileUtils.deleteDirectory(new File(ROOT_DIR));
+  }
+
+  @Before
+  public void init() throws Exception {
+    reader = new FileSystemTimelineReaderImpl();
+    Configuration conf = new YarnConfiguration();
+    conf.set(FileSystemTimelineReaderImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+        ROOT_DIR);
+    reader.init(conf);
+  }
+
+  private static void writeEntityFile(TimelineEntity entity, File dir)
+      throws Exception {
+    if (!dir.exists()) {
+      if (!dir.mkdirs()) {
+        throw new IOException("Could not create directories for " + dir);
+      }
+    }
+    String fileName = dir.getAbsolutePath() + "/" + entity.getId() + ".thist";
+    try (PrintWriter out =
+        new PrintWriter(new BufferedWriter(new FileWriter(fileName, true)))) {
+      out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
+      out.write("\n");
+    }
+  }
+
+  private static void loadEntityData() throws Exception {
+    File appDir = new File(ROOT_DIR +
+        "/entities/cluster1/user1/flow1/1/app1/app/");
+    TimelineEntity entity11 = new TimelineEntity();
+    entity11.setId("id_1");
+    entity11.setType("app");
+    entity11.setCreatedTime(1425016502000L);
+    Map<String, Object> info1 = new HashMap<String, Object>();
+    info1.put("info1", "val1");
+    info1.put("info2", "val5");
+    entity11.addInfo(info1);
+    TimelineEvent event = new TimelineEvent();
+    event.setId("event_1");
+    event.setTimestamp(1425016502003L);
+    entity11.addEvent(event);
+    Set<TimelineMetric> metrics = new HashSet<TimelineMetric>();
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setId("metric1");
+    metric1.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric1.addValue(1425016502006L, 113);
+    metrics.add(metric1);
+    TimelineMetric metric2 = new TimelineMetric();
+    metric2.setId("metric2");
+    metric2.setType(TimelineMetric.Type.TIME_SERIES);
+    metric2.addValue(1425016502016L, 34);
+    metrics.add(metric2);
+    entity11.setMetrics(metrics);
+    Map<String, String> configs = new HashMap<String, String>();
+    configs.put("config_1", "127");
+    entity11.setConfigs(configs);
+    entity11.addRelatesToEntity("flow", "flow1");
+    entity11.addIsRelatedToEntity("type1", "tid1_1");
+    writeEntityFile(entity11, appDir);
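+    // The next entity reuses id "id_1": on read, the reader is expected to
+    // merge both records into a single entity (the custom-fields test below
+    // asserts the merged config and metric counts).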
+    TimelineEntity entity12 = new TimelineEntity();
+    entity12.setId("id_1");
+    entity12.setType("app");
+    configs.clear();
+    configs.put("config_2", "23");
+    configs.put("config_3", "abc");
+    entity12.addConfigs(configs);
+    metrics.clear();
+    TimelineMetric metric12 = new TimelineMetric();
+    metric12.setId("metric2");
+    metric12.setType(TimelineMetric.Type.TIME_SERIES);
+    metric12.addValue(1425016502032L, 48);
+    metric12.addValue(1425016502054L, 51);
+    metrics.add(metric12);
+    TimelineMetric metric3 = new TimelineMetric();
+    metric3.setId("metric3");
+    metric3.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric3.addValue(1425016502060L, 23L);
+    metrics.add(metric3);
+    entity12.setMetrics(metrics);
+    entity12.addIsRelatedToEntity("type1", "tid1_2");
+    entity12.addIsRelatedToEntity("type2", "tid2_1");
+    TimelineEvent event15 = new TimelineEvent();
+    event15.setId("event_5");
+    event15.setTimestamp(1425016502017L);
+    entity12.addEvent(event15);
+    writeEntityFile(entity12, appDir);
+
+    TimelineEntity entity2 = new TimelineEntity();
+    entity2.setId("id_2");
+    entity2.setType("app");
+    entity2.setCreatedTime(1425016501050L);
+    Map<String, Object> info2 = new HashMap<String, Object>();
+    info2.put("info2", 4);
+    entity2.addInfo(info2);
+    Map<String, String> configs2 = new HashMap<String, String>();
+    configs2.put("config_1", "129");
+    configs2.put("config_3", "def");
+    entity2.setConfigs(configs2);
+    TimelineEvent event2 = new TimelineEvent();
+    event2.setId("event_2");
+    event2.setTimestamp(1425016501003L);
+    entity2.addEvent(event2);
+    Set<TimelineMetric> metrics2 = new HashSet<TimelineMetric>();
+    TimelineMetric metric21 = new TimelineMetric();
+    metric21.setId("metric1");
+    metric21.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric21.addValue(1425016501006L, 300);
+    metrics2.add(metric21);
+    TimelineMetric metric22 = new TimelineMetric();
+    metric22.setId("metric2");
+    metric22.setType(TimelineMetric.Type.TIME_SERIES);
+    metric22.addValue(1425016501056L, 31);
+    metric22.addValue(1425016501084L, 70);
+    metrics2.add(metric22);
+    TimelineMetric metric23 = new TimelineMetric();
+    metric23.setId("metric3");
+    metric23.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric23.addValue(1425016502060L, 23L);
+    metrics2.add(metric23);
+    entity2.setMetrics(metrics2);
+    entity2.addRelatesToEntity("flow", "flow2");
+    writeEntityFile(entity2, appDir);
+
+    TimelineEntity entity3 = new TimelineEntity();
+    entity3.setId("id_3");
+    entity3.setType("app");
+    entity3.setCreatedTime(1425016501050L);
+    Map<String, Object> info3 = new HashMap<String, Object>();
+    info3.put("info2", 3.5);
+    info3.put("info4", 20);
+    entity3.addInfo(info3);
+    Map<String, String> configs3 = new HashMap<String, String>();
+    configs3.put("config_1", "123");
+    configs3.put("config_3", "abc");
+    entity3.setConfigs(configs3);
+    TimelineEvent event3 = new TimelineEvent();
+    event3.setId("event_2");
+    event3.setTimestamp(1425016501003L);
+    entity3.addEvent(event3);
+    TimelineEvent event4 = new TimelineEvent();
+    event4.setId("event_4");
+    event4.setTimestamp(1425016502006L);
+    entity3.addEvent(event4);
+    Set<TimelineMetric> metrics3 = new HashSet<TimelineMetric>();
+    TimelineMetric metric31 = new TimelineMetric();
+    metric31.setId("metric1");
+    metric31.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric31.addValue(1425016501006L, 124);
+    metrics3.add(metric31);
+    TimelineMetric metric32 = new TimelineMetric();
+    metric32.setId("metric2");
+    metric32.setType(TimelineMetric.Type.TIME_SERIES);
+    metric32.addValue(1425016501056L, 31);
+    metric32.addValue(1425016501084L, 74);
+    metrics3.add(metric32);
+    entity3.setMetrics(metrics3);
+    entity3.addIsRelatedToEntity("type1", "tid1_2");
+    writeEntityFile(entity3, appDir);
+
+    TimelineEntity entity4 = new TimelineEntity();
+    entity4.setId("id_4");
+    entity4.setType("app");
+    entity4.setCreatedTime(1425016502050L);
+    TimelineEvent event44 = new TimelineEvent();
+    event44.setId("event_4");
+    event44.setTimestamp(1425016502003L);
+    entity4.addEvent(event44);
+    writeEntityFile(entity4, appDir);
+
+    File appDir2 = new File(ROOT_DIR +
+            "/entities/cluster1/user1/flow1,flow/1/app2/app/");
+    TimelineEntity entity5 = new TimelineEntity();
+    entity5.setId("id_5");
+    entity5.setType("app");
+    entity5.setCreatedTime(1425016502050L);
+    writeEntityFile(entity5, appDir2);
+  }
+
+  public TimelineReader getTimelineReader() {
+    return reader;
+  }
+
+  @Test
+  public void testGetEntityDefaultView() throws Exception {
+    // If no fields are specified, the entity is returned with the default
+    // view, i.e. only the id, type and created time.
+    TimelineEntity result = reader.getEntity(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", "id_1"),
+        new TimelineDataToRetrieve(null, null, null, null));
+    Assert.assertEquals(
+        (new TimelineEntity.Identifier("app", "id_1")).toString(),
+        result.getIdentifier().toString());
+    Assert.assertEquals((Long)1425016502000L, result.getCreatedTime());
+    Assert.assertEquals(0, result.getConfigs().size());
+    Assert.assertEquals(0, result.getMetrics().size());
+  }
+
+  @Test
+  public void testGetEntityByClusterAndApp() throws Exception {
+    // Cluster and AppId should be enough to get an entity.
+    TimelineEntity result = reader.getEntity(
+        new TimelineReaderContext("cluster1", null, null, null, "app1", "app",
+        "id_1"),
+        new TimelineDataToRetrieve(null, null, null, null));
+    Assert.assertEquals(
+        (new TimelineEntity.Identifier("app", "id_1")).toString(),
+        result.getIdentifier().toString());
+    Assert.assertEquals((Long)1425016502000L, result.getCreatedTime());
+    Assert.assertEquals(0, result.getConfigs().size());
+    Assert.assertEquals(0, result.getMetrics().size());
+  }
+
+  /** This test checks whether we can handle commas in app flow mapping csv. */
+  @Test
+  public void testAppFlowMappingCsv() throws Exception {
+    // Test getting an entity by cluster and app where flow entry
+    // in app flow mapping csv has commas.
+    TimelineEntity result = reader.getEntity(
+        new TimelineReaderContext("cluster1", null, null, null, "app2",
+        "app", "id_5"),
+        new TimelineDataToRetrieve(null, null, null, null));
+    Assert.assertEquals(
+        (new TimelineEntity.Identifier("app", "id_5")).toString(),
+        result.getIdentifier().toString());
+    Assert.assertEquals((Long)1425016502050L, result.getCreatedTime());
+  }
+
+  @Test
+  public void testGetEntityCustomFields() throws Exception {
+    // Specified fields in addition to the default view will be returned.
+    TimelineEntity result = reader.getEntity(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", "id_1"),
+        new TimelineDataToRetrieve(null, null,
+        EnumSet.of(Field.INFO, Field.CONFIGS, Field.METRICS), null));
+    Assert.assertEquals(
+        (new TimelineEntity.Identifier("app", "id_1")).toString(),
+        result.getIdentifier().toString());
+    Assert.assertEquals((Long)1425016502000L, result.getCreatedTime());
+    Assert.assertEquals(3, result.getConfigs().size());
+    Assert.assertEquals(3, result.getMetrics().size());
+    Assert.assertEquals(2, result.getInfo().size());
+    // No events will be returned
+    Assert.assertEquals(0, result.getEvents().size());
+  }
+
+  @Test
+  public void testGetEntityAllFields() throws Exception {
+    // All fields of TimelineEntity will be returned.
+    TimelineEntity result = reader.getEntity(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", "id_1"),
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
+    Assert.assertEquals(
+        (new TimelineEntity.Identifier("app", "id_1")).toString(),
+        result.getIdentifier().toString());
+    Assert.assertEquals((Long)1425016502000L, result.getCreatedTime());
+    Assert.assertEquals(3, result.getConfigs().size());
+    Assert.assertEquals(3, result.getMetrics().size());
+    // All fields including events will be returned.
+    Assert.assertEquals(2, result.getEvents().size());
+  }
+
+  @Test
+  public void testGetAllEntities() throws Exception {
+    Set<TimelineEntity> result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null), new TimelineEntityFilters(),
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
+    // All 4 entities will be returned
+    Assert.assertEquals(4, result.size());
+  }
+
+  @Test
+  public void testGetEntitiesWithLimit() throws Exception {
+    Set<TimelineEntity> result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(2L, null, null, null, null, null, null,
+        null, null), new TimelineDataToRetrieve());
+    Assert.assertEquals(2, result.size());
+    // Needs to be rewritten once hashCode and equals for
+    // TimelineEntity are implemented.
+    // Entities with id_1 and id_4 should be returned,
+    // based on created time, descending.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_4")) {
+        Assert.fail("Entity not sorted by created time");
+      }
+    }
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(3L, null, null, null, null, null, null,
+        null, null), new TimelineDataToRetrieve());
+    // Even though 2 entities out of 4 have the same created time, one
+    // entity is left out due to the limit.
+    Assert.assertEquals(3, result.size());
+  }
+
+  @Test
+  public void testGetEntitiesByTimeWindows() throws Exception {
+    // Get entities based on created time start and end time range.
+    Set<TimelineEntity> result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, 1425016502030L, 1425016502060L, null,
+        null, null, null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    // Only one entity with ID id_4 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_4")) {
+        Assert.fail("Incorrect filtering based on created time range");
+      }
+    }
+
+    // Get entities if only created time end is specified.
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+            "app", null),
+            new TimelineEntityFilters(null, null, 1425016502010L, null, null,
+            null, null, null, null),
+            new TimelineDataToRetrieve());
+    Assert.assertEquals(3, result.size());
+    for (TimelineEntity entity : result) {
+      if (entity.getId().equals("id_4")) {
+        Assert.fail("Incorrect filtering based on created time range");
+      }
+    }
+
+    // Get entities if only created time start is specified.
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, 1425016502010L, null, null, null,
+        null, null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_4")) {
+        Assert.fail("Incorrect filtering based on created time range");
+      }
+    }
+  }
+
+  @Test
+  public void testGetFilteredEntities() throws Exception {
+    // Get entities based on info filters.
+    TimelineFilterList infoFilterList = new TimelineFilterList();
+    infoFilterList.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5));
+    Set<TimelineEntity> result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, infoFilterList,
+        null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    // Only one entity with ID id_3 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_3")) {
+        Assert.fail("Incorrect filtering based on info filters");
+      }
+    }
+
+    // Get entities based on config filters.
+    TimelineFilterList confFilterList = new TimelineFilterList();
+    confFilterList.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_1", "123"));
+    confFilterList.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "abc"));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null,
+        confFilterList, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_3")) {
+        Assert.fail("Incorrect filtering based on config filters");
+      }
+    }
+
+    // Get entities based on event filters.
+    TimelineFilterList eventFilters = new TimelineFilterList();
+    eventFilters.addFilter(
+        new TimelineExistsFilter(TimelineCompareOp.EQUAL, "event_2"));
+    eventFilters.addFilter(
+        new TimelineExistsFilter(TimelineCompareOp.EQUAL, "event_4"));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null, null,
+        null, eventFilters),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_3")) {
+        Assert.fail("Incorrect filtering based on event filters");
+      }
+    }
+
+    // Get entities based on metric filters.
+    TimelineFilterList metricFilterList = new TimelineFilterList();
+    metricFilterList.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.GREATER_OR_EQUAL, "metric3", 0L));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null, null,
+        metricFilterList, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(2, result.size());
+    // Two entities with ids id_1 and id_2 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) {
+        Assert.fail("Incorrect filtering based on metric filters");
+      }
+    }
+
+    // Get entities based on complex config filters.
+    TimelineFilterList list1 = new TimelineFilterList();
+    list1.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_1", "129"));
+    list1.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "def"));
+    TimelineFilterList list2 = new TimelineFilterList();
+    list2.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_2", "23"));
+    list2.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "abc"));
+    TimelineFilterList confFilterList1 =
+        new TimelineFilterList(Operator.OR, list1, list2);
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null,
+        confFilterList1, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(2, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) {
+        Assert.fail("Incorrect filtering based on config filters");
+      }
+    }
+
+    TimelineFilterList list3 = new TimelineFilterList();
+    list3.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.NOT_EQUAL, "config_1", "123"));
+    list3.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.NOT_EQUAL, "config_3", "abc"));
+    TimelineFilterList list4 = new TimelineFilterList();
+    list4.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_2", "23"));
+    TimelineFilterList confFilterList2 =
+        new TimelineFilterList(Operator.OR, list3, list4);
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null,
+        confFilterList2, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(2, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) {
+        Assert.fail("Incorrect filtering based on config filters");
+      }
+    }
+
+    TimelineFilterList confFilterList3 = new TimelineFilterList();
+    confFilterList3.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.NOT_EQUAL, "config_1", "127"));
+    confFilterList3.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.NOT_EQUAL, "config_3", "abc"));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null,
+        confFilterList3, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_2")) {
+        Assert.fail("Incorrect filtering based on config filters");
+      }
+    }
+
+    TimelineFilterList confFilterList4 = new TimelineFilterList();
+    confFilterList4.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.EQUAL, "config_dummy", "dummy"));
+    confFilterList4.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.EQUAL, "config_3", "def"));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null,
+        confFilterList4, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(0, result.size());
+
+    TimelineFilterList confFilterList5 = new TimelineFilterList(Operator.OR);
+    confFilterList5.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.EQUAL, "config_dummy", "dummy"));
+    confFilterList5.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.EQUAL, "config_3", "def"));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null,
+        confFilterList5, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_2")) {
+        Assert.fail("Incorrect filtering based on config filters");
+      }
+    }
+
+    // Get entities based on complex metric filters.
+    TimelineFilterList list6 = new TimelineFilterList();
+    list6.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.GREATER_THAN, "metric1", 200));
+    list6.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.EQUAL, "metric3", 23));
+    TimelineFilterList list7 = new TimelineFilterList();
+    list7.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.GREATER_OR_EQUAL, "metric2", 74));
+    TimelineFilterList metricFilterList1 =
+        new TimelineFilterList(Operator.OR, list6, list7);
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null, null,
+        metricFilterList1, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(2, result.size());
+    // Two entities with ids id_2 and id_3 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_2") && !entity.getId().equals("id_3")) {
+        Assert.fail("Incorrect filtering based on metric filters");
+      }
+    }
+
+    TimelineFilterList metricFilterList2 = new TimelineFilterList();
+    metricFilterList2.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.LESS_THAN, "metric2", 70));
+    metricFilterList2.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null, null,
+        metricFilterList2, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1")) {
+        Assert.fail("Incorrect filtering based on metric filters");
+      }
+    }
+
+    TimelineFilterList metricFilterList3 = new TimelineFilterList();
+    metricFilterList3.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.LESS_THAN, "dummy_metric", 30));
+    metricFilterList3.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null, null,
+        metricFilterList3, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(0, result.size());
+
+    TimelineFilterList metricFilterList4 = new TimelineFilterList(Operator.OR);
+    metricFilterList4.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.LESS_THAN, "dummy_metric", 30));
+    metricFilterList4.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null, null,
+        metricFilterList4, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(2, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) {
+        Assert.fail("Incorrect filtering based on metric filters");
+      }
+    }
+
+    TimelineFilterList metricFilterList5 =
+        new TimelineFilterList(new TimelineCompareFilter(
+            TimelineCompareOp.NOT_EQUAL, "metric2", 74));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null, null,
+        metricFilterList5, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(2, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) {
+        Assert.fail("Incorrect filtering based on metric filters");
+      }
+    }
+
+    TimelineFilterList infoFilterList1 = new TimelineFilterList();
+    infoFilterList1.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5));
+    infoFilterList1.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.NOT_EQUAL, "info4", 20));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, infoFilterList1,
+        null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(0, result.size());
+
+    TimelineFilterList infoFilterList2 = new TimelineFilterList(Operator.OR);
+    infoFilterList2.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5));
+    infoFilterList2.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info1", "val1"));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, infoFilterList2,
+        null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(2, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) {
+        Assert.fail("Incorrect filtering based on info filters");
+      }
+    }
+
+    TimelineFilterList infoFilterList3 = new TimelineFilterList();
+    infoFilterList3.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "dummy_info", 1));
+    infoFilterList3.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", "val5"));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, infoFilterList3,
+        null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(0, result.size());
+
+    TimelineFilterList infoFilterList4 = new TimelineFilterList(Operator.OR);
+    infoFilterList4.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "dummy_info", 1));
+    infoFilterList4.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", "val5"));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, infoFilterList4,
+        null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1")) {
+        Assert.fail("Incorrect filtering based on info filters");
+      }
+    }
+  }
+
+  @Test
+  public void testGetEntitiesByRelations() throws Exception {
+    // Get entities based on relatesTo.
+    TimelineFilterList relatesTo = new TimelineFilterList(Operator.OR);
+    Set<Object> relatesToIds =
+        new HashSet<Object>(Arrays.asList((Object)"flow1"));
+    relatesTo.addFilter(new TimelineKeyValuesFilter(
+        TimelineCompareOp.EQUAL, "flow", relatesToIds));
+    Set<TimelineEntity> result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, relatesTo, null, null,
+        null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    // Only one entity with ID id_1 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1")) {
+        Assert.fail("Incorrect filtering based on relatesTo");
+      }
+    }
+
+    // Get entities based on isRelatedTo.
+    TimelineFilterList isRelatedTo = new TimelineFilterList(Operator.OR);
+    Set<Object> isRelatedToIds =
+        new HashSet<Object>(Arrays.asList((Object)"tid1_2"));
+    isRelatedTo.addFilter(new TimelineKeyValuesFilter(
+        TimelineCompareOp.EQUAL, "type1", isRelatedToIds));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, isRelatedTo, null,
+        null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(2, result.size());
+    // Two entities with ids id_1 and id_3 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) {
+        Assert.fail("Incorrect filtering based on isRelatedTo");
+      }
+    }
+  }
+}
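
For anyone exercising the reader outside this test, a minimal sketch of standalone usage; it assumes the same on-disk layout the setup above creates (<root>/entities/<cluster>/<user>/<flow>/<run>/<app>/<entity-type>/<entity-id>.thist), and the root path used here is an arbitrary placeholder:

    Configuration conf = new YarnConfiguration();
    conf.set(FileSystemTimelineReaderImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
        "/tmp/timeline-storage");
    FileSystemTimelineReaderImpl reader = new FileSystemTimelineReaderImpl();
    reader.init(conf);
    TimelineEntity entity = reader.getEntity(
        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
        "app", "id_1"),
        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));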

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
new file mode 100644
index 0000000..15be494
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.Test;
+
+public class TestFileSystemTimelineWriterImpl {
+
+  /**
+   * Unit test for the YARN-3264 proof of concept.
+   *
+   * @throws Exception if writing or reading the entities fails
+   */
+  @Test
+  public void testWriteEntityToFile() throws Exception {
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entity = new TimelineEntity();
+    String id = "hello";
+    String type = "world";
+    entity.setId(id);
+    entity.setType(type);
+    entity.setCreatedTime(1425016501000L);
+    te.addEntity(entity);
+
+    TimelineMetric metric = new TimelineMetric();
+    String metricId = "CPU";
+    metric.setId(metricId);
+    metric.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
+    metric.addValue(1425016501000L, 1234567L);
+
+    TimelineEntity entity2 = new TimelineEntity();
+    String id2 = "metric";
+    String type2 = "app";
+    entity2.setId(id2);
+    entity2.setType(type2);
+    entity2.setCreatedTime(1425016503000L);
+    entity2.addMetric(metric);
+    te.addEntity(entity2);
+
+    Map<String, TimelineMetric> aggregatedMetrics =
+        new HashMap<String, TimelineMetric>();
+    aggregatedMetrics.put(metricId, metric);
+
+    FileSystemTimelineWriterImpl fsi = null;
+    try {
+      fsi = new FileSystemTimelineWriterImpl();
+      fsi.init(new YarnConfiguration());
+      fsi.start();
+      fsi.write("cluster_id", "user_id", "flow_name", "flow_version",
+          12345678L, "app_id", te);
+
+      String fileName = fsi.getOutputRoot() +
+          "/entities/cluster_id/user_id/flow_name/flow_version/12345678/" +
+          "app_id/" + type + "/" + id +
+          FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      Path path = Paths.get(fileName);
+      File f = new File(fileName);
+      assertTrue(f.exists() && !f.isDirectory());
+      List<String> data = Files.readAllLines(path, StandardCharsets.UTF_8);
+      // ensure there's only one entity + 1 new line
+      assertEquals(2, data.size());
+      String d = data.get(0);
+      // confirm the contents same as what was written
+      assertEquals(TimelineUtils.dumpTimelineRecordtoJSON(entity), d);
+
+      // verify aggregated metrics
+      String fileName2 = fsi.getOutputRoot() +
+          "/entities/cluster_id/user_id/flow_name/flow_version/12345678/app_id/"
+          + type2 + "/" + id2 +
+          FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      Path path2 = Paths.get(fileName2);
+      File file = new File(fileName2);
+      assertTrue(file.exists() && !file.isDirectory());
+      List<String> data2 = Files.readAllLines(path2, StandardCharsets.UTF_8);
+      // ensure there's only one entity + 1 new line
+      assertEquals(2, data2.size());
+      String metricToString = data2.get(0);
+      // confirm the contents same as what was written
+      assertEquals(TimelineUtils.dumpTimelineRecordtoJSON(entity2),
+          metricToString);
+
+      // delete the directory
+      File outputDir = new File(fsi.getOutputRoot());
+      FileUtils.deleteDirectory(outputDir);
+      assertFalse(f.exists());
+    } finally {
+      if (fsi != null) {
+        fsi.close();
+        FileUtils.deleteDirectory(new File(fsi.getOutputRoot()));
+      }
+    }
+  }
+
+}
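
The assertions above encode the writer's file layout: each entity becomes one line of JSON followed by a blank line, at <outputRoot>/entities/<cluster>/<user>/<flow-name>/<flow-version>/<flow-run>/<app>/<entity-type>/<entity-id> plus TIMELINE_SERVICE_STORAGE_EXTENSION. A minimal write sketch mirroring the test (all ids are placeholder values, and te is a populated TimelineEntities batch as built above):

    FileSystemTimelineWriterImpl writer = new FileSystemTimelineWriterImpl();
    writer.init(new YarnConfiguration());
    writer.start();
    writer.write("cluster_id", "user_id", "flow_name", "flow_version",
        12345678L, "app_id", te);
    writer.close();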

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
new file mode 100644
index 0000000..58df970
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.junit.Test;
+
+public class TestKeyConverters {
+
+  @Test
+  public void testAppIdKeyConverter() {
+    AppIdKeyConverter appIdKeyConverter = new AppIdKeyConverter();
+    long currentTs = System.currentTimeMillis();
+    ApplicationId appId1 = ApplicationId.newInstance(currentTs, 1);
+    ApplicationId appId2 = ApplicationId.newInstance(currentTs, 2);
+    ApplicationId appId3 = ApplicationId.newInstance(currentTs + 300, 1);
+    String appIdStr1 = appId1.toString();
+    String appIdStr2 = appId2.toString();
+    String appIdStr3 = appId3.toString();
+    byte[] appIdBytes1 = appIdKeyConverter.encode(appIdStr1);
+    byte[] appIdBytes2 = appIdKeyConverter.encode(appIdStr2);
+    byte[] appIdBytes3 = appIdKeyConverter.encode(appIdStr3);
+    // App ids should be encoded in a manner that maintains descending
+    // order.
+    assertTrue(
+        "Ordering of app ids is incorrect",
+        Bytes.compareTo(appIdBytes1, appIdBytes2) > 0
+            && Bytes.compareTo(appIdBytes1, appIdBytes3) > 0
+            && Bytes.compareTo(appIdBytes2, appIdBytes3) > 0);
+    String decodedAppId1 = appIdKeyConverter.decode(appIdBytes1);
+    String decodedAppId2 = appIdKeyConverter.decode(appIdBytes2);
+    String decodedAppId3 = appIdKeyConverter.decode(appIdBytes3);
+    assertEquals("Decoded app id does not match the encoded app id",
+        appIdStr1, decodedAppId1);
+    assertEquals("Decoded app id does not match the encoded app id",
+        appIdStr2, decodedAppId2);
+    assertEquals("Decoded app id does not match the encoded app id",
+        appIdStr3, decodedAppId3);
+  }
+
+  @Test
+  public void testEventColumnNameConverter() {
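+    // The event id and info key used here embed "=" characters, presumably
+    // to exercise escaping of the VALUES separator during encode/decode.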
+    String eventId = "=foo_=eve=nt=";
+    byte[] valSepBytes = Bytes.toBytes(Separator.VALUES.getValue());
+    byte[] maxByteArr =
+        Bytes.createMaxByteArray(Bytes.SIZEOF_LONG - valSepBytes.length);
+    byte[] ts = Bytes.add(valSepBytes, maxByteArr);
+    Long eventTs = Bytes.toLong(ts);
+    byte[] byteEventColName =
+        new EventColumnName(eventId, eventTs, null).getColumnQualifier();
+    KeyConverter<EventColumnName> eventColumnNameConverter =
+        new EventColumnNameConverter();
+    EventColumnName eventColName =
+        eventColumnNameConverter.decode(byteEventColName);
+    assertEquals(eventId, eventColName.getId());
+    assertEquals(eventTs, eventColName.getTimestamp());
+    assertNull(eventColName.getInfoKey());
+
+    String infoKey = "f=oo_event_in=fo=_key";
+    byteEventColName =
+        new EventColumnName(eventId, eventTs, infoKey).getColumnQualifier();
+    eventColName = eventColumnNameConverter.decode(byteEventColName);
+    assertEquals(eventId, eventColName.getId());
+    assertEquals(eventTs, eventColName.getTimestamp());
+    assertEquals(infoKey, eventColName.getInfoKey());
+  }
+
+  @Test
+  public void testLongKeyConverter() {
+    LongKeyConverter longKeyConverter = new LongKeyConverter();
+    confirmLongKeyConverter(longKeyConverter, Long.MIN_VALUE);
+    confirmLongKeyConverter(longKeyConverter, -1234567890L);
+    confirmLongKeyConverter(longKeyConverter, -128L);
+    confirmLongKeyConverter(longKeyConverter, -127L);
+    confirmLongKeyConverter(longKeyConverter, -1L);
+    confirmLongKeyConverter(longKeyConverter, 0L);
+    confirmLongKeyConverter(longKeyConverter, 1L);
+    confirmLongKeyConverter(longKeyConverter, 127L);
+    confirmLongKeyConverter(longKeyConverter, 128L);
+    confirmLongKeyConverter(longKeyConverter, 1234567890L);
+    confirmLongKeyConverter(longKeyConverter, Long.MAX_VALUE);
+  }
+
+  private void confirmLongKeyConverter(LongKeyConverter longKeyConverter,
+      Long testValue) {
+    Long decoded = longKeyConverter.decode(longKeyConverter.encode(testValue));
+    assertEquals(testValue, decoded);
+  }
+
+  @Test
+  public void testStringKeyConverter() {
+    StringKeyConverter stringKeyConverter = new StringKeyConverter();
+    String phrase = "QuackAttack now!";
+
+    for (int i = 0; i < phrase.length(); i++) {
+      String sub = phrase.substring(i, phrase.length());
+      confirmStringKeyConverter(stringKeyConverter, sub);
+      confirmStringKeyConverter(stringKeyConverter, sub + sub);
+    }
+  }
+
+  private void confirmStringKeyConverter(StringKeyConverter stringKeyConverter,
+      String testValue) {
+    String decoded =
+        stringKeyConverter.decode(stringKeyConverter.encode(testValue));
+    assertEquals(testValue, decoded);
+  }
+
+}
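
A note on the ordering these converters guarantee: long fields in row keys are stored inverted so that larger values (newer timestamps, later run ids) sort first under HBase's ascending byte order. A minimal sketch of the inversion, assuming the usual Long.MAX_VALUE-based scheme behind the LongConverter.invertLong calls in TestRowKeys below:

    long encoded = Long.MAX_VALUE - value;   // encode for storage
    long decoded = Long.MAX_VALUE - encoded; // the transform is its own inverse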

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
new file mode 100644
index 0000000..368b060
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.junit.Test;
+
+public class TestRowKeys {
+
+  private final static String QUALIFIER_SEP = Separator.QUALIFIERS.getValue();
+  private final static byte[] QUALIFIER_SEP_BYTES = Bytes
+      .toBytes(QUALIFIER_SEP);
+  private final static String CLUSTER = "cl" + QUALIFIER_SEP + "uster";
+  private final static String USER = QUALIFIER_SEP + "user";
+  private final static String FLOW_NAME = "dummy_" + QUALIFIER_SEP + "flow"
+      + QUALIFIER_SEP;
+  private final static Long FLOW_RUN_ID;
+  private final static String APPLICATION_ID;
+  static {
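+    // Craft a flow run id and a cluster timestamp whose inverted, on-disk
+    // encodings contain the qualifier separator bytes, so the row key tests
+    // below exercise separator handling inside fixed-length fields (assumed
+    // intent of the byte arithmetic that follows).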
+    long runid = Long.MAX_VALUE - 900L;
+    byte[] longMaxByteArr = Bytes.toBytes(Long.MAX_VALUE);
+    byte[] byteArr = Bytes.toBytes(runid);
+    int sepByteLen = QUALIFIER_SEP_BYTES.length;
+    if (sepByteLen <= byteArr.length) {
+      for (int i = 0; i < sepByteLen; i++) {
+        byteArr[i] = (byte) (longMaxByteArr[i] - QUALIFIER_SEP_BYTES[i]);
+      }
+    }
+    FLOW_RUN_ID = Bytes.toLong(byteArr);
+    long clusterTs = System.currentTimeMillis();
+    byteArr = Bytes.toBytes(clusterTs);
+    if (sepByteLen <= byteArr.length) {
+      for (int i = 0; i < sepByteLen; i++) {
+        byteArr[byteArr.length - sepByteLen + i] =
+            (byte) (longMaxByteArr[byteArr.length - sepByteLen + i] -
+                QUALIFIER_SEP_BYTES[i]);
+      }
+    }
+    clusterTs = Bytes.toLong(byteArr);
+    int seqId = 222;
+    APPLICATION_ID = ApplicationId.newInstance(clusterTs, seqId).toString();
+  }
+
+  private static void verifyRowPrefixBytes(byte[] byteRowKeyPrefix) {
+    int sepLen = QUALIFIER_SEP_BYTES.length;
+    for (int i = 0; i < sepLen; i++) {
+      assertTrue(
+          "Row key prefix not encoded properly.",
+          byteRowKeyPrefix[byteRowKeyPrefix.length - sepLen + i] ==
+              QUALIFIER_SEP_BYTES[i]);
+    }
+  }
+
+  @Test
+  public void testApplicationRowKey() {
+    byte[] byteRowKey =
+        new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
+            APPLICATION_ID).getRowKey();
+    ApplicationRowKey rowKey = ApplicationRowKey.parseRowKey(byteRowKey);
+    assertEquals(CLUSTER, rowKey.getClusterId());
+    assertEquals(USER, rowKey.getUserId());
+    assertEquals(FLOW_NAME, rowKey.getFlowName());
+    assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
+    assertEquals(APPLICATION_ID, rowKey.getAppId());
+
+    byte[] byteRowKeyPrefix =
+        new ApplicationRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID)
+            .getRowKeyPrefix();
+    byte[][] splits =
+        Separator.QUALIFIERS.split(byteRowKeyPrefix,
+            new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+                Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+                Separator.VARIABLE_SIZE});
+    assertEquals(5, splits.length);
+    assertEquals(0, splits[4].length);
+    assertEquals(FLOW_NAME,
+        Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
+    assertEquals(FLOW_RUN_ID,
+        (Long) LongConverter.invertLong(Bytes.toLong(splits[3])));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+
+    byteRowKeyPrefix =
+        new ApplicationRowKeyPrefix(CLUSTER, USER, FLOW_NAME).getRowKeyPrefix();
+    splits =
+        Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
+            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
+    assertEquals(4, splits.length);
+    assertEquals(0, splits[3].length);
+    assertEquals(FLOW_NAME,
+        Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+  }
+
+  /**
+   * Tests the converters indirectly through the public methods of the
+   * corresponding rowkey.
+   */
+  @Test
+  public void testAppToFlowRowKey() {
+    byte[] byteRowKey = new AppToFlowRowKey(CLUSTER,
+        APPLICATION_ID).getRowKey();
+    AppToFlowRowKey rowKey = AppToFlowRowKey.parseRowKey(byteRowKey);
+    assertEquals(CLUSTER, rowKey.getClusterId());
+    assertEquals(APPLICATION_ID, rowKey.getAppId());
+  }
+
+  @Test
+  public void testEntityRowKey() {
+    String entityId = "!ent!ity!!id!";
+    String entityType = "entity!Type";
+    byte[] byteRowKey =
+        new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
+            entityType, entityId).getRowKey();
+    EntityRowKey rowKey = EntityRowKey.parseRowKey(byteRowKey);
+    assertEquals(CLUSTER, rowKey.getClusterId());
+    assertEquals(USER, rowKey.getUserId());
+    assertEquals(FLOW_NAME, rowKey.getFlowName());
+    assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
+    assertEquals(APPLICATION_ID, rowKey.getAppId());
+    assertEquals(entityType, rowKey.getEntityType());
+    assertEquals(entityId, rowKey.getEntityId());
+
+    byte[] byteRowKeyPrefix =
+        new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
+            APPLICATION_ID, entityType).getRowKeyPrefix();
+    byte[][] splits =
+        Separator.QUALIFIERS.split(
+            byteRowKeyPrefix,
+            new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+                Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+                AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
+                Separator.VARIABLE_SIZE});
+    assertEquals(7, splits.length);
+    assertEquals(0, splits[6].length);
+    assertEquals(APPLICATION_ID, new AppIdKeyConverter().decode(splits[4]));
+    assertEquals(entityType,
+        Separator.QUALIFIERS.decode(Bytes.toString(splits[5])));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+
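+    // Dropping the entity type leaves a six-segment prefix.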
+    byteRowKeyPrefix =
+        new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
+            APPLICATION_ID).getRowKeyPrefix();
+    splits =
+        Separator.QUALIFIERS.split(
+            byteRowKeyPrefix,
+            new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+                Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+                AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE});
+    assertEquals(6, splits.length);
+    assertEquals(0, splits[5].length);
+    AppIdKeyConverter appIdKeyConverter = new AppIdKeyConverter();
+    assertEquals(APPLICATION_ID, appIdKeyConverter.decode(splits[4]));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+  }
+
+  @Test
+  public void testFlowActivityRowKey() {
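+    // The row key stores the timestamp rounded down to the top of the day.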
+    Long ts = 1459900830000L;
+    Long dayTimestamp = TimelineStorageUtils.getTopOfTheDayTimestamp(ts);
+    byte[] byteRowKey =
+        new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME).getRowKey();
+    FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(byteRowKey);
+    assertEquals(CLUSTER, rowKey.getClusterId());
+    assertEquals(dayTimestamp, rowKey.getDayTimestamp());
+    assertEquals(USER, rowKey.getUserId());
+    assertEquals(FLOW_NAME, rowKey.getFlowName());
+
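+    // A cluster-only prefix splits into two segments, the second empty.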
+    byte[] byteRowKeyPrefix =
+        new FlowActivityRowKeyPrefix(CLUSTER).getRowKeyPrefix();
+    byte[][] splits =
+        Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
+            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
+    assertEquals(2, splits.length);
+    assertEquals(0, splits[1].length);
+    assertEquals(CLUSTER,
+        Separator.QUALIFIERS.decode(Bytes.toString(splits[0])));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+
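+    // Adding the timestamp appends a fixed-size inverted long to the prefix.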
+    byteRowKeyPrefix =
+        new FlowActivityRowKeyPrefix(CLUSTER, ts).getRowKeyPrefix();
+    splits =
+        Separator.QUALIFIERS.split(byteRowKeyPrefix,
+            new int[] {Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+                Separator.VARIABLE_SIZE});
+    assertEquals(3, splits.length);
+    assertEquals(0, splits[2].length);
+    assertEquals(CLUSTER,
+        Separator.QUALIFIERS.decode(Bytes.toString(splits[0])));
+    assertEquals(ts,
+        (Long) LongConverter.invertLong(Bytes.toLong(splits[1])));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+  }
+
+  @Test
+  public void testFlowRunRowKey() {
+    byte[] byteRowKey =
+        new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID).getRowKey();
+    FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(byteRowKey);
+    assertEquals(CLUSTER, rowKey.getClusterId());
+    assertEquals(USER, rowKey.getUserId());
+    assertEquals(FLOW_NAME, rowKey.getFlowName());
+    assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
+
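+    // A null run id turns the row key into a prefix covering all runs of
+    // the flow.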
+    byte[] byteRowKeyPrefix =
+        new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, null).getRowKey();
+    byte[][] splits =
+        Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
+            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
+    assertEquals(4, splits.length);
+    assertEquals(0, splits[3].length);
+    assertEquals(FLOW_NAME,
+        Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java
new file mode 100644
index 0000000..7d37206
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+public class TestSeparator {
+
+  private static String villain = "Dr. Heinz Doofenshmirtz";
+  private static String special =
+      ".   *   |   ?   +   \t   (   )   [   ]   {   }   ^   $  \\ \"  %";
+
+  /**
+   * Tests that encoding followed by decoding is the identity for every
+   * separator, and that encoding a null token returns null.
+   */
+  @Test
+  public void testEncodeDecodeString() {
+
+    for (Separator separator : Separator.values()) {
+      testEncodeDecode(separator, "");
+      testEncodeDecode(separator, " ");
+      testEncodeDecode(separator, "!");
+      testEncodeDecode(separator, "?");
+      testEncodeDecode(separator, "&");
+      testEncodeDecode(separator, "+");
+      testEncodeDecode(separator, "\t");
+      testEncodeDecode(separator, "Dr.");
+      testEncodeDecode(separator, "Heinz");
+      testEncodeDecode(separator, "Doofenshmirtz");
+      testEncodeDecode(separator, villain);
+      testEncodeDecode(separator, special);
+
+      assertNull(separator.encode(null));
+    }
+  }
+
+  private void testEncodeDecode(Separator separator, String token) {
+    String encoded = separator.encode(token);
+    String decoded = separator.decode(encoded);
+    String msg = "token:" + token + " separator:" + separator + ".";
+    assertEquals(msg, token, decoded);
+  }
+
+  @Test
+  public void testEncodeDecode() {
+    testEncodeDecode("Dr.", Separator.QUALIFIERS);
+    testEncodeDecode("Heinz", Separator.QUALIFIERS, Separator.QUALIFIERS);
+    testEncodeDecode("Doofenshmirtz", Separator.QUALIFIERS, null,
+        Separator.QUALIFIERS);
+    testEncodeDecode("&Perry", Separator.QUALIFIERS, Separator.VALUES, null);
+    testEncodeDecode("the ", Separator.QUALIFIERS, Separator.SPACE);
+    testEncodeDecode("Platypus...", (Separator) null);
+    testEncodeDecode("The what now ?!?", Separator.QUALIFIERS,
+        Separator.VALUES, Separator.SPACE);
+  }
+
+  @Test
+  public void testEncodedValues() {
+    testEncodeDecode("Double-escape %2$ and %9$ or %%2$ or %%3$, nor  %%%2$" +
+        "= no problem!",
+        Separator.QUALIFIERS, Separator.VALUES, Separator.SPACE,
+        Separator.TAB);
+  }
+
+  @Test
+  public void testSplits() {
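+    // Build rows whose fixed-size numeric fields contain the separator's own
+    // bytes at various positions, and check that split() still honours the
+    // declared field sizes.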
+    byte[] maxLongBytes = Bytes.toBytes(Long.MAX_VALUE);
+    byte[] maxIntBytes = Bytes.toBytes(Integer.MAX_VALUE);
+    for (Separator separator : Separator.values()) {
+      String str1 = "cl" + separator.getValue() + "us";
+      String str2 = separator.getValue() + "rst";
+      byte[] sepByteArr = Bytes.toBytes(separator.getValue());
+      byte[] longVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxLongBytes,
+          sepByteArr.length, Bytes.SIZEOF_LONG - sepByteArr.length));
+      byte[] intVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxIntBytes,
+          sepByteArr.length, Bytes.SIZEOF_INT - sepByteArr.length));
+      byte[] arr = separator.join(
+          Bytes.toBytes(separator.encode(str1)), longVal1Arr,
+          Bytes.toBytes(separator.encode(str2)), intVal1Arr);
+      int[] sizes = {Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+          Separator.VARIABLE_SIZE, Bytes.SIZEOF_INT};
+      byte[][] splits = separator.split(arr, sizes);
+      assertEquals(4, splits.length);
+      assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
+      assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1]));
+      assertEquals(str2, separator.decode(Bytes.toString(splits[2])));
+      assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3]));
+
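+      // Separator bytes at the end of the fixed-size values.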
+      longVal1Arr = Bytes.add(Bytes.copy(maxLongBytes, 0, Bytes.SIZEOF_LONG -
+          sepByteArr.length), sepByteArr);
+      intVal1Arr = Bytes.add(Bytes.copy(maxIntBytes, 0, Bytes.SIZEOF_INT -
+          sepByteArr.length), sepByteArr);
+      arr = separator.join(Bytes.toBytes(separator.encode(str1)), longVal1Arr,
+          Bytes.toBytes(separator.encode(str2)), intVal1Arr);
+      splits = separator.split(arr, sizes);
+      assertEquals(4, splits.length);
+      assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
+      assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1]));
+      assertEquals(str2, separator.decode(Bytes.toString(splits[2])));
+      assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3]));
+
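+      // Separator bytes at the start, middle, and end of the long value.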
+      longVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxLongBytes,
+          sepByteArr.length, 4 - sepByteArr.length), sepByteArr);
+      longVal1Arr = Bytes.add(longVal1Arr, Bytes.copy(maxLongBytes, 4, 3 -
+              sepByteArr.length), sepByteArr);
+      arr = separator.join(Bytes.toBytes(separator.encode(str1)), longVal1Arr,
+          Bytes.toBytes(separator.encode(str2)), intVal1Arr);
+      splits = separator.split(arr, sizes);
+      assertEquals(4, splits.length);
+      assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
+      assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1]));
+      assertEquals(str2, separator.decode(Bytes.toString(splits[2])));
+      assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3]));
+
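+      // Variable-size fields first, then the fixed-size int and long.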
+      arr = separator.join(Bytes.toBytes(separator.encode(str1)),
+          Bytes.toBytes(separator.encode(str2)), intVal1Arr, longVal1Arr);
+      int[] sizes1 = {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+          Bytes.SIZEOF_INT, Bytes.SIZEOF_LONG};
+      splits = separator.split(arr, sizes1);
+      assertEquals(4, splits.length);
+      assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
+      assertEquals(str2, separator.decode(Bytes.toString(splits[1])));
+      assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[2]));
+      assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[3]));
+
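+      // Field sizes that do not match the encoded data must be rejected.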
+      try {
+        int[] sizes2 = {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+            Bytes.SIZEOF_INT, 7};
+        splits = separator.split(arr, sizes2);
+        fail("Exception should have been thrown.");
+      } catch (IllegalArgumentException e) {
+        // Expected: a fixed size of 7 does not match the 8-byte long value.
+      }
+
+      try {
+        int[] sizes2 = {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, 2,
+            Bytes.SIZEOF_LONG};
+        splits = separator.split(arr, sizes2);
+        fail("Exception should have been thrown.");
+      } catch (IllegalArgumentException e) {
+        // Expected: a fixed size of 2 does not match the 4-byte int value.
+      }
+    }
+  }
+
+  /**
+   * Simple test to encode and decode using the same separators and confirm
+   * that we end up with the same as what we started with.
+   *
+   * @param token the token to encode and then decode
+   * @param separators the separators to use for encoding and decoding
+   */
+  private static void testEncodeDecode(String token, Separator... separators) {
+    byte[] encoded = Separator.encode(token, separators);
+    String decoded = Separator.decode(encoded, separators);
+    assertEquals(token, decoded);
+  }
+
+  @Test
+  public void testJoinStripped() {
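+    // joinEncoded/splitEncoded should round-trip lists and arrays; splitting
+    // null yields an empty iterable.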
+    List<String> stringList = new ArrayList<String>(0);
+    stringList.add("nothing");
+
+    String joined = Separator.VALUES.joinEncoded(stringList);
+    Iterable<String> split = Separator.VALUES.splitEncoded(joined);
+    assertTrue(Iterables.elementsEqual(stringList, split));
+
+    stringList = new ArrayList<String>(3);
+    stringList.add("a");
+    stringList.add("b?");
+    stringList.add("c");
+
+    joined = Separator.VALUES.joinEncoded(stringList);
+    split = Separator.VALUES.splitEncoded(joined);
+    assertTrue(Iterables.elementsEqual(stringList, split));
+
+    String[] stringArray1 = {"else"};
+    joined = Separator.VALUES.joinEncoded(stringArray1);
+    split = Separator.VALUES.splitEncoded(joined);
+    assertTrue(Iterables.elementsEqual(Arrays.asList(stringArray1), split));
+
+    String[] stringArray2 = {"d", "e?", "f"};
+    joined = Separator.VALUES.joinEncoded(stringArray2);
+    split = Separator.VALUES.splitEncoded(joined);
+    assertTrue(Iterables.elementsEqual(Arrays.asList(stringArray2), split));
+
+    List<String> empty = new ArrayList<String>(0);
+    split = Separator.VALUES.splitEncoded(null);
+    assertTrue(Iterables.elementsEqual(empty, split));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/resources/log4j.properties
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/resources/log4j.properties
new file mode 100644
index 0000000..81a3f6a
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/resources/log4j.properties
@@ -0,0 +1,19 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
index 80544bd..1b6b1d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
@@ -43,5 +43,7 @@
     <module>hadoop-yarn-server-tests</module>
     <module>hadoop-yarn-server-applicationhistoryservice</module>
     <module>hadoop-yarn-server-timeline-pluginstorage</module>
+    <module>hadoop-yarn-server-timelineservice</module>
+    <module>hadoop-yarn-server-timelineservice-hbase-tests</module>
   </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServer.md
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServer.md
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServer.md
index f20bd2c..f09909b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServer.md
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServer.md
@@ -88,7 +88,7 @@ Current status
 Future Plans
 
 1. Future releases will introduce a next generation timeline service
-which is scalable and reliable, "Timeline Server v2".
+which is scalable and reliable, ["Timeline Service v2"](./TimelineServiceV2.html).
 1. The expanded features of this service *may not* be available to
 applications using the Timeline Server v1 REST API. That includes extended
 data structures as well as the ability of the client to failover between Timeline Server instances.

