diff --git 
new file mode 100644
index 0000000..b58bbe3
--- /dev/null
@@ -0,0 +1,804 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+public class TestFileSystemTimelineReaderImpl {
+  private static final String ROOT_DIR =
+  private FileSystemTimelineReaderImpl reader;
+  /**
+   * Builds the on-disk fixture shared by every test in this class: the entity
+   * files first, then the CSV file that maps each application to its user,
+   * flow and flow run.
+   *
+   * @throws Exception if any fixture file cannot be written.
+   */
+  @BeforeClass
+  public static void setup() throws Exception {
+    // Entity data goes first because writing it creates the cluster1
+    // directory that the mapping file below is placed in.
+    loadEntityData();
+    // Create app flow mapping file.
+    CSVFormat format =
+        CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN");
+    String appFlowMappingFile = ROOT_DIR + "/entities/cluster1/" +
+        FileSystemTimelineReaderImpl.APP_FLOW_MAPPING_FILE;
+    // try-with-resources closes the printer and then the writer, so no
+    // explicit close() call is needed inside the block.
+    try (PrintWriter out =
+        new PrintWriter(new BufferedWriter(
+            new FileWriter(appFlowMappingFile, true)));
+        CSVPrinter printer = new CSVPrinter(out, format)) {
+      printer.printRecord("app1", "user1", "flow1", 1);
+      // The flow name deliberately contains a comma; see
+      // testAppFlowMappingCsv.
+      printer.printRecord("app2", "user1", "flow1,flow", 1);
+    }
+    (new File(ROOT_DIR)).deleteOnExit();
+  }
+  @AfterClass
+  public static void tearDown() throws Exception {
+    FileUtils.deleteDirectory(new File(ROOT_DIR));
+  }
+  /**
+   * Creates a fresh reader before each test and points it at the fixture
+   * root directory.
+   *
+   * @throws Exception if the reader cannot be initialized.
+   */
+  @Before
+  public void init() throws Exception {
+    Configuration readerConf = new YarnConfiguration();
+    readerConf.set(
+        FileSystemTimelineReaderImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+        ROOT_DIR);
+    reader = new FileSystemTimelineReaderImpl();
+    reader.init(readerConf);
+  }
+  /**
+   * Appends the JSON form of an entity, followed by a blank line, to the file
+   * named after the entity id (with a ".thist" suffix) inside {@code dir}.
+   * The directory is created first if it does not exist yet.
+   *
+   * @param entity entity to serialize.
+   * @param dir directory that holds the entity file.
+   * @throws Exception if the directory cannot be created or the file cannot
+   *     be written.
+   */
+  private static void writeEntityFile(TimelineEntity entity, File dir)
+      throws Exception {
+    if (!dir.exists() && !dir.mkdirs()) {
+      throw new IOException("Could not create directories for " + dir);
+    }
+    String fileName = dir.getAbsolutePath() + "/" + entity.getId() + ".thist";
+    // The file is opened in append mode, so entities that share an id end up
+    // as consecutive records in the same file. try-with-resources closes the
+    // writer; no explicit close() is needed.
+    try (PrintWriter out =
+        new PrintWriter(new BufferedWriter(new FileWriter(fileName, true)))) {
+      out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
+      out.write("\n");
+    }
+  }
+  /**
+   * Writes the entity fixture used by the tests: four "app" entities (id_1 to
+   * id_4) under app1 of flow1, and one entity (id_5) under app2 of a flow
+   * whose name contains a comma. Entity id_1 is written as two separate
+   * records that share the same id.
+   *
+   * @throws Exception if an entity file cannot be written.
+   */
+  private static void loadEntityData() throws Exception {
+    File appDir = new File(ROOT_DIR +
+        "/entities/cluster1/user1/flow1/1/app1/app/");
+    // id_1, first record.
+    TimelineEntity entity11 = new TimelineEntity();
+    entity11.setId("id_1");
+    entity11.setType("app");
+    entity11.setCreatedTime(1425016502000L);
+    Map<String, Object> info1 = new HashMap<String, Object>();
+    info1.put("info1", "val1");
+    info1.put("info2", "val5");
+    entity11.addInfo(info1);
+    entity11.addEvent(newEvent("event_1", 1425016502003L));
+    Set<TimelineMetric> metrics11 = new HashSet<TimelineMetric>();
+    metrics11.add(newMetric("metric1", TimelineMetric.Type.SINGLE_VALUE,
+        1425016502006L, 113));
+    metrics11.add(newMetric("metric2", TimelineMetric.Type.TIME_SERIES,
+        1425016502016L, 34));
+    entity11.setMetrics(metrics11);
+    Map<String, String> configs11 = new HashMap<String, String>();
+    configs11.put("config_1", "127");
+    entity11.setConfigs(configs11);
+    entity11.addRelatesToEntity("flow", "flow1");
+    entity11.addIsRelatedToEntity("type1", "tid1_1");
+    writeEntityFile(entity11, appDir);
+    // id_1, second record: no created time, extra configs, metrics,
+    // relations and one more event.
+    TimelineEntity entity12 = new TimelineEntity();
+    entity12.setId("id_1");
+    entity12.setType("app");
+    Map<String, String> configs12 = new HashMap<String, String>();
+    configs12.put("config_2", "23");
+    configs12.put("config_3", "abc");
+    entity12.addConfigs(configs12);
+    Set<TimelineMetric> metrics12 = new HashSet<TimelineMetric>();
+    TimelineMetric secondMetric2 = newMetric("metric2",
+        TimelineMetric.Type.TIME_SERIES, 1425016502032L, 48);
+    secondMetric2.addValue(1425016502054L, 51);
+    metrics12.add(secondMetric2);
+    metrics12.add(newMetric("metric3", TimelineMetric.Type.SINGLE_VALUE,
+        1425016502060L, 23L));
+    entity12.setMetrics(metrics12);
+    entity12.addIsRelatedToEntity("type1", "tid1_2");
+    // NOTE(review): the trailing backtick in the id below looks like a typo;
+    // it is kept unchanged here - confirm before altering fixture data.
+    entity12.addIsRelatedToEntity("type2", "tid2_1`");
+    entity12.addEvent(newEvent("event_5", 1425016502017L));
+    writeEntityFile(entity12, appDir);
+    // id_2.
+    TimelineEntity entity2 = new TimelineEntity();
+    entity2.setId("id_2");
+    entity2.setType("app");
+    entity2.setCreatedTime(1425016501050L);
+    Map<String, Object> info2 = new HashMap<String, Object>();
+    // The value must go into info2 (the map attached to this entity), not
+    // into the map that belongs to id_1.
+    info2.put("info2", 4);
+    entity2.addInfo(info2);
+    Map<String, String> configs2 = new HashMap<String, String>();
+    configs2.put("config_1", "129");
+    configs2.put("config_3", "def");
+    entity2.setConfigs(configs2);
+    entity2.addEvent(newEvent("event_2", 1425016501003L));
+    Set<TimelineMetric> metrics2 = new HashSet<TimelineMetric>();
+    metrics2.add(newMetric("metric1", TimelineMetric.Type.SINGLE_VALUE,
+        1425016501006L, 300));
+    TimelineMetric metric22 = newMetric("metric2",
+        TimelineMetric.Type.TIME_SERIES, 1425016501056L, 31);
+    metric22.addValue(1425016501084L, 70);
+    metrics2.add(metric22);
+    metrics2.add(newMetric("metric3", TimelineMetric.Type.SINGLE_VALUE,
+        1425016502060L, 23L));
+    entity2.setMetrics(metrics2);
+    entity2.addRelatesToEntity("flow", "flow2");
+    writeEntityFile(entity2, appDir);
+    // id_3.
+    TimelineEntity entity3 = new TimelineEntity();
+    entity3.setId("id_3");
+    entity3.setType("app");
+    entity3.setCreatedTime(1425016501050L);
+    Map<String, Object> info3 = new HashMap<String, Object>();
+    info3.put("info2", 3.5);
+    info3.put("info4", 20);
+    entity3.addInfo(info3);
+    Map<String, String> configs3 = new HashMap<String, String>();
+    configs3.put("config_1", "123");
+    configs3.put("config_3", "abc");
+    entity3.setConfigs(configs3);
+    entity3.addEvent(newEvent("event_2", 1425016501003L));
+    entity3.addEvent(newEvent("event_4", 1425016502006L));
+    Set<TimelineMetric> metrics3 = new HashSet<TimelineMetric>();
+    metrics3.add(newMetric("metric1", TimelineMetric.Type.SINGLE_VALUE,
+        1425016501006L, 124));
+    TimelineMetric metric32 = newMetric("metric2",
+        TimelineMetric.Type.TIME_SERIES, 1425016501056L, 31);
+    metric32.addValue(1425016501084L, 74);
+    metrics3.add(metric32);
+    entity3.setMetrics(metrics3);
+    entity3.addIsRelatedToEntity("type1", "tid1_2");
+    writeEntityFile(entity3, appDir);
+    // id_4: only a created time and a single event.
+    TimelineEntity entity4 = new TimelineEntity();
+    entity4.setId("id_4");
+    entity4.setType("app");
+    entity4.setCreatedTime(1425016502050L);
+    entity4.addEvent(newEvent("event_4", 1425016502003L));
+    writeEntityFile(entity4, appDir);
+    // id_5 lives under app2, in a flow whose name contains a comma.
+    File appDir2 = new File(ROOT_DIR +
+            "/entities/cluster1/user1/flow1,flow/1/app2/app/");
+    TimelineEntity entity5 = new TimelineEntity();
+    entity5.setId("id_5");
+    entity5.setType("app");
+    entity5.setCreatedTime(1425016502050L);
+    writeEntityFile(entity5, appDir2);
+  }
+  /** Creates an event with the given id and timestamp. */
+  private static TimelineEvent newEvent(String id, long timestamp) {
+    TimelineEvent event = new TimelineEvent();
+    event.setId(id);
+    event.setTimestamp(timestamp);
+    return event;
+  }
+  /** Creates a metric of the given type that holds one initial value. */
+  private static TimelineMetric newMetric(String id, TimelineMetric.Type type,
+      long timestamp, Number value) {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setId(id);
+    // The type is set before the value is added, as in the original fixture.
+    metric.setType(type);
+    metric.addValue(timestamp, value);
+    return metric;
+  }
+  /**
+   * @return the reader held by this test instance, as assigned in init().
+   */
+  public TimelineReader getTimelineReader() {
+    return this.reader;
+  }
+  @Test
+  public void testGetEntityDefaultView() throws Exception {
+    // With no fields requested, the entity comes back in the default view,
+    // i.e. with only the id, type and created time populated.
+    TimelineReaderContext context = new TimelineReaderContext(
+        "cluster1", "user1", "flow1", 1L, "app1", "app", "id_1");
+    TimelineDataToRetrieve toRetrieve =
+        new TimelineDataToRetrieve(null, null, null, null);
+    TimelineEntity entity = reader.getEntity(context, toRetrieve);
+    String expectedIdentifier =
+        (new TimelineEntity.Identifier("app", "id_1")).toString();
+    Assert.assertEquals(expectedIdentifier,
+        entity.getIdentifier().toString());
+    Assert.assertEquals(Long.valueOf(1425016502000L),
+        entity.getCreatedTime());
+    Assert.assertEquals(0, entity.getConfigs().size());
+    Assert.assertEquals(0, entity.getMetrics().size());
+  }
+  @Test
+  public void testGetEntityByClusterAndApp() throws Exception {
+    // User, flow and flow run are left null: cluster and app id alone should
+    // be enough to get an entity.
+    TimelineReaderContext context = new TimelineReaderContext(
+        "cluster1", null, null, null, "app1", "app", "id_1");
+    TimelineDataToRetrieve toRetrieve =
+        new TimelineDataToRetrieve(null, null, null, null);
+    TimelineEntity entity = reader.getEntity(context, toRetrieve);
+    String expectedIdentifier =
+        (new TimelineEntity.Identifier("app", "id_1")).toString();
+    Assert.assertEquals(expectedIdentifier,
+        entity.getIdentifier().toString());
+    Assert.assertEquals(Long.valueOf(1425016502000L),
+        entity.getCreatedTime());
+    Assert.assertEquals(0, entity.getConfigs().size());
+    Assert.assertEquals(0, entity.getMetrics().size());
+  }
+  /**
+   * Checks that a flow name containing commas in the app flow mapping csv is
+   * handled correctly.
+   */
+  @Test
+  public void testAppFlowMappingCsv() throws Exception {
+    // Look up an entity by cluster and app only, where the flow entry for
+    // that app in the app flow mapping csv has commas.
+    TimelineReaderContext context = new TimelineReaderContext(
+        "cluster1", null, null, null, "app2", "app", "id_5");
+    TimelineDataToRetrieve toRetrieve =
+        new TimelineDataToRetrieve(null, null, null, null);
+    TimelineEntity entity = reader.getEntity(context, toRetrieve);
+    String expectedIdentifier =
+        (new TimelineEntity.Identifier("app", "id_5")).toString();
+    Assert.assertEquals(expectedIdentifier,
+        entity.getIdentifier().toString());
+    Assert.assertEquals(Long.valueOf(1425016502050L),
+        entity.getCreatedTime());
+  }
+  @Test
+  public void testGetEntityCustomFields() throws Exception {
+    // The requested fields are returned on top of the default view.
+    TimelineReaderContext context = new TimelineReaderContext(
+        "cluster1", "user1", "flow1", 1L, "app1", "app", "id_1");
+    TimelineDataToRetrieve toRetrieve = new TimelineDataToRetrieve(null, null,
+        EnumSet.of(Field.INFO, Field.CONFIGS, Field.METRICS), null);
+    TimelineEntity entity = reader.getEntity(context, toRetrieve);
+    String expectedIdentifier =
+        (new TimelineEntity.Identifier("app", "id_1")).toString();
+    Assert.assertEquals(expectedIdentifier,
+        entity.getIdentifier().toString());
+    Assert.assertEquals(Long.valueOf(1425016502000L),
+        entity.getCreatedTime());
+    Assert.assertEquals(3, entity.getConfigs().size());
+    Assert.assertEquals(3, entity.getMetrics().size());
+    Assert.assertEquals(2, entity.getInfo().size());
+    // Events were not requested, so none are returned.
+    Assert.assertEquals(0, entity.getEvents().size());
+  }
+  @Test
+  public void testGetEntityAllFields() throws Exception {
+    // Field.ALL asks for every field of the TimelineEntity.
+    TimelineReaderContext context = new TimelineReaderContext(
+        "cluster1", "user1", "flow1", 1L, "app1", "app", "id_1");
+    TimelineDataToRetrieve toRetrieve =
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null);
+    TimelineEntity entity = reader.getEntity(context, toRetrieve);
+    String expectedIdentifier =
+        (new TimelineEntity.Identifier("app", "id_1")).toString();
+    Assert.assertEquals(expectedIdentifier,
+        entity.getIdentifier().toString());
+    Assert.assertEquals(Long.valueOf(1425016502000L),
+        entity.getCreatedTime());
+    Assert.assertEquals(3, entity.getConfigs().size());
+    Assert.assertEquals(3, entity.getMetrics().size());
+    // All fields including events will be returned.
+    Assert.assertEquals(2, entity.getEvents().size());
+  }
+  @Test
+  public void testGetAllEntities() throws Exception {
+    Set<TimelineEntity> result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null), new TimelineEntityFilters(),
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
+    // All 4 entities will be returned
+    Assert.assertEquals(4, result.size());
+  }
+  /**
+   * Checks that the limit filter caps the number of entities returned and
+   * that the entities kept are the ones with the latest created time.
+   */
+  @Test
+  public void testGetEntitiesWithLimit() throws Exception {
+    Set<TimelineEntity> result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(2L, null, null, null, null, null, null,
+        null, null), new TimelineDataToRetrieve());
+    Assert.assertEquals(2, result.size());
+    // Needs to be rewritten once hashcode and equals for
+    // TimelineEntity is implemented
+    // Entities with id_1 and id_4 should be returned,
+    // based on created time, descending.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_4")) {
+        Assert.fail("Entity not sorted by created time");
+      }
+    }
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(3L, null, null, null, null, null, null,
+        null, null), new TimelineDataToRetrieve());
+    // Even though 2 entities out of 4 have same created time, one entity
+    // is left out due to limit
+    Assert.assertEquals(3, result.size());
+  }
+  /**
+   * Checks filtering on created time with both bounds, with only the end
+   * bound and with only the start bound.
+   */
+  @Test
+  public void testGetEntitiesByTimeWindows() throws Exception {
+    // Get entities based on created time start and end time range.
+    Set<TimelineEntity> result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, 1425016502030L, 1425016502060L, null,
+        null, null, null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    // Only one entity with ID id_4 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_4")) {
+        Assert.fail("Incorrect filtering based on created time range");
+      }
+    }
+    // Get entities if only created time end is specified.
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+            "app", null),
+            new TimelineEntityFilters(null, null, 1425016502010L, null, null,
+            null, null, null, null),
+            new TimelineDataToRetrieve());
+    Assert.assertEquals(3, result.size());
+    // id_4 was created after the end bound and must not be returned.
+    for (TimelineEntity entity : result) {
+      if (entity.getId().equals("id_4")) {
+        Assert.fail("Incorrect filtering based on created time range");
+      }
+    }
+    // Get entities if only created time start is specified.
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, 1425016502010L, null, null, null,
+        null, null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_4")) {
+        Assert.fail("Incorrect filtering based on created time range");
+      }
+    }
+  }
+  /**
+   * Exercises info, config, event and metric filters against the app1
+   * fixture, covering simple filter lists, AND/OR combinations, NOT_EQUAL
+   * comparisons and filters on keys that no entity has.
+   */
+  @Test
+  public void testGetFilteredEntities() throws Exception {
+    // Get entities based on info filters.
+    TimelineFilterList infoFilterList = new TimelineFilterList();
+    infoFilterList.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5));
+    Set<TimelineEntity> result = getApp1Entities(new TimelineEntityFilters(
+        null, null, null, null, null, infoFilterList, null, null, null));
+    // Only one entity with ID id_3 should be returned.
+    assertEntityIds("Incorrect filtering based on info filters", 1, result,
+        "id_3");
+    // Get entities based on config filters.
+    TimelineFilterList confFilterList = new TimelineFilterList();
+    confFilterList.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_1", "123"));
+    confFilterList.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "abc"));
+    result = getApp1Entities(new TimelineEntityFilters(
+        null, null, null, null, null, null, confFilterList, null, null));
+    assertEntityIds("Incorrect filtering based on config filters", 1, result,
+        "id_3");
+    // Get entities based on event filters.
+    TimelineFilterList eventFilters = new TimelineFilterList();
+    eventFilters.addFilter(
+        new TimelineExistsFilter(TimelineCompareOp.EQUAL, "event_2"));
+    eventFilters.addFilter(
+        new TimelineExistsFilter(TimelineCompareOp.EQUAL, "event_4"));
+    result = getApp1Entities(new TimelineEntityFilters(
+        null, null, null, null, null, null, null, null, eventFilters));
+    assertEntityIds("Incorrect filtering based on event filters", 1, result,
+        "id_3");
+    // Get entities based on metric filters.
+    TimelineFilterList metricFilterList = new TimelineFilterList();
+    metricFilterList.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.GREATER_OR_EQUAL, "metric3", 0L));
+    result = getApp1Entities(new TimelineEntityFilters(
+        null, null, null, null, null, null, null, metricFilterList, null));
+    // Two entities with IDs' id_1 and id_2 should be returned.
+    assertEntityIds("Incorrect filtering based on metric filters", 2, result,
+        "id_1", "id_2");
+    // Get entities based on complex config filters.
+    // list1 matches the configs of id_2, list2 those of id_1.
+    TimelineFilterList list1 = new TimelineFilterList();
+    list1.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_1", "129"));
+    list1.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "def"));
+    TimelineFilterList list2 = new TimelineFilterList();
+    list2.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_2", "23"));
+    list2.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "abc"));
+    TimelineFilterList confFilterList1 =
+        new TimelineFilterList(Operator.OR, list1, list2);
+    result = getApp1Entities(new TimelineEntityFilters(
+        null, null, null, null, null, null, confFilterList1, null, null));
+    assertEntityIds("Incorrect filtering based on config filters", 2, result,
+        "id_1", "id_2");
+    TimelineFilterList list3 = new TimelineFilterList();
+    list3.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.NOT_EQUAL, "config_1", "123"));
+    list3.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.NOT_EQUAL, "config_3", "abc"));
+    TimelineFilterList list4 = new TimelineFilterList();
+    list4.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_2", "23"));
+    TimelineFilterList confFilterList2 =
+        new TimelineFilterList(Operator.OR, list3, list4);
+    result = getApp1Entities(new TimelineEntityFilters(
+        null, null, null, null, null, null, confFilterList2, null, null));
+    assertEntityIds("Incorrect filtering based on config filters", 2, result,
+        "id_1", "id_2");
+    TimelineFilterList confFilterList3 = new TimelineFilterList();
+    confFilterList3.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.NOT_EQUAL, "config_1", "127"));
+    confFilterList3.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.NOT_EQUAL, "config_3", "abc"));
+    result = getApp1Entities(new TimelineEntityFilters(
+        null, null, null, null, null, null, confFilterList3, null, null));
+    assertEntityIds("Incorrect filtering based on config filters", 1, result,
+        "id_2");
+    // A config key that no entity has: nothing matches when the filters are
+    // ANDed, one entity matches when they are ORed.
+    TimelineFilterList confFilterList4 = new TimelineFilterList();
+    confFilterList4.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.EQUAL, "config_dummy", "dummy"));
+    confFilterList4.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.EQUAL, "config_3", "def"));
+    result = getApp1Entities(new TimelineEntityFilters(
+        null, null, null, null, null, null, confFilterList4, null, null));
+    Assert.assertEquals(0, result.size());
+    TimelineFilterList confFilterList5 = new TimelineFilterList(Operator.OR);
+    confFilterList5.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.EQUAL, "config_dummy", "dummy"));
+    confFilterList5.addFilter(new TimelineKeyValueFilter(
+        TimelineCompareOp.EQUAL, "config_3", "def"));
+    result = getApp1Entities(new TimelineEntityFilters(
+        null, null, null, null, null, null, confFilterList5, null, null));
+    assertEntityIds("Incorrect filtering based on config filters", 1, result,
+        "id_2");
+    // Get entities based on complex metric filters.
+    TimelineFilterList list6 = new TimelineFilterList();
+    list6.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.GREATER_THAN, "metric1", 200));
+    list6.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.EQUAL, "metric3", 23));
+    TimelineFilterList list7 = new TimelineFilterList();
+    list7.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.GREATER_OR_EQUAL, "metric2", 74));
+    TimelineFilterList metricFilterList1 =
+        new TimelineFilterList(Operator.OR, list6, list7);
+    result = getApp1Entities(new TimelineEntityFilters(
+        null, null, null, null, null, null, null, metricFilterList1, null));
+    // Two entities with IDs' id_2 and id_3 should be returned.
+    assertEntityIds("Incorrect filtering based on metric filters", 2, result,
+        "id_2", "id_3");
+    TimelineFilterList metricFilterList2 = new TimelineFilterList();
+    metricFilterList2.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.LESS_THAN, "metric2", 70));
+    metricFilterList2.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23));
+    result = getApp1Entities(new TimelineEntityFilters(
+        null, null, null, null, null, null, null, metricFilterList2, null));
+    assertEntityIds("Incorrect filtering based on metric filters", 1, result,
+        "id_1");
+    TimelineFilterList metricFilterList3 = new TimelineFilterList();
+    metricFilterList3.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.LESS_THAN, "dummy_metric", 30));
+    metricFilterList3.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23));
+    result = getApp1Entities(new TimelineEntityFilters(
+        null, null, null, null, null, null, null, metricFilterList3, null));
+    Assert.assertEquals(0, result.size());
+    TimelineFilterList metricFilterList4 = new TimelineFilterList(Operator.OR);
+    metricFilterList4.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.LESS_THAN, "dummy_metric", 30));
+    metricFilterList4.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23));
+    result = getApp1Entities(new TimelineEntityFilters(
+        null, null, null, null, null, null, null, metricFilterList4, null));
+    assertEntityIds("Incorrect filtering based on metric filters", 2, result,
+        "id_1", "id_2");
+    TimelineFilterList metricFilterList5 =
+        new TimelineFilterList(new TimelineCompareFilter(
+            TimelineCompareOp.NOT_EQUAL, "metric2", 74));
+    result = getApp1Entities(new TimelineEntityFilters(
+        null, null, null, null, null, null, null, metricFilterList5, null));
+    assertEntityIds("Incorrect filtering based on metric filters", 2, result,
+        "id_1", "id_2");
+    // Get entities based on complex info filters.
+    TimelineFilterList infoFilterList1 = new TimelineFilterList();
+    infoFilterList1.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5));
+    infoFilterList1.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.NOT_EQUAL, "info4", 20));
+    result = getApp1Entities(new TimelineEntityFilters(
+        null, null, null, null, null, infoFilterList1, null, null, null));
+    Assert.assertEquals(0, result.size());
+    TimelineFilterList infoFilterList2 = new TimelineFilterList(Operator.OR);
+    infoFilterList2.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5));
+    infoFilterList2.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info1", "val1"));
+    result = getApp1Entities(new TimelineEntityFilters(
+        null, null, null, null, null, infoFilterList2, null, null, null));
+    assertEntityIds("Incorrect filtering based on info filters", 2, result,
+        "id_1", "id_3");
+    TimelineFilterList infoFilterList3 = new TimelineFilterList();
+    infoFilterList3.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "dummy_info", 1));
+    infoFilterList3.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", "val5"));
+    result = getApp1Entities(new TimelineEntityFilters(
+        null, null, null, null, null, infoFilterList3, null, null, null));
+    Assert.assertEquals(0, result.size());
+    TimelineFilterList infoFilterList4 = new TimelineFilterList(Operator.OR);
+    infoFilterList4.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "dummy_info", 1));
+    infoFilterList4.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", "val5"));
+    result = getApp1Entities(new TimelineEntityFilters(
+        null, null, null, null, null, infoFilterList4, null, null, null));
+    assertEntityIds("Incorrect filtering based on info filters", 1, result,
+        "id_1");
+  }
+  /** Queries the "app" entities of app1 in flow1 with the given filters. */
+  private Set<TimelineEntity> getApp1Entities(TimelineEntityFilters filters)
+      throws Exception {
+    return reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null), filters, new TimelineDataToRetrieve());
+  }
+  /**
+   * Asserts that the result has the expected size and that every entity in
+   * it carries one of the allowed ids; fails with the given message
+   * otherwise.
+   */
+  private static void assertEntityIds(String failureMessage,
+      int expectedCount, Set<TimelineEntity> entities, String... allowedIds) {
+    Assert.assertEquals(expectedCount, entities.size());
+    Set<String> allowed = new HashSet<String>(Arrays.asList(allowedIds));
+    for (TimelineEntity entity : entities) {
+      if (!allowed.contains(entity.getId())) {
+        Assert.fail(failureMessage);
+      }
+    }
+  }
+  /**
+   * Checks filtering on the relatesTo and isRelatedTo relations of an
+   * entity.
+   */
+  @Test
+  public void testGetEntitiesByRelations() throws Exception {
+    // Get entities based on relatesTo.
+    TimelineFilterList relatesTo = new TimelineFilterList(Operator.OR);
+    Set<Object> relatesToIds =
+        new HashSet<Object>(Arrays.asList((Object)"flow1"));
+    relatesTo.addFilter(new TimelineKeyValuesFilter(
+        TimelineCompareOp.EQUAL, "flow", relatesToIds));
+    Set<TimelineEntity> result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, relatesTo, null, null,
+        null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    // Only one entity with ID id_1 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1")) {
+        Assert.fail("Incorrect filtering based on relatesTo");
+      }
+    }
+    // Get entities based on isRelatedTo.
+    TimelineFilterList isRelatedTo = new TimelineFilterList(Operator.OR);
+    Set<Object> isRelatedToIds =
+        new HashSet<Object>(Arrays.asList((Object)"tid1_2"));
+    isRelatedTo.addFilter(new TimelineKeyValuesFilter(
+        TimelineCompareOp.EQUAL, "type1", isRelatedToIds));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, isRelatedTo, null,
+        null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(2, result.size());
+    // Two entities with IDs' id_1 and id_3 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) {
+        Assert.fail("Incorrect filtering based on isRelatedTo");
+      }
+    }
+  }
diff --git 
new file mode 100644
index 0000000..15be494
--- /dev/null
@@ -0,0 +1,129 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.Test;
+public class TestFileSystemTimelineWriterImpl {
+  /**
+   * Writes two entities through {@link FileSystemTimelineWriterImpl} in a
+   * single call and verifies that each one lands in its own file under the
+   * writer's output root, serialized as JSON (unit test for the YARN-3264
+   * proof of concept).
+   *
+   * @throws Exception if writing, reading back or cleaning up fails.
+   */
+  @Test
+  public void testWriteEntityToFile() throws Exception {
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entity = new TimelineEntity();
+    String id = "hello";
+    String type = "world";
+    entity.setId(id);
+    entity.setType(type);
+    entity.setCreatedTime(1425016501000L);
+    te.addEntity(entity);
+    TimelineMetric metric = new TimelineMetric();
+    String metricId = "CPU";
+    metric.setId(metricId);
+    metric.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
+    metric.addValue(1425016501000L, 1234567L);
+    TimelineEntity entity2 = new TimelineEntity();
+    String id2 = "metric";
+    String type2 = "app";
+    entity2.setId(id2);
+    entity2.setType(type2);
+    entity2.setCreatedTime(1425016503000L);
+    entity2.addMetric(metric);
+    te.addEntity(entity2);
+    // NOTE(review): this map is populated but never read in this test.
+    Map<String, TimelineMetric> aggregatedMetrics =
+        new HashMap<String, TimelineMetric>();
+    aggregatedMetrics.put(metricId, metric);
+    FileSystemTimelineWriterImpl fsi = null;
+    try {
+      fsi = new FileSystemTimelineWriterImpl();
+      fsi.init(new YarnConfiguration());
+      fsi.start();
+      // The flow run id (12345678) shows up as a directory in the paths
+      // checked below.
+      fsi.write("cluster_id", "user_id", "flow_name", "flow_version",
+          12345678L, "app_id", te);
+      // Both entities belong to the same application, so they share this
+      // directory and differ only in the <type>/<id> suffix.
+      String appDir = fsi.getOutputRoot() +
+          "/entities/cluster_id/user_id/flow_name/flow_version/12345678/" +
+          "app_id/";
+      String fileName = appDir + type + "/" + id +
+          FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      Path path = Paths.get(fileName);
+      File f = new File(fileName);
+      assertTrue(f.exists() && !f.isDirectory());
+      List<String> data = Files.readAllLines(path, StandardCharsets.UTF_8);
+      // ensure there's only one entity + 1 new line
+      assertEquals("data size is:" + data.size(), 2, data.size());
+      String d = data.get(0);
+      // confirm the contents same as what was written
+      assertEquals(TimelineUtils.dumpTimelineRecordtoJSON(entity), d);
+      // verify the entity that carries the metric
+      String fileName2 = appDir + type2 + "/" + id2 +
+          FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      Path path2 = Paths.get(fileName2);
+      File file = new File(fileName2);
+      assertTrue(file.exists() && !file.isDirectory());
+      List<String> data2 = Files.readAllLines(path2, StandardCharsets.UTF_8);
+      // ensure there's only one entity + 1 new line
+      assertEquals("data size is:" + data2.size(), 2, data2.size());
+      String metricToString = data2.get(0);
+      // confirm the contents same as what was written
+      assertEquals(TimelineUtils.dumpTimelineRecordtoJSON(entity2),
+          metricToString);
+      // delete the directory
+      File outputDir = new File(fsi.getOutputRoot());
+      FileUtils.deleteDirectory(outputDir);
+      assertTrue(!(f.exists()));
+    } finally {
+      if (fsi != null) {
+        // Clean up even when an assertion above failed.
+        fsi.close();
+        FileUtils.deleteDirectory(new File(fsi.getOutputRoot()));
+      }
+    }
+  }
diff --git 
new file mode 100644
index 0000000..58df970
--- /dev/null
@@ -0,0 +1,130 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.junit.Test;
+public class TestKeyConverters {
+  /**
+   * Encodes three application ids and checks that the encoded forms compare
+   * in descending order (an id with a larger sequence number or cluster
+   * timestamp compares lower) and that each one decodes back to the string
+   * it was encoded from.
+   */
+  @Test
+  public void testAppIdKeyConverter() {
+    AppIdKeyConverter appIdKeyConverter = new AppIdKeyConverter();
+    long currentTs = System.currentTimeMillis();
+    ApplicationId appId1 = ApplicationId.newInstance(currentTs, 1);
+    ApplicationId appId2 = ApplicationId.newInstance(currentTs, 2);
+    ApplicationId appId3 = ApplicationId.newInstance(currentTs + 300, 1);
+    String appIdStr1 = appId1.toString();
+    String appIdStr2 = appId2.toString();
+    String appIdStr3 = appId3.toString();
+    byte[] appIdBytes1 = appIdKeyConverter.encode(appIdStr1);
+    byte[] appIdBytes2 = appIdKeyConverter.encode(appIdStr2);
+    byte[] appIdBytes3 = appIdKeyConverter.encode(appIdStr3);
+    // App ids should be encoded in a manner wherein descending order
+    // is maintained. Each pair is asserted separately so that a failure
+    // points at the pair that is out of order.
+    assertTrue("Ordering of app ids' is incorrect",
+        Bytes.compareTo(appIdBytes1, appIdBytes2) > 0);
+    assertTrue("Ordering of app ids' is incorrect",
+        Bytes.compareTo(appIdBytes1, appIdBytes3) > 0);
+    assertTrue("Ordering of app ids' is incorrect",
+        Bytes.compareTo(appIdBytes2, appIdBytes3) > 0);
+    // assertEquals reports both values when the round trip is lossy.
+    assertEquals("Decoded app id is not same as the app id encoded",
+        appIdStr1, appIdKeyConverter.decode(appIdBytes1));
+    assertEquals("Decoded app id is not same as the app id encoded",
+        appIdStr2, appIdKeyConverter.decode(appIdBytes2));
+    assertEquals("Decoded app id is not same as the app id encoded",
+        appIdStr3, appIdKeyConverter.decode(appIdBytes3));
+  }
+  /**
+   * Round-trips event column names, with and without an info key, whose id,
+   * timestamp bytes and info key all contain the VALUES separator.
+   */
+  @Test
+  public void testEventColumnNameConverter() {
+    String eventId = "=foo_=eve=nt=";
+    // Build a timestamp whose leading bytes are the VALUES separator and
+    // whose remaining bytes come from Bytes.createMaxByteArray.
+    byte[] sepBytes = Bytes.toBytes(Separator.VALUES.getValue());
+    byte[] tsBytes = Bytes.add(sepBytes,
+        Bytes.createMaxByteArray(Bytes.SIZEOF_LONG - sepBytes.length));
+    Long eventTs = Bytes.toLong(tsBytes);
+
+    // Column name without an info key.
+    byte[] qualifierWithoutInfo =
+        new EventColumnName(eventId, eventTs, null).getColumnQualifier();
+    KeyConverter<EventColumnName> converter = new EventColumnNameConverter();
+    EventColumnName decodedWithoutInfo =
+        converter.decode(qualifierWithoutInfo);
+    assertEquals(eventId, decodedWithoutInfo.getId());
+    assertEquals(eventTs, decodedWithoutInfo.getTimestamp());
+    assertNull(decodedWithoutInfo.getInfoKey());
+
+    // Column name with an info key.
+    String infoKey = "f=oo_event_in=fo=_key";
+    byte[] qualifierWithInfo =
+        new EventColumnName(eventId, eventTs, infoKey).getColumnQualifier();
+    EventColumnName decodedWithInfo = converter.decode(qualifierWithInfo);
+    assertEquals(eventId, decodedWithInfo.getId());
+    assertEquals(eventTs, decodedWithInfo.getTimestamp());
+    assertEquals(infoKey, decodedWithInfo.getInfoKey());
+  }
+  @Test
+  public void testLongKeyConverter() {
+    LongKeyConverter longKeyConverter = new LongKeyConverter();
+    confirmLongKeyConverter(longKeyConverter, Long.MIN_VALUE);
+    confirmLongKeyConverter(longKeyConverter, -1234567890L);
+    confirmLongKeyConverter(longKeyConverter, -128L);
+    confirmLongKeyConverter(longKeyConverter, -127L);
+    confirmLongKeyConverter(longKeyConverter, -1L);
+    confirmLongKeyConverter(longKeyConverter, 0L);
+    confirmLongKeyConverter(longKeyConverter, 1L);
+    confirmLongKeyConverter(longKeyConverter, 127L);
+    confirmLongKeyConverter(longKeyConverter, 128L);
+    confirmLongKeyConverter(longKeyConverter, 1234567890L);
+    confirmLongKeyConverter(longKeyConverter, Long.MAX_VALUE);
+  }
+  /**
+   * Asserts that encoding and then decoding {@code testValue} returns an
+   * equal value.
+   *
+   * @param longKeyConverter converter under test.
+   * @param testValue value to round-trip.
+   */
+  private void confirmLongKeyConverter(LongKeyConverter longKeyConverter,
+      Long testValue) {
+    byte[] encoded = longKeyConverter.encode(testValue);
+    Long roundTripped = longKeyConverter.decode(encoded);
+    assertEquals(testValue, roundTripped);
+  }
+  /**
+   * Round-trips every suffix of a sample phrase, and each suffix repeated
+   * twice, through {@link StringKeyConverter}.
+   */
+  @Test
+  public void testStringKeyConverter() {
+    StringKeyConverter stringKeyConverter = new StringKeyConverter();
+    String phrase = "QuackAttack now!";
+    for (int i = 0; i < phrase.length(); i++) {
+      String sub = phrase.substring(i);
+      confirmStringKeyConverter(stringKeyConverter, sub);
+      confirmStringKeyConverter(stringKeyConverter, sub + sub);
+    }
+  }
+  /**
+   * Asserts that encoding and then decoding {@code testValue} returns an
+   * equal string.
+   *
+   * @param stringKeyConverter converter under test.
+   * @param testValue value to round-trip.
+   */
+  private void confirmStringKeyConverter(StringKeyConverter stringKeyConverter,
+      String testValue) {
+    String decoded =
+        stringKeyConverter.decode(stringKeyConverter.encode(testValue));
+    assertEquals(testValue, decoded);
+  }
diff --git 
new file mode 100644
index 0000000..368b060
--- /dev/null
@@ -0,0 +1,246 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.junit.Test;
+public class TestRowKeys {
+  // Separator used between row key components. The fixtures below embed it
+  // in their values, and the tests assert that those values survive a row
+  // key round trip.
+  private final static String QUALIFIER_SEP = Separator.QUALIFIERS.getValue();
+  private final static byte[] QUALIFIER_SEP_BYTES = Bytes
+      .toBytes(QUALIFIER_SEP);
+  // Separator in the middle of the value.
+  private final static String CLUSTER = "cl" + QUALIFIER_SEP + "uster";
+  // Separator at the start of the value.
+  private final static String USER = QUALIFIER_SEP + "user";
+  // Separator in the middle and at the end of the value.
+  // NOTE(review): the end of this declaration was missing; the trailing
+  // separator is reconstructed - confirm against the upstream file.
+  private final static String FLOW_NAME = "dummy_" + QUALIFIER_SEP + "flow"
+      + QUALIFIER_SEP;
+  private final static Long FLOW_RUN_ID;
+  private final static String APPLICATION_ID;
+  static {
+    long runid = Long.MAX_VALUE - 900L;
+    byte[] longMaxByteArr = Bytes.toBytes(Long.MAX_VALUE);
+    byte[] byteArr = Bytes.toBytes(runid);
+    int sepByteLen = QUALIFIER_SEP_BYTES.length;
+    // Replace the leading bytes of the run id with
+    // (Long.MAX_VALUE byte - separator byte).
+    // NOTE(review): presumably chosen so that the inverted run id stored in
+    // a row key starts with the separator bytes - confirm against
+    // LongConverter.invertLong.
+    if (sepByteLen <= byteArr.length) {
+      for (int i = 0; i < sepByteLen; i++) {
+        byteArr[i] = (byte) (longMaxByteArr[i] - QUALIFIER_SEP_BYTES[i]);
+      }
+    }
+    FLOW_RUN_ID = Bytes.toLong(byteArr);
+    long clusterTs = System.currentTimeMillis();
+    byteArr = Bytes.toBytes(clusterTs);
+    // Same substitution, applied to the trailing bytes of the cluster
+    // timestamp that the application id is built from.
+    // NOTE(review): presumably so the encoded application id carries the
+    // separator bytes - confirm against AppIdKeyConverter.
+    if (sepByteLen <= byteArr.length) {
+      for (int i = 0; i < sepByteLen; i++) {
+        byteArr[byteArr.length - sepByteLen + i] =
+            (byte) (longMaxByteArr[byteArr.length - sepByteLen + i] -
+                QUALIFIER_SEP_BYTES[i]);
+      }
+    }
+    clusterTs = Bytes.toLong(byteArr);
+    int seqId = 222;
+    APPLICATION_ID = ApplicationId.newInstance(clusterTs, seqId).toString();
+  }
+  private static void verifyRowPrefixBytes(byte[] byteRowKeyPrefix) {
+    int sepLen = QUALIFIER_SEP_BYTES.length;
+    for (int i = 0; i < sepLen; i++) {
+      assertTrue(
+          "Row key prefix not encoded properly.",
+          byteRowKeyPrefix[byteRowKeyPrefix.length - sepLen + i] ==
+              QUALIFIER_SEP_BYTES[i]);
+    }
+  }
+  /**
+   * Round-trips an application row key and checks the layout of the two
+   * row key prefixes: (cluster, user, flow, run id) and
+   * (cluster, user, flow).
+   */
+  @Test
+  public void testApplicationRowKey() {
+    byte[] byteRowKey =
+        new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
+            APPLICATION_ID).getRowKey();
+    ApplicationRowKey rowKey = ApplicationRowKey.parseRowKey(byteRowKey);
+    assertEquals(CLUSTER, rowKey.getClusterId());
+    assertEquals(USER, rowKey.getUserId());
+    assertEquals(FLOW_NAME, rowKey.getFlowName());
+    assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
+    assertEquals(APPLICATION_ID, rowKey.getAppId());
+
+    // Prefix up to and including the flow run id.
+    byte[] byteRowKeyPrefix =
+        new ApplicationRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID)
+            .getRowKeyPrefix();
+    byte[][] splits =
+        Separator.QUALIFIERS.split(byteRowKeyPrefix,
+            new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+                Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+                Separator.VARIABLE_SIZE});
+    assertEquals(5, splits.length);
+    // The last component is empty: the prefix ends with a separator.
+    assertEquals(0, splits[4].length);
+    assertEquals(FLOW_NAME,
+        Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
+    assertEquals(FLOW_RUN_ID,
+        (Long) LongConverter.invertLong(Bytes.toLong(splits[3])));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+
+    // Prefix up to and including the flow name; the split below expects
+    // cluster, user, flow name and an empty trailing component.
+    byteRowKeyPrefix =
+        new ApplicationRowKeyPrefix(CLUSTER, USER,
+            FLOW_NAME).getRowKeyPrefix();
+    splits =
+        Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
+            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
+    assertEquals(4, splits.length);
+    assertEquals(0, splits[3].length);
+    assertEquals(FLOW_NAME,
+        Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+  }
+  /**
+   * Exercises the key converters indirectly, via the public API of the
+   * app-to-flow row key: build a key, parse it back and compare the parts.
+   */
+  @Test
+  public void testAppToFlowRowKey() {
+    AppToFlowRowKey original = new AppToFlowRowKey(CLUSTER, APPLICATION_ID);
+    byte[] encoded = original.getRowKey();
+    AppToFlowRowKey parsed = AppToFlowRowKey.parseRowKey(encoded);
+    assertEquals(CLUSTER, parsed.getClusterId());
+    assertEquals(APPLICATION_ID, parsed.getAppId());
+  }
+  /**
+   * Round-trips an entity row key whose entity id and type contain the
+   * qualifier separator, then checks the layout of the row key prefixes
+   * built with and without the entity type.
+   */
+  @Test
+  public void testEntityRowKey() {
+    String entityId = "!ent!ity!!id!";
+    String entityType = "entity!Type";
+    // Argument order mirrors the getters asserted below and the
+    // EntityRowKeyPrefix constructor used further down.
+    byte[] byteRowKey =
+        new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
+            APPLICATION_ID, entityType, entityId).getRowKey();
+    EntityRowKey rowKey = EntityRowKey.parseRowKey(byteRowKey);
+    assertEquals(CLUSTER, rowKey.getClusterId());
+    assertEquals(USER, rowKey.getUserId());
+    assertEquals(FLOW_NAME, rowKey.getFlowName());
+    assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
+    assertEquals(APPLICATION_ID, rowKey.getAppId());
+    assertEquals(entityType, rowKey.getEntityType());
+    assertEquals(entityId, rowKey.getEntityId());
+
+    // Prefix that includes the entity type.
+    byte[] byteRowKeyPrefix =
+        new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
+            APPLICATION_ID, entityType).getRowKeyPrefix();
+    byte[][] splits =
+        Separator.QUALIFIERS.split(
+            byteRowKeyPrefix,
+            new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+                Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+                AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
+                Separator.VARIABLE_SIZE});
+    assertEquals(7, splits.length);
+    // The last component is empty: the prefix ends with a separator.
+    assertEquals(0, splits[6].length);
+    assertEquals(APPLICATION_ID, new AppIdKeyConverter().decode(splits[4]));
+    assertEquals(entityType,
+        Separator.QUALIFIERS.decode(Bytes.toString(splits[5])));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+
+    // Prefix that stops at the application id.
+    byteRowKeyPrefix =
+        new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
+            APPLICATION_ID).getRowKeyPrefix();
+    splits =
+        Separator.QUALIFIERS.split(
+            byteRowKeyPrefix,
+            new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+                Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+                AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE});
+    assertEquals(6, splits.length);
+    assertEquals(0, splits[5].length);
+    AppIdKeyConverter appIdKeyConverter = new AppIdKeyConverter();
+    assertEquals(APPLICATION_ID, appIdKeyConverter.decode(splits[4]));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+  }
+  /**
+   * Round-trips a flow activity row key and checks the layout of the
+   * cluster-only prefix and of the cluster-plus-timestamp prefix.
+   */
+  @Test
+  public void testFlowActivityRowKey() {
+    Long ts = 1459900830000L;
+    Long dayTimestamp = TimelineStorageUtils.getTopOfTheDayTimestamp(ts);
+    FlowActivityRowKey original =
+        new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME);
+    FlowActivityRowKey parsed =
+        FlowActivityRowKey.parseRowKey(original.getRowKey());
+    assertEquals(CLUSTER, parsed.getClusterId());
+    assertEquals(dayTimestamp, parsed.getDayTimestamp());
+    assertEquals(USER, parsed.getUserId());
+    assertEquals(FLOW_NAME, parsed.getFlowName());
+
+    // Prefix made of the cluster only.
+    byte[] clusterPrefix =
+        new FlowActivityRowKeyPrefix(CLUSTER).getRowKeyPrefix();
+    int[] clusterPrefixSizes =
+        {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE};
+    byte[][] clusterParts =
+        Separator.QUALIFIERS.split(clusterPrefix, clusterPrefixSizes);
+    assertEquals(2, clusterParts.length);
+    assertEquals(0, clusterParts[1].length);
+    assertEquals(CLUSTER,
+        Separator.QUALIFIERS.decode(Bytes.toString(clusterParts[0])));
+    verifyRowPrefixBytes(clusterPrefix);
+
+    // Prefix made of the cluster and the timestamp.
+    byte[] clusterTsPrefix =
+        new FlowActivityRowKeyPrefix(CLUSTER, ts).getRowKeyPrefix();
+    int[] clusterTsPrefixSizes =
+        {Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE};
+    byte[][] clusterTsParts =
+        Separator.QUALIFIERS.split(clusterTsPrefix, clusterTsPrefixSizes);
+    assertEquals(3, clusterTsParts.length);
+    assertEquals(0, clusterTsParts[2].length);
+    assertEquals(CLUSTER,
+        Separator.QUALIFIERS.decode(Bytes.toString(clusterTsParts[0])));
+    assertEquals(ts,
+        (Long) LongConverter.invertLong(Bytes.toLong(clusterTsParts[1])));
+    verifyRowPrefixBytes(clusterTsPrefix);
+  }
+  /**
+   * Round-trips a flow run row key, then checks the layout of the key built
+   * with a null run id, which this test treats as a row key prefix.
+   */
+  @Test
+  public void testFlowRunRowKey() {
+    FlowRunRowKey original =
+        new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID);
+    FlowRunRowKey parsed = FlowRunRowKey.parseRowKey(original.getRowKey());
+    assertEquals(CLUSTER, parsed.getClusterId());
+    assertEquals(USER, parsed.getUserId());
+    assertEquals(FLOW_NAME, parsed.getFlowName());
+    assertEquals(FLOW_RUN_ID, parsed.getFlowRunId());
+
+    // Same key without a run id.
+    byte[] prefixBytes =
+        new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, null).getRowKey();
+    int[] partSizes = {
+        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+    };
+    byte[][] parts = Separator.QUALIFIERS.split(prefixBytes, partSizes);
+    assertEquals(4, parts.length);
+    assertEquals(0, parts[3].length);
+    assertEquals(FLOW_NAME,
+        Separator.QUALIFIERS.decode(Bytes.toString(parts[2])));
+    verifyRowPrefixBytes(prefixBytes);
+  }
diff --git 
new file mode 100644
index 0000000..7d37206
--- /dev/null
@@ -0,0 +1,215 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+public class TestSeparator {
+  private static String villain = "Dr. Heinz Doofenshmirtz";
+  private static String special =
+      ".   *   |   ?   +   \t   (   )   [   ]   {   }   ^   $  \\ \"  %";
+  /**
+   *
+   */
+  @Test
+  public void testEncodeDecodeString() {
+    for (Separator separator : Separator.values()) {
+      testEncodeDecode(separator, "");
+      testEncodeDecode(separator, " ");
+      testEncodeDecode(separator, "!");
+      testEncodeDecode(separator, "?");
+      testEncodeDecode(separator, "&");
+      testEncodeDecode(separator, "+");
+      testEncodeDecode(separator, "\t");
+      testEncodeDecode(separator, "Dr.");
+      testEncodeDecode(separator, "Heinz");
+      testEncodeDecode(separator, "Doofenshmirtz");
+      testEncodeDecode(separator, villain);
+      testEncodeDecode(separator, special);
+      assertNull(separator.encode(null));
+    }
+  }
+  /**
+   * Asserts that decoding the encoded form of {@code token} with the same
+   * separator gives back {@code token}.
+   *
+   * @param separator separator whose encode/decode pair is exercised.
+   * @param token value to round-trip.
+   */
+  private void testEncodeDecode(Separator separator, String token) {
+    String roundTripped = separator.decode(separator.encode(token));
+    String failureMessage =
+        "token:" + token + " separator:" + separator + ".";
+    assertEquals(failureMessage, token, roundTripped);
+  }
+  /**
+   * Round-trips tokens through several separator combinations, including
+   * repeated separators and null entries in the separator list.
+   */
+  @Test
+  public void testEncodeDecode() {
+    final Separator qualifiers = Separator.QUALIFIERS;
+    final Separator values = Separator.VALUES;
+    final Separator space = Separator.SPACE;
+    testEncodeDecode("Dr.", qualifiers);
+    testEncodeDecode("Heinz", qualifiers, qualifiers);
+    testEncodeDecode("Doofenshmirtz", qualifiers, null, qualifiers);
+    testEncodeDecode("&Perry", qualifiers, values, null);
+    testEncodeDecode("the ", qualifiers, space);
+    testEncodeDecode("Platypus...", (Separator) null);
+    testEncodeDecode("The what now ?!?", qualifiers, values, space);
+  }
+  /**
+   * Round-trips a token that already contains sequences looking like
+   * encoded separators ("%2$", "%%3$", ...) through several separators.
+   */
+  @Test
+  public void testEncodedValues() {
+    // NOTE(review): the last argument was missing; Separator.TAB is
+    // reconstructed - confirm against the upstream file.
+    testEncodeDecode("Double-escape %2$ and %9$ or %%2$ or %%3$, nor  %%%2$" +
+        "= no problem!",
+        Separator.QUALIFIERS, Separator.VALUES, Separator.SPACE,
+        Separator.TAB);
+  }
+  /**
+   * For every separator, joins two encoded strings with a fixed-size long
+   * and a fixed-size int whose bytes contain the separator itself, and
+   * checks that {@code split} with explicit sizes recovers all four parts.
+   * Also checks that mismatching sizes raise IllegalArgumentException.
+   */
+  @Test
+  public void testSplits() {
+    byte[] maxLongBytes = Bytes.toBytes(Long.MAX_VALUE);
+    byte[] maxIntBytes = Bytes.toBytes(Integer.MAX_VALUE);
+    for (Separator separator : Separator.values()) {
+      String str1 = "cl" + separator.getValue() + "us";
+      String str2 = separator.getValue() + "rst";
+      byte[] sepByteArr = Bytes.toBytes(separator.getValue());
+      // Case 1: separator bytes at the start of the long and of the int.
+      byte[] longVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxLongBytes,
+          sepByteArr.length, Bytes.SIZEOF_LONG - sepByteArr.length));
+      byte[] intVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxIntBytes,
+          sepByteArr.length, Bytes.SIZEOF_INT - sepByteArr.length));
+      byte[] arr = separator.join(
+          Bytes.toBytes(separator.encode(str1)), longVal1Arr,
+          Bytes.toBytes(separator.encode(str2)), intVal1Arr);
+      int[] sizes = {Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+          Separator.VARIABLE_SIZE, Bytes.SIZEOF_INT};
+      byte[][] splits = separator.split(arr, sizes);
+      assertEquals(4, splits.length);
+      assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
+      assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1]));
+      assertEquals(str2, separator.decode(Bytes.toString(splits[2])));
+      assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3]));
+      // Case 2: separator bytes at the end of the long and of the int.
+      longVal1Arr = Bytes.add(Bytes.copy(maxLongBytes, 0, Bytes.SIZEOF_LONG -
+          sepByteArr.length), sepByteArr);
+      intVal1Arr = Bytes.add(Bytes.copy(maxIntBytes, 0, Bytes.SIZEOF_INT -
+          sepByteArr.length), sepByteArr);
+      arr = separator.join(Bytes.toBytes(separator.encode(str1)), longVal1Arr,
+          Bytes.toBytes(separator.encode(str2)), intVal1Arr);
+      splits = separator.split(arr, sizes);
+      assertEquals(4, splits.length);
+      assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
+      assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1]));
+      assertEquals(str2, separator.decode(Bytes.toString(splits[2])));
+      assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3]));
+      // Case 3: separator bytes at the start, in the middle and at the end
+      // of the long; the int from case 2 is reused.
+      // NOTE(review): the resulting array is 7 + sepByteArr.length bytes, so
+      // it is SIZEOF_LONG only for one-byte separators - confirm.
+      longVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxLongBytes,
+          sepByteArr.length, 4 - sepByteArr.length), sepByteArr);
+      longVal1Arr = Bytes.add(longVal1Arr, Bytes.copy(maxLongBytes, 4, 3 -
+              sepByteArr.length), sepByteArr);
+      arr = separator.join(Bytes.toBytes(separator.encode(str1)), longVal1Arr,
+          Bytes.toBytes(separator.encode(str2)), intVal1Arr);
+      splits = separator.split(arr, sizes);
+      assertEquals(4, splits.length);
+      assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
+      assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1]));
+      assertEquals(str2, separator.decode(Bytes.toString(splits[2])));
+      assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3]));
+      // Case 4: same values in a different order (strings first, then the
+      // int, then the long).
+      arr = separator.join(Bytes.toBytes(separator.encode(str1)),
+          Bytes.toBytes(separator.encode(str2)), intVal1Arr, longVal1Arr);
+      int[] sizes1 = {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+          Bytes.SIZEOF_INT, Bytes.SIZEOF_LONG};
+      splits = separator.split(arr, sizes1);
+      assertEquals(4, splits.length);
+      assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
+      assertEquals(str2, separator.decode(Bytes.toString(splits[1])));
+      assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[2]));
+      assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[3]));
+      // Negative case: the last size (7) does not match the long that was
+      // joined; the catch is intentionally empty because the exception is
+      // the expected outcome.
+      try {
+        int[] sizes2 = {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+            Bytes.SIZEOF_INT, 7};
+        splits = separator.split(arr, sizes2);
+        fail("Exception should have been thrown.");
+      } catch (IllegalArgumentException e) {}
+      // Negative case: the third size (2) does not match the int that was
+      // joined; again the exception is the expected outcome.
+      try {
+        int[] sizes2 = {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, 2,
+            Bytes.SIZEOF_LONG};
+        splits = separator.split(arr, sizes2);
+        fail("Exception should have been thrown.");
+      } catch (IllegalArgumentException e) {}
+    }
+  }
+  /**
+   * Encodes {@code token} with the given separators, decodes the result
+   * with the same separators and asserts that the original token comes back.
+   *
+   * @param token value to round-trip.
+   * @param separators separators passed to both encode and decode.
+   */
+  private static void testEncodeDecode(String token, Separator... separators) {
+    String roundTripped =
+        Separator.decode(Separator.encode(token, separators), separators);
+    assertEquals(token, roundTripped);
+  }
+  /**
+   * Joins lists and arrays of strings with the VALUES separator and checks
+   * that splitting the joined form returns the same elements; also checks
+   * that splitting {@code null} yields no elements.
+   */
+  @Test
+  public void testJoinStripped() {
+    // Single-element list.
+    List<String> singleList = new ArrayList<String>(0);
+    singleList.add("nothing");
+    String joinedSingleList = Separator.VALUES.joinEncoded(singleList);
+    Iterable<String> splitSingleList =
+        Separator.VALUES.splitEncoded(joinedSingleList);
+    assertTrue(Iterables.elementsEqual(singleList, splitSingleList));
+
+    // Three-element list.
+    List<String> tripleList = new ArrayList<String>(3);
+    tripleList.add("a");
+    tripleList.add("b?");
+    tripleList.add("c");
+    String joinedTripleList = Separator.VALUES.joinEncoded(tripleList);
+    Iterable<String> splitTripleList =
+        Separator.VALUES.splitEncoded(joinedTripleList);
+    assertTrue(Iterables.elementsEqual(tripleList, splitTripleList));
+
+    // Single-element array.
+    String[] singleArray = {"else"};
+    String joinedSingleArray = Separator.VALUES.joinEncoded(singleArray);
+    Iterable<String> splitSingleArray =
+        Separator.VALUES.splitEncoded(joinedSingleArray);
+    assertTrue(
+        Iterables.elementsEqual(Arrays.asList(singleArray), splitSingleArray));
+
+    // Three-element array.
+    String[] tripleArray = {"d", "e?", "f"};
+    String joinedTripleArray = Separator.VALUES.joinEncoded(tripleArray);
+    Iterable<String> splitTripleArray =
+        Separator.VALUES.splitEncoded(joinedTripleArray);
+    assertTrue(
+        Iterables.elementsEqual(Arrays.asList(tripleArray), splitTripleArray));
+
+    // Null input must split into nothing.
+    List<String> nothing = new ArrayList<String>(0);
+    Iterable<String> splitNull = Separator.VALUES.splitEncoded(null);
+    assertTrue(Iterables.elementsEqual(nothing, splitNull));
+  }
diff --git 
new file mode 100644
index 0000000..81a3f6a
--- /dev/null
@@ -0,0 +1,19 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+# log4j configuration used during build and unit tests
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml 
index 80544bd..1b6b1d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
@@ -43,5 +43,7 @@
+    <module>hadoop-yarn-server-timelineservice</module>
+    <module>hadoop-yarn-server-timelineservice-hbase-tests</module>
diff --git 
index f20bd2c..f09909b 100644
@@ -88,7 +88,7 @@ Current status
 Future Plans
 1. Future releases will introduce a next generation timeline service
-which is scalable and reliable, "Timeline Server v2".
+which is scalable and reliable, ["Timeline Service 
 1. The expanded features of this service *may not* be available to
 applications using the Timeline Server v1 REST API. That includes extended
 data structures as well as the ability of the client to failover between 
Timeline Server instances.

To unsubscribe, e-mail:
For additional commands, e-mail:

Reply via email to