YARN-5928. Move ATSv2 HBase backend code into a new module that YARN
servers depend on only at runtime. Contributed by Haibo Chen.

(cherry picked from commit b92089c0e8ab1b87b8b5b55b1e3d4367ae5d847a)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/673ab905
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/673ab905
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/673ab905

Branch: refs/heads/YARN-5355-branch-2
Commit: 673ab905cf6c5a326449f8d5a0565667e4018398
Parents: ac13f7c
Author: Sangjin Lee <sj...@apache.org>
Authored: Thu Jan 19 21:21:48 2017 -0800
Committer: Varun Saxena <varunsax...@apache.org>
Committed: Tue Apr 25 23:14:28 2017 +0530
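
With this split, YARN daemons no longer need the HBase backend on their
compile-time classpath; they bind to a TimelineWriter implementation by class
name from configuration. A minimal sketch of that binding, assuming the stock
Configuration/ReflectionUtils APIs (variable names are illustrative):

    // Types referenced below live in the packages shown in the diff
    // (org.apache.hadoop.yarn.server.timelineservice.storage).
    Configuration conf = new YarnConfiguration();
    Class<? extends TimelineWriter> writerClass = conf.getClass(
        YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
        HBaseTimelineWriterImpl.class, TimelineWriter.class);
    // Instantiated reflectively, so the HBase jar is needed only at runtime.
    TimelineWriter writer = ReflectionUtils.newInstance(writerClass, conf);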

----------------------------------------------------------------------
 hadoop-project/pom.xml                          |  13 +-
 .../TestRMHATimelineCollectors.java             |   6 +
 .../server/resourcemanager/TestRMRestart.java   |   4 +
 .../TestResourceTrackerService.java             |   4 +
 .../pom.xml                                     |  12 +
 .../pom.xml                                     | 190 +++++
 .../reader/filter/TimelineFilterUtils.java      | 307 ++++++++
 .../reader/filter/package-info.java             |  28 +
 .../storage/HBaseTimelineReaderImpl.java        |  96 +++
 .../storage/HBaseTimelineWriterImpl.java        | 542 ++++++++++++++
 .../storage/TimelineSchemaCreator.java          | 251 +++++++
 .../storage/application/ApplicationColumn.java  | 156 ++++
 .../application/ApplicationColumnFamily.java    |  65 ++
 .../application/ApplicationColumnPrefix.java    | 288 ++++++++
 .../storage/application/ApplicationRowKey.java  | 206 ++++++
 .../application/ApplicationRowKeyPrefix.java    |  69 ++
 .../storage/application/ApplicationTable.java   | 161 ++++
 .../storage/application/package-info.java       |  28 +
 .../apptoflow/AppToFlowColumnFamily.java        |  51 ++
 .../apptoflow/AppToFlowColumnPrefix.java        | 206 ++++++
 .../storage/apptoflow/AppToFlowRowKey.java      |  58 ++
 .../storage/apptoflow/AppToFlowTable.java       | 124 ++++
 .../storage/apptoflow/package-info.java         |  28 +
 .../storage/common/AppIdKeyConverter.java       |  96 +++
 .../storage/common/BaseTable.java               | 167 +++++
 .../common/BufferedMutatorDelegator.java        |  73 ++
 .../timelineservice/storage/common/Column.java  |  80 ++
 .../storage/common/ColumnFamily.java            |  34 +
 .../storage/common/ColumnHelper.java            | 389 ++++++++++
 .../storage/common/ColumnPrefix.java            | 145 ++++
 .../storage/common/EventColumnName.java         |  63 ++
 .../common/EventColumnNameConverter.java        |  99 +++
 .../storage/common/GenericConverter.java        |  48 ++
 .../common/HBaseTimelineStorageUtils.java       | 306 ++++++++
 .../storage/common/KeyConverter.java            |  41 ++
 .../storage/common/LongConverter.java           |  94 +++
 .../storage/common/LongKeyConverter.java        |  68 ++
 .../storage/common/NumericValueConverter.java   |  39 +
 .../timelineservice/storage/common/Range.java   |  62 ++
 .../storage/common/RowKeyPrefix.java            |  42 ++
 .../storage/common/Separator.java               | 575 +++++++++++++++
 .../storage/common/StringKeyConverter.java      |  54 ++
 .../common/TimelineHBaseSchemaConstants.java    |  71 ++
 .../storage/common/TimestampGenerator.java      | 116 +++
 .../storage/common/TypedBufferedMutator.java    |  28 +
 .../storage/common/ValueConverter.java          |  47 ++
 .../storage/common/package-info.java            |  28 +
 .../storage/entity/EntityColumn.java            | 160 ++++
 .../storage/entity/EntityColumnFamily.java      |  65 ++
 .../storage/entity/EntityColumnPrefix.java      | 300 ++++++++
 .../storage/entity/EntityRowKey.java            | 249 +++++++
 .../storage/entity/EntityRowKeyPrefix.java      |  77 ++
 .../storage/entity/EntityTable.java             | 161 ++++
 .../storage/entity/package-info.java            |  28 +
 .../flow/AggregationCompactionDimension.java    |  63 ++
 .../storage/flow/AggregationOperation.java      |  94 +++
 .../timelineservice/storage/flow/Attribute.java |  39 +
 .../storage/flow/FlowActivityColumnFamily.java  |  55 ++
 .../storage/flow/FlowActivityColumnPrefix.java  | 277 +++++++
 .../storage/flow/FlowActivityRowKey.java        | 196 +++++
 .../storage/flow/FlowActivityRowKeyPrefix.java  |  60 ++
 .../storage/flow/FlowActivityTable.java         | 108 +++
 .../storage/flow/FlowRunColumn.java             | 182 +++++
 .../storage/flow/FlowRunColumnFamily.java       |  54 ++
 .../storage/flow/FlowRunColumnPrefix.java       | 268 +++++++
 .../storage/flow/FlowRunCoprocessor.java        | 274 +++++++
 .../storage/flow/FlowRunRowKey.java             | 190 +++++
 .../storage/flow/FlowRunRowKeyPrefix.java       |  54 ++
 .../storage/flow/FlowRunTable.java              | 150 ++++
 .../storage/flow/FlowScanner.java               | 728 +++++++++++++++++++
 .../storage/flow/FlowScannerOperation.java      |  46 ++
 .../storage/flow/package-info.java              |  29 +
 .../timelineservice/storage/package-info.java   |  28 +
 .../reader/AbstractTimelineStorageReader.java   | 158 ++++
 .../storage/reader/ApplicationEntityReader.java | 502 +++++++++++++
 .../storage/reader/EntityTypeReader.java        | 180 +++++
 .../reader/FlowActivityEntityReader.java        | 163 +++++
 .../storage/reader/FlowRunEntityReader.java     | 290 ++++++++
 .../storage/reader/GenericEntityReader.java     | 628 ++++++++++++++++
 .../storage/reader/TimelineEntityReader.java    | 458 ++++++++++++
 .../reader/TimelineEntityReaderFactory.java     | 102 +++
 .../storage/reader/package-info.java            |  28 +
 .../storage/common/TestKeyConverters.java       | 130 ++++
 .../storage/common/TestRowKeys.java             | 250 +++++++
 .../storage/common/TestSeparator.java           | 215 ++++++
 .../hadoop-yarn-server-timelineservice/pom.xml  |  50 --
 .../reader/filter/TimelineFilterUtils.java      | 307 --------
 .../storage/HBaseTimelineReaderImpl.java        |  96 ---
 .../storage/HBaseTimelineWriterImpl.java        | 542 --------------
 .../storage/TimelineSchemaCreator.java          | 251 -------
 .../storage/application/ApplicationColumn.java  | 156 ----
 .../application/ApplicationColumnFamily.java    |  65 --
 .../application/ApplicationColumnPrefix.java    | 288 --------
 .../storage/application/ApplicationRowKey.java  | 206 ------
 .../application/ApplicationRowKeyPrefix.java    |  69 --
 .../storage/application/ApplicationTable.java   | 161 ----
 .../storage/application/package-info.java       |  28 -
 .../apptoflow/AppToFlowColumnFamily.java        |  51 --
 .../apptoflow/AppToFlowColumnPrefix.java        | 206 ------
 .../storage/apptoflow/AppToFlowRowKey.java      |  58 --
 .../storage/apptoflow/AppToFlowTable.java       | 124 ----
 .../storage/apptoflow/package-info.java         |  28 -
 .../storage/common/AppIdKeyConverter.java       |  96 ---
 .../storage/common/BaseTable.java               | 167 -----
 .../common/BufferedMutatorDelegator.java        |  73 --
 .../timelineservice/storage/common/Column.java  |  80 --
 .../storage/common/ColumnFamily.java            |  34 -
 .../storage/common/ColumnHelper.java            | 389 ----------
 .../storage/common/ColumnPrefix.java            | 145 ----
 .../storage/common/EventColumnName.java         |  63 --
 .../common/EventColumnNameConverter.java        |  99 ---
 .../storage/common/GenericConverter.java        |  48 --
 .../common/HBaseTimelineStorageUtils.java       | 306 --------
 .../storage/common/KeyConverter.java            |  41 --
 .../storage/common/LongConverter.java           |  94 ---
 .../storage/common/LongKeyConverter.java        |  68 --
 .../storage/common/NumericValueConverter.java   |  39 -
 .../timelineservice/storage/common/Range.java   |  62 --
 .../storage/common/RowKeyPrefix.java            |  42 --
 .../storage/common/Separator.java               | 575 ---------------
 .../storage/common/StringKeyConverter.java      |  54 --
 .../common/TimelineHBaseSchemaConstants.java    |  71 --
 .../storage/common/TimestampGenerator.java      | 116 ---
 .../storage/common/TypedBufferedMutator.java    |  28 -
 .../storage/common/ValueConverter.java          |  47 --
 .../storage/entity/EntityColumn.java            | 160 ----
 .../storage/entity/EntityColumnFamily.java      |  65 --
 .../storage/entity/EntityColumnPrefix.java      | 300 --------
 .../storage/entity/EntityRowKey.java            | 249 -------
 .../storage/entity/EntityRowKeyPrefix.java      |  77 --
 .../storage/entity/EntityTable.java             | 161 ----
 .../storage/entity/package-info.java            |  28 -
 .../flow/AggregationCompactionDimension.java    |  63 --
 .../storage/flow/AggregationOperation.java      |  94 ---
 .../timelineservice/storage/flow/Attribute.java |  39 -
 .../storage/flow/FlowActivityColumnFamily.java  |  55 --
 .../storage/flow/FlowActivityColumnPrefix.java  | 277 -------
 .../storage/flow/FlowActivityRowKey.java        | 196 -----
 .../storage/flow/FlowActivityRowKeyPrefix.java  |  60 --
 .../storage/flow/FlowActivityTable.java         | 108 ---
 .../storage/flow/FlowRunColumn.java             | 182 -----
 .../storage/flow/FlowRunColumnFamily.java       |  54 --
 .../storage/flow/FlowRunColumnPrefix.java       | 268 -------
 .../storage/flow/FlowRunCoprocessor.java        | 274 -------
 .../storage/flow/FlowRunRowKey.java             | 190 -----
 .../storage/flow/FlowRunRowKeyPrefix.java       |  54 --
 .../storage/flow/FlowRunTable.java              | 150 ----
 .../storage/flow/FlowScanner.java               | 728 -------------------
 .../storage/flow/FlowScannerOperation.java      |  46 --
 .../storage/flow/package-info.java              |  29 -
 .../reader/AbstractTimelineStorageReader.java   | 158 ----
 .../storage/reader/ApplicationEntityReader.java | 502 -------------
 .../storage/reader/EntityTypeReader.java        | 180 -----
 .../reader/FlowActivityEntityReader.java        | 163 -----
 .../storage/reader/FlowRunEntityReader.java     | 290 --------
 .../storage/reader/GenericEntityReader.java     | 628 ----------------
 .../storage/reader/TimelineEntityReader.java    | 458 ------------
 .../reader/TimelineEntityReaderFactory.java     | 102 ---
 .../storage/reader/package-info.java            |  28 -
 .../storage/common/TestKeyConverters.java       | 130 ----
 .../storage/common/TestRowKeys.java             | 250 -------
 .../storage/common/TestSeparator.java           | 215 ------
 .../hadoop-yarn/hadoop-yarn-server/pom.xml      |   1 +
 .../src/site/markdown/TimelineServiceV2.md      |   2 +-
 hadoop-yarn-project/pom.xml                     |   4 +
 165 files changed, 12696 insertions(+), 12442 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 152a703..1124189 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -325,6 +325,12 @@
           <type>test-jar</type>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-yarn-server-timelineservice-hbase</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
      <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-yarn-applications-distributedshell</artifactId>
@@ -1113,13 +1119,6 @@
      </dependency>
       <dependency>
         <groupId>org.apache.hbase</groupId>
-        <artifactId>hbase-it</artifactId>
-        <version>${hbase.version}</version>
-        <scope>test</scope>
-        <classifier>tests</classifier>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.hbase</groupId>
         <artifactId>hbase-testing-util</artifactId>
         <version>${hbase.version}</version>
         <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java
index a54ff34..fa0d318 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHATimelineCollectors.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -47,8 +49,12 @@ public class TestRMHATimelineCollectors extends RMHATestBase {
     super.setup();
     confForRM1.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
     confForRM2.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    confForRM1.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+        FileSystemTimelineWriterImpl.class, TimelineWriter.class);
     confForRM1.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
     confForRM2.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+    confForRM2.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+        FileSystemTimelineWriterImpl.class, TimelineWriter.class);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index af7d5de..139e2da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -115,6 +115,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.Level;
@@ -150,6 +152,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
     conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
     conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
     conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+        FileSystemTimelineWriterImpl.class, TimelineWriter.class);
     rmAddr = new InetSocketAddress("localhost", 8032);
     Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index c172548..ef12e8c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -80,6 +80,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
@@ -993,6 +995,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     conf.set(YarnConfiguration.NM_AUX_SERVICES + "."
         + "timeline_collector" + ".class",
         PerNodeTimelineCollectorsAuxService.class.getName());
+    conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+        FileSystemTimelineWriterImpl.class, TimelineWriter.class);
 
     rm = new MockRM(conf);
     rm.start();
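
All three resource manager test changes in this patch share one fix: once the
HBase writer moves to its own module, tests that enable ATSv2 must pin the
writer to the local filesystem implementation, or they would try to
instantiate the HBase writer without its module on the test classpath. A
condensed sketch of the shared setup (identifiers as in the hunks above):

    Configuration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
    // Override the default HBase writer, which now lives in the separate
    // hadoop-yarn-server-timelineservice-hbase module.
    conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
        FileSystemTimelineWriterImpl.class, TimelineWriter.class);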

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
index ed014de..7bcf764 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
@@ -61,6 +61,18 @@
 
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice-hbase</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <version>${hbase-compatible-hadoop.version}</version>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
new file mode 100644
index 0000000..3358467c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
@@ -0,0 +1,190 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                             http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <parent>
+    <artifactId>hadoop-yarn-server</artifactId>
+    <groupId>org.apache.hadoop</groupId>
+    <version>2.9.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>hadoop-yarn-server-timelineservice-hbase</artifactId>
+  <name>Apache Hadoop YARN TimelineService HBase Backend</name>
+
+  <properties>
+    <!-- Needed for generating FindBugs warnings using parent pom -->
+    <yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-sslengine</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-jar-plugin</artifactId>
+          <executions>
+            <execution>
+              <goals>
+                <goal>test-jar</goal>
+              </goals>
+              <phase>test-compile</phase>
+            </execution>
+          </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <configuration>
+          <additionnalDependencies>
+            <additionnalDependency>
+              <groupId>junit</groupId>
+              <artifactId>junit</artifactId>
+              <version>4.11</version>
+            </additionnalDependency>
+          </additionnalDependencies>
+          </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
new file mode 100644
index 0000000..8e38e95
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+
+/**
+ * Set of utility methods used by timeline filter classes.
+ */
+public final class TimelineFilterUtils {
+
+  private static final Log LOG = LogFactory.getLog(TimelineFilterUtils.class);
+
+  private TimelineFilterUtils() {
+  }
+
+  /**
+   * Returns the equivalent HBase filter list's {@link Operator}.
+   *
+   * @param op timeline filter list operator.
+   * @return HBase filter list's Operator.
+   */
+  private static Operator getHBaseOperator(TimelineFilterList.Operator op) {
+    switch (op) {
+    case AND:
+      return Operator.MUST_PASS_ALL;
+    case OR:
+      return Operator.MUST_PASS_ONE;
+    default:
+      throw new IllegalArgumentException("Invalid operator");
+    }
+  }
+
+  /**
+   * Returns the equivalent HBase compare filter's {@link CompareOp}.
+   *
+   * @param op timeline compare op.
+   * @return HBase compare filter's CompareOp.
+   */
+  private static CompareOp getHBaseCompareOp(
+      TimelineCompareOp op) {
+    switch (op) {
+    case LESS_THAN:
+      return CompareOp.LESS;
+    case LESS_OR_EQUAL:
+      return CompareOp.LESS_OR_EQUAL;
+    case EQUAL:
+      return CompareOp.EQUAL;
+    case NOT_EQUAL:
+      return CompareOp.NOT_EQUAL;
+    case GREATER_OR_EQUAL:
+      return CompareOp.GREATER_OR_EQUAL;
+    case GREATER_THAN:
+      return CompareOp.GREATER;
+    default:
+      throw new IllegalArgumentException("Invalid compare operator");
+    }
+  }
+
+  /**
+   * Converts a {@link TimelinePrefixFilter} to an equivalent HBase
+   * {@link QualifierFilter}.
+   * @param colPrefix column prefix from which the qualifier prefix bytes are
+   *     derived.
+   * @param filter timeline prefix filter to be converted.
+   * @return a {@link QualifierFilter} object.
+   */
+  private static <T> Filter createHBaseColQualPrefixFilter(
+      ColumnPrefix<T> colPrefix, TimelinePrefixFilter filter) {
+    return new QualifierFilter(getHBaseCompareOp(filter.getCompareOp()),
+        new BinaryPrefixComparator(
+            colPrefix.getColumnPrefixBytes(filter.getPrefix())));
+  }
+
+  /**
+   * Creates an HBase {@link QualifierFilter} for the passed column prefix and
+   * compare op.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param compareOp compare op.
+   * @param columnPrefix column prefix.
+   * @return a column qualifier filter.
+   */
+  public static <T> Filter createHBaseQualifierFilter(CompareOp compareOp,
+      ColumnPrefix<T> columnPrefix) {
+    return new QualifierFilter(compareOp,
+        new BinaryPrefixComparator(
+            columnPrefix.getColumnPrefixBytes("")));
+  }
+
+  /**
+   * Create filters for confs or metrics to retrieve. This list includes a
+   * configs/metrics family filter and relevant filters for confs/metrics to
+   * retrieve, if present.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param confsOrMetricToRetrieve configs/metrics to retrieve.
+   * @param columnFamily config or metric column family.
+   * @param columnPrefix config or metric column prefix.
+   * @return a filter list.
+   * @throws IOException if any problem occurs while creating the filters.
+   */
+  public static <T> Filter createFilterForConfsOrMetricsToRetrieve(
+      TimelineFilterList confsOrMetricToRetrieve, ColumnFamily<T> columnFamily,
+      ColumnPrefix<T> columnPrefix) throws IOException {
+    Filter familyFilter = new FamilyFilter(CompareOp.EQUAL,
+        new BinaryComparator(columnFamily.getBytes()));
+    if (confsOrMetricToRetrieve != null &&
+        !confsOrMetricToRetrieve.getFilterList().isEmpty()) {
+      // If confsOrMetricsToRetrieve is specified, create a filter list based
+      // on it and the family filter.
+      FilterList filter = new FilterList(familyFilter);
+      filter.addFilter(
+          createHBaseFilterList(columnPrefix, confsOrMetricToRetrieve));
+      return filter;
+    } else {
+      // Only the family filter needs to be added.
+      return familyFilter;
+    }
+  }
+
+  /**
+   * Create 2 HBase {@link SingleColumnValueFilter} filters for the specified
+   * value range represented by start and end value and wraps them inside a
+   * filter list. Start and end value should not be null.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param column Column for which the single column value filter is to be
+   *     created.
+   * @param startValue Start value.
+   * @param endValue End value.
+   * @return 2 single column value filters wrapped in a filter list.
+   * @throws IOException if any problem is encountered while encoding value.
+   */
+  public static <T> FilterList createSingleColValueFiltersByRange(
+      Column<T> column, Object startValue, Object endValue)
+      throws IOException {
+    FilterList list = new FilterList();
+    Filter singleColValFilterStart = createHBaseSingleColValueFilter(
+        column.getColumnFamilyBytes(), column.getColumnQualifierBytes(),
+        column.getValueConverter().encodeValue(startValue),
+        CompareOp.GREATER_OR_EQUAL, true);
+    list.addFilter(singleColValFilterStart);
+
+    Filter singleColValFilterEnd = createHBaseSingleColValueFilter(
+        column.getColumnFamilyBytes(), column.getColumnQualifierBytes(),
+        column.getValueConverter().encodeValue(endValue),
+        CompareOp.LESS_OR_EQUAL, true);
+    list.addFilter(singleColValFilterEnd);
+    return list;
+  }
+
+  /**
+   * Creates an HBase {@link SingleColumnValueFilter} for the specified column.
+   * @param <T> Describes the type of column prefix.
+   * @param column Column whose value is to be filtered.
+   * @param value Value to be filtered.
+   * @param op Compare operator.
+   * @return a {@link SingleColumnValueFilter} object.
+   * @throws IOException if any problem occurs while encoding the value.
+   */
+  public static <T> Filter createHBaseSingleColValueFilter(Column<T> column,
+      Object value, CompareOp op) throws IOException {
+    Filter singleColValFilter = createHBaseSingleColValueFilter(
+        column.getColumnFamilyBytes(), column.getColumnQualifierBytes(),
+        column.getValueConverter().encodeValue(value), op, true);
+    return singleColValFilter;
+  }
+
+  /**
+   * Creates an HBase {@link SingleColumnValueFilter}.
+   *
+   * @param columnFamily Column Family represented as bytes.
+   * @param columnQualifier Column Qualifier represented as bytes.
+   * @param value Value.
+   * @param compareOp Compare operator.
+   * @param filterIfMissing This flag decides if we should filter the row if
+   *     the specified column is missing. This is based on the filter's
+   *     keyMustExist field.
+   * @return a {@link SingleColumnValueFilter} object.
+   * @throws IOException if any problem occurs while creating the filter.
+   */
+  private static SingleColumnValueFilter createHBaseSingleColValueFilter(
+      byte[] columnFamily, byte[] columnQualifier, byte[] value,
+      CompareOp compareOp, boolean filterIfMissing) throws IOException {
+    SingleColumnValueFilter singleColValFilter =
+        new SingleColumnValueFilter(columnFamily, columnQualifier, compareOp,
+        new BinaryComparator(value));
+    singleColValFilter.setLatestVersionOnly(true);
+    singleColValFilter.setFilterIfMissing(filterIfMissing);
+    return singleColValFilter;
+  }
+
+  /**
+   * Fetch columns from filter list containing exists and multivalue equality
+   * filters. This is done to fetch only the required columns from the backend
+   * and then match event filters or relationships in the reader.
+   *
+   * @param filterList filter list.
+   * @return set of columns.
+   */
+  public static Set<String> fetchColumnsFromFilterList(
+      TimelineFilterList filterList) {
+    Set<String> strSet = new HashSet<String>();
+    for (TimelineFilter filter : filterList.getFilterList()) {
+      switch(filter.getFilterType()) {
+      case LIST:
+        strSet.addAll(fetchColumnsFromFilterList((TimelineFilterList)filter));
+        break;
+      case KEY_VALUES:
+        strSet.add(((TimelineKeyValuesFilter)filter).getKey());
+        break;
+      case EXISTS:
+        strSet.add(((TimelineExistsFilter)filter).getValue());
+        break;
+      default:
+        LOG.info("Unexpected filter type " + filter.getFilterType());
+        break;
+      }
+    }
+    return strSet;
+  }
+
+  /**
+   * Creates an equivalent HBase {@link FilterList} from a
+   * {@link TimelineFilterList}, converting the different timeline filters
+   * (of type {@link TimelineFilter}) into their equivalent HBase filters.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param colPrefix column prefix which will be used for conversion.
+   * @param filterList timeline filter list which has to be converted.
+   * @return A {@link FilterList} object.
+   * @throws IOException if any problem occurs while creating the filter list.
+   */
+  public static <T> FilterList createHBaseFilterList(ColumnPrefix<T> colPrefix,
+      TimelineFilterList filterList) throws IOException {
+    FilterList list =
+        new FilterList(getHBaseOperator(filterList.getOperator()));
+    for (TimelineFilter filter : filterList.getFilterList()) {
+      switch(filter.getFilterType()) {
+      case LIST:
+        list.addFilter(createHBaseFilterList(colPrefix,
+            (TimelineFilterList)filter));
+        break;
+      case PREFIX:
+        list.addFilter(createHBaseColQualPrefixFilter(colPrefix,
+            (TimelinePrefixFilter)filter));
+        break;
+      case COMPARE:
+        TimelineCompareFilter compareFilter = (TimelineCompareFilter)filter;
+        list.addFilter(
+            createHBaseSingleColValueFilter(
+                colPrefix.getColumnFamilyBytes(),
+                colPrefix.getColumnPrefixBytes(compareFilter.getKey()),
+                colPrefix.getValueConverter().
+                    encodeValue(compareFilter.getValue()),
+                getHBaseCompareOp(compareFilter.getCompareOp()),
+                compareFilter.getKeyMustExist()));
+        break;
+      case KEY_VALUE:
+        TimelineKeyValueFilter kvFilter = (TimelineKeyValueFilter)filter;
+        list.addFilter(
+            createHBaseSingleColValueFilter(
+                colPrefix.getColumnFamilyBytes(),
+                colPrefix.getColumnPrefixBytes(kvFilter.getKey()),
+                colPrefix.getValueConverter().encodeValue(kvFilter.getValue()),
+                getHBaseCompareOp(kvFilter.getCompareOp()),
+                kvFilter.getKeyMustExist()));
+        break;
+      default:
+        LOG.info("Unexpected filter type " + filter.getFilterType());
+        break;
+      }
+    }
+    return list;
+  }
+}
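
A hypothetical usage sketch for createHBaseFilterList above: build a
TimelineFilterList on the reader side and hand the converted HBase FilterList
to a Scan. The OR-of-prefix-filters construction and the
ApplicationColumnPrefix.CONFIG prefix are illustrative assumptions, not part
of this patch:

    TimelineFilterList confsToRetrieve = new TimelineFilterList(
        TimelineFilterList.Operator.OR,
        new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "mapreduce."));
    FilterList hbaseFilters = TimelineFilterUtils.createHBaseFilterList(
        ApplicationColumnPrefix.CONFIG, confsToRetrieve);
    Scan scan = new Scan();
    // Only config columns whose qualifier starts with "mapreduce." are fetched.
    scan.setFilter(hbaseFilters);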

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
new file mode 100644
index 0000000..f7c0705
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.reader.filter stores
+ * timeline filter implementations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
new file mode 100644
index 0000000..ce20113
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.EntityTypeReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory;
+
+/**
+ * HBase-based implementation of {@link TimelineReader}.
+ */
+public class HBaseTimelineReaderImpl
+    extends AbstractService implements TimelineReader {
+
+  private static final Log LOG = LogFactory
+      .getLog(HBaseTimelineReaderImpl.class);
+
+  private Configuration hbaseConf = null;
+  private Connection conn;
+
+  public HBaseTimelineReaderImpl() {
+    super(HBaseTimelineReaderImpl.class.getName());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
+    conn = ConnectionFactory.createConnection(hbaseConf);
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (conn != null) {
+      LOG.info("closing the hbase Connection");
+      conn.close();
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public TimelineEntity getEntity(TimelineReaderContext context,
+      TimelineDataToRetrieve dataToRetrieve) throws IOException {
+    TimelineEntityReader reader =
+        TimelineEntityReaderFactory.createSingleEntityReader(context,
+            dataToRetrieve);
+    return reader.readEntity(hbaseConf, conn);
+  }
+
+  @Override
+  public Set<TimelineEntity> getEntities(TimelineReaderContext context,
+      TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
+      throws IOException {
+    TimelineEntityReader reader =
+        TimelineEntityReaderFactory.createMultipleEntitiesReader(context,
+            filters, dataToRetrieve);
+    return reader.readEntities(hbaseConf, conn);
+  }
+
+  @Override
+  public Set<String> getEntityTypes(TimelineReaderContext context)
+      throws IOException {
+    EntityTypeReader reader = new EntityTypeReader(context);
+    return reader.readEntityTypes(hbaseConf, conn);
+  }
+}
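
Since HBaseTimelineReaderImpl is an AbstractService, callers drive it through
the usual Hadoop service lifecycle. A hedged sketch (readerContext and
dataToRetrieve stand in for the reader-side request objects):

    HBaseTimelineReaderImpl reader = new HBaseTimelineReaderImpl();
    reader.init(conf);   // serviceInit() creates the shared HBase Connection
    reader.start();
    try {
      TimelineEntity entity = reader.getEntity(readerContext, dataToRetrieve);
      Set<String> types = reader.getEntityTypes(readerContext);
    } finally {
      reader.stop();     // serviceStop() closes the Connection
    }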

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
new file mode 100644
index 0000000..dfd63bf
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -0,0 +1,542 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+
+/**
+ * This implements an HBase-based backend for storing timeline entity
+ * information. It writes to multiple tables at the backend.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class HBaseTimelineWriterImpl extends AbstractService implements
+    TimelineWriter {
+
+  private static final Log LOG = LogFactory
+      .getLog(HBaseTimelineWriterImpl.class);
+
+  private Connection conn;
+  private TypedBufferedMutator<EntityTable> entityTable;
+  private TypedBufferedMutator<AppToFlowTable> appToFlowTable;
+  private TypedBufferedMutator<ApplicationTable> applicationTable;
+  private TypedBufferedMutator<FlowActivityTable> flowActivityTable;
+  private TypedBufferedMutator<FlowRunTable> flowRunTable;
+
+  /**
+   * Used to convert string key components to and from storage format.
+   */
+  private final KeyConverter<String> stringKeyConverter =
+      new StringKeyConverter();
+
+  /**
+   * Used to convert Long key components to and from storage format.
+   */
+  private final KeyConverter<Long> longKeyConverter = new LongKeyConverter();
+
+  public HBaseTimelineWriterImpl() {
+    super(HBaseTimelineWriterImpl.class.getName());
+  }
+
+  /**
+   * Initializes the HBase connection and the buffered mutators used to
+   * write to the backend tables.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    Configuration hbaseConf =
+        HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
+    conn = ConnectionFactory.createConnection(hbaseConf);
+    entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
+    appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn);
+    applicationTable = new ApplicationTable().getTableMutator(hbaseConf, conn);
+    flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn);
+    flowActivityTable =
+        new FlowActivityTable().getTableMutator(hbaseConf, conn);
+  }
+
+  /**
+   * Stores all the information contained in {@link TimelineEntities} to the
+   * timeline store.
+   */
+  @Override
+  public TimelineWriteResponse write(String clusterId, String userId,
+      String flowName, String flowVersion, long flowRunId, String appId,
+      TimelineEntities data) throws IOException {
+
+    TimelineWriteResponse putStatus = new TimelineWriteResponse();
+    // defensive coding to avoid NPE during row key construction
+    if ((flowName == null) || (appId == null) || (clusterId == null)
+        || (userId == null)) {
+      LOG.warn("Found null for one of: flowName=" + flowName + " appId=" + 
appId
+          + " userId=" + userId + " clusterId=" + clusterId
+          + " . Not proceeding with writing to hbase");
+      return putStatus;
+    }
+
+    for (TimelineEntity te : data.getEntities()) {
+
+      // a set can have at most 1 null
+      if (te == null) {
+        continue;
+      }
+
+      // if the entity is the application, the destination is the application
+      // table
+      boolean isApplication = ApplicationEntity.isApplicationEntity(te);
+      byte[] rowKey;
+      if (isApplication) {
+        ApplicationRowKey applicationRowKey =
+            new ApplicationRowKey(clusterId, userId, flowName, flowRunId,
+                appId);
+        rowKey = applicationRowKey.getRowKey();
+      } else {
+        EntityRowKey entityRowKey =
+            new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
+                te.getType(), te.getIdPrefix(), te.getId());
+        rowKey = entityRowKey.getRowKey();
+      }
+
+      storeInfo(rowKey, te, flowVersion, isApplication);
+      storeEvents(rowKey, te.getEvents(), isApplication);
+      storeConfig(rowKey, te.getConfigs(), isApplication);
+      storeMetrics(rowKey, te.getMetrics(), isApplication);
+      storeRelations(rowKey, te, isApplication);
+
+      if (isApplication) {
+        TimelineEvent event =
+            ApplicationEntity.getApplicationEvent(te,
+                ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+        FlowRunRowKey flowRunRowKey =
+            new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
+        if (event != null) {
+          onApplicationCreated(flowRunRowKey, clusterId, appId, userId,
+              flowVersion, te, event.getTimestamp());
+        }
+        // if it's an application entity, store metrics
+        storeFlowMetricsAppRunning(flowRunRowKey, appId, te);
+        // if the application has finished, store its finish time and write
+        // the final values of all metrics
+        event = ApplicationEntity.getApplicationEvent(te,
+            ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+        if (event != null) {
+          onApplicationFinished(flowRunRowKey, flowVersion, appId, te,
+              event.getTimestamp());
+        }
+      }
+    }
+    return putStatus;
+  }
+
+  private void onApplicationCreated(FlowRunRowKey flowRunRowKey,
+      String clusterId, String appId, String userId, String flowVersion,
+      TimelineEntity te, long appCreatedTimeStamp)
+      throws IOException {
+
+    String flowName = flowRunRowKey.getFlowName();
+    Long flowRunId = flowRunRowKey.getFlowRunId();
+
+    // store in App to flow table
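+    // (row key is the appId and the cluster id is the column qualifier, so a
+    // reader can recover the flow context of an app from its id alone)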
+    AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(appId);
+    byte[] rowKey = appToFlowRowKey.getRowKey();
+    AppToFlowColumnPrefix.FLOW_NAME.store(rowKey, appToFlowTable, clusterId,
+        null, flowName);
+    AppToFlowColumnPrefix.FLOW_RUN_ID.store(rowKey, appToFlowTable, clusterId,
+        null, flowRunId);
+    AppToFlowColumnPrefix.USER_ID.store(rowKey, appToFlowTable, clusterId,
+        null, userId);
+
+    // store in flow run table
+    storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te);
+
+    // store in flow activity table
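+    // (row key combines cluster, app-created timestamp, user and flow name;
+    // the run id becomes the column qualifier, so all runs of a flow
+    // accumulate in the same row)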
+    byte[] flowActivityRowKeyBytes =
+        new FlowActivityRowKey(flowRunRowKey.getClusterId(),
+            appCreatedTimeStamp, flowRunRowKey.getUserId(), flowName)
+            .getRowKey();
+    byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
+    FlowActivityColumnPrefix.RUN_ID.store(flowActivityRowKeyBytes,
+        flowActivityTable, qualifier, null, flowVersion,
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+  }
+
+  /*
+   * Updates the {@link FlowRunTable} with application created information.
+   */
+  private void storeAppCreatedInFlowRunTable(FlowRunRowKey flowRunRowKey,
+      String appId, TimelineEntity te) throws IOException {
+    byte[] rowKey = flowRunRowKey.getRowKey();
+    FlowRunColumn.MIN_START_TIME.store(rowKey, flowRunTable, null,
+        te.getCreatedTime(),
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+  }
+
+
+  /*
+   * Updates the {@link FlowRunTable} and {@link FlowActivityTable} when an
+   * application has finished.
+   */
+  private void onApplicationFinished(FlowRunRowKey flowRunRowKey,
+      String flowVersion, String appId, TimelineEntity te,
+      long appFinishedTimeStamp) throws IOException {
+    // store in flow run table
+    storeAppFinishedInFlowRunTable(flowRunRowKey, appId, te,
+        appFinishedTimeStamp);
+
+    // indicate in the flow activity table that the app has finished
+    byte[] rowKey =
+        new FlowActivityRowKey(flowRunRowKey.getClusterId(),
+            appFinishedTimeStamp, flowRunRowKey.getUserId(),
+            flowRunRowKey.getFlowName()).getRowKey();
+    byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
+    FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
+        null, flowVersion,
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+  }
+
+  /*
+   * Updates the {@link FlowRunTable} with application finished information.
+   */
+  private void storeAppFinishedInFlowRunTable(FlowRunRowKey flowRunRowKey,
+      String appId, TimelineEntity te, long appFinishedTimeStamp)
+      throws IOException {
+    byte[] rowKey = flowRunRowKey.getRowKey();
+    Attribute attributeAppId =
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId);
+    FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null,
+        appFinishedTimeStamp, attributeAppId);
+
+    // store the final value of metrics since application has finished
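+    // (the SUM_FINAL attribute marks these cells so downstream aggregation
+    // treats them as final values rather than running sums)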
+    Set<TimelineMetric> metrics = te.getMetrics();
+    if (metrics != null) {
+      storeFlowMetrics(rowKey, metrics, attributeAppId,
+          AggregationOperation.SUM_FINAL.getAttribute());
+    }
+  }
+
+  /*
+   * Updates the {@link FlowRunTable} with application metrics.
+   */
+  private void storeFlowMetricsAppRunning(FlowRunRowKey flowRunRowKey,
+      String appId, TimelineEntity te) throws IOException {
+    Set<TimelineMetric> metrics = te.getMetrics();
+    if (metrics != null) {
+      byte[] rowKey = flowRunRowKey.getRowKey();
+      storeFlowMetrics(rowKey, metrics,
+          AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId),
+          AggregationOperation.SUM.getAttribute());
+    }
+  }
+
+  private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
+      Attribute... attributes) throws IOException {
+    for (TimelineMetric metric : metrics) {
+      byte[] metricColumnQualifier = stringKeyConverter.encode(metric.getId());
+      Map<Long, Number> timeseries = metric.getValues();
+      for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
+        Long timestamp = timeseriesEntry.getKey();
+        FlowRunColumnPrefix.METRIC.store(rowKey, flowRunTable,
+            metricColumnQualifier, timestamp, timeseriesEntry.getValue(),
+            attributes);
+      }
+    }
+  }
+
+  private void storeRelations(byte[] rowKey, TimelineEntity te,
+      boolean isApplication) throws IOException {
+    if (isApplication) {
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          ApplicationColumnPrefix.IS_RELATED_TO, applicationTable);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          ApplicationColumnPrefix.RELATES_TO, applicationTable);
+    } else {
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          EntityColumnPrefix.IS_RELATED_TO, entityTable);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          EntityColumnPrefix.RELATES_TO, entityTable);
+    }
+  }
+
+  /**
+   * Stores the relations from the {@linkplain TimelineEntity} object.
+   */
+  private <T> void storeRelations(byte[] rowKey,
+      Map<String, Set<String>> connectedEntities,
+      ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
+          throws IOException {
+    for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities
+        .entrySet()) {
+      // the related entity ids are joined into a single compound value,
+      // e.g. "id3?id4?id5"
+      String compoundValue =
+          Separator.VALUES.joinEncoded(connectedEntity.getValue());
+      columnPrefix.store(rowKey, table,
+          stringKeyConverter.encode(connectedEntity.getKey()), null,
+          compoundValue);
+    }
+  }
+
+  /**
+   * Stores information from the {@linkplain TimelineEntity} object.
+   */
+  private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion,
+      boolean isApplication) throws IOException {
+
+    if (isApplication) {
+      ApplicationColumn.ID.store(rowKey, applicationTable, null, te.getId());
+      ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null,
+          te.getCreatedTime());
+      ApplicationColumn.FLOW_VERSION.store(rowKey, applicationTable, null,
+          flowVersion);
+      Map<String, Object> info = te.getInfo();
+      if (info != null) {
+        for (Map.Entry<String, Object> entry : info.entrySet()) {
+          ApplicationColumnPrefix.INFO.store(rowKey, applicationTable,
+              stringKeyConverter.encode(entry.getKey()), null,
+              entry.getValue());
+        }
+      }
+    } else {
+      EntityColumn.ID.store(rowKey, entityTable, null, te.getId());
+      EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType());
+      EntityColumn.CREATED_TIME.store(rowKey, entityTable, null,
+          te.getCreatedTime());
+      EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion);
+      Map<String, Object> info = te.getInfo();
+      if (info != null) {
+        for (Map.Entry<String, Object> entry : info.entrySet()) {
+          EntityColumnPrefix.INFO.store(rowKey, entityTable,
+              stringKeyConverter.encode(entry.getKey()), null,
+              entry.getValue());
+        }
+      }
+    }
+  }
+
+  /**
+   * Stores the config information from the {@linkplain TimelineEntity} object.
+   */
+  private void storeConfig(byte[] rowKey, Map<String, String> config,
+      boolean isApplication) throws IOException {
+    if (config == null) {
+      return;
+    }
+    for (Map.Entry<String, String> entry : config.entrySet()) {
+      byte[] configKey = stringKeyConverter.encode(entry.getKey());
+      if (isApplication) {
+        ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable,
+            configKey, null, entry.getValue());
+      } else {
+        EntityColumnPrefix.CONFIG.store(rowKey, entityTable, configKey,
+            null, entry.getValue());
+      }
+    }
+  }
+
+  /**
+   * Stores the {@linkplain TimelineMetric} information from the
+   * {@linkplain TimelineEntity} object.
+   */
+  private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
+      boolean isApplication) throws IOException {
+    if (metrics != null) {
+      for (TimelineMetric metric : metrics) {
+        byte[] metricColumnQualifier =
+            stringKeyConverter.encode(metric.getId());
+        Map<Long, Number> timeseries = metric.getValues();
+        for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
+          Long timestamp = timeseriesEntry.getKey();
+          if (isApplication) {
+            ApplicationColumnPrefix.METRIC.store(rowKey, applicationTable,
+                metricColumnQualifier, timestamp, timeseriesEntry.getValue());
+          } else {
+            EntityColumnPrefix.METRIC.store(rowKey, entityTable,
+                metricColumnQualifier, timestamp, timeseriesEntry.getValue());
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Stores the events from the {@linkplain TimelineEvent} object.
+   */
+  private void storeEvents(byte[] rowKey, Set<TimelineEvent> events,
+      boolean isApplication) throws IOException {
+    if (events != null) {
+      for (TimelineEvent event : events) {
+        if (event != null) {
+          String eventId = event.getId();
+          if (eventId != null) {
+            long eventTimestamp = event.getTimestamp();
+            // if the timestamp is not set, use the current timestamp
+            if (eventTimestamp == TimelineEvent.INVALID_TIMESTAMP) {
+              LOG.warn("timestamp is not set for event " + eventId +
+                  "! Using the current timestamp");
+              eventTimestamp = System.currentTimeMillis();
+            }
+            Map<String, Object> eventInfo = event.getInfo();
+            if ((eventInfo == null) || (eventInfo.size() == 0)) {
+              byte[] columnQualifierBytes =
+                  new EventColumnName(eventId, eventTimestamp, null)
+                      .getColumnQualifier();
+              if (isApplication) {
+                ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
+                    columnQualifierBytes, null, Separator.EMPTY_BYTES);
+              } else {
+                EntityColumnPrefix.EVENT.store(rowKey, entityTable,
+                    columnQualifierBytes, null, Separator.EMPTY_BYTES);
+              }
+            } else {
+              for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
+                // the column qualifier encodes (eventId, timestamp, infoKey)
+                byte[] columnQualifierBytes =
+                    new EventColumnName(eventId, eventTimestamp, info.getKey())
+                        .getColumnQualifier();
+                if (isApplication) {
+                  ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
+                      columnQualifierBytes, null, info.getValue());
+                } else {
+                  EntityColumnPrefix.EVENT.store(rowKey, entityTable,
+                      columnQualifierBytes, null, info.getValue());
+                }
+              } // for info: eventInfo
+            }
+          }
+        }
+      } // event : events
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage
+   * .TimelineWriter#aggregate
+   * (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity,
+   * org.apache
+   * .hadoop.yarn.server.timelineservice.storage.TimelineAggregationTrack)
+   */
+  @Override
+  public TimelineWriteResponse aggregate(TimelineEntity data,
+      TimelineAggregationTrack track) throws IOException {
+    return null;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter#flush
+   * ()
+   */
+  @Override
+  public void flush() throws IOException {
+    // flush all buffered mutators
+    entityTable.flush();
+    appToFlowTable.flush();
+    applicationTable.flush();
+    flowRunTable.flush();
+    flowActivityTable.flush();
+  }
+
+  /**
+   * Closes the HBase connections. The close APIs perform flushing and
+   * release any resources held.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    if (entityTable != null) {
+      LOG.info("closing the entity table");
+      // The close API performs flushing and releases any resources held
+      entityTable.close();
+    }
+    if (appToFlowTable != null) {
+      LOG.info("closing the app_flow table");
+      // The close API performs flushing and releases any resources held
+      appToFlowTable.close();
+    }
+    if (applicationTable != null) {
+      LOG.info("closing the application table");
+      applicationTable.close();
+    }
+    if (flowRunTable != null) {
+      LOG.info("closing the flow run table");
+      // The close API performs flushing and releases any resources held
+      flowRunTable.close();
+    }
+    if (flowActivityTable != null) {
+      LOG.info("closing the flowActivityTable table");
+      // The close API performs flushing and releases any resources held
+      flowActivityTable.close();
+    }
+    if (conn != null) {
+      LOG.info("closing the hbase Connection");
+      conn.close();
+    }
+    super.serviceStop();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
new file mode 100644
index 0000000..dd87169
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Creates the schema for an HBase-based backend for storing application
+ * timeline information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class TimelineSchemaCreator {
+  private TimelineSchemaCreator() {
+  }
+
+  final static String NAME = TimelineSchemaCreator.class.getSimpleName();
+  private static final Log LOG =
+      LogFactory.getLog(TimelineSchemaCreator.class);
+  private static final String SKIP_EXISTING_TABLE_OPTION_SHORT = "s";
+  private static final String APP_TABLE_NAME_SHORT = "a";
+  private static final String APP_TO_FLOW_TABLE_NAME_SHORT = "a2f";
+  private static final String TTL_OPTION_SHORT = "m";
+  private static final String ENTITY_TABLE_NAME_SHORT = "e";
+
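+  /*
+   * Illustrative invocation, run via the hadoop launcher (the table name and
+   * TTL below are hypothetical values; options are defined in parseArgs):
+   *
+   *   hadoop org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator \
+   *       -e timelineservice.entity -m 2592000 -s
+   */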
+  public static void main(String[] args) throws Exception {
+
+    Configuration hbaseConf =
+        HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(null);
+    // Grab input args and allow for -Dxyz style arguments
+    String[] otherArgs = new GenericOptionsParser(hbaseConf, args)
+        .getRemainingArgs();
+
+    // Grab the arguments we're looking for.
+    CommandLine commandLine = parseArgs(otherArgs);
+
+    // Grab the entityTableName argument
+    String entityTableName
+        = commandLine.getOptionValue(ENTITY_TABLE_NAME_SHORT);
+    if (StringUtils.isNotBlank(entityTableName)) {
+      hbaseConf.set(EntityTable.TABLE_NAME_CONF_NAME, entityTableName);
+    }
+    String entityTableTTLMetrics =
+        commandLine.getOptionValue(TTL_OPTION_SHORT);
+    if (StringUtils.isNotBlank(entityTableTTLMetrics)) {
+      int metricsTTL = Integer.parseInt(entityTableTTLMetrics);
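+      // HBase column family TTLs are specified in seconds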
+      new EntityTable().setMetricsTTL(metricsTTL, hbaseConf);
+    }
+    // Grab the appToflowTableName argument
+    String appToflowTableName = commandLine.getOptionValue(
+        APP_TO_FLOW_TABLE_NAME_SHORT);
+    if (StringUtils.isNotBlank(appToflowTableName)) {
+      hbaseConf.set(AppToFlowTable.TABLE_NAME_CONF_NAME, appToflowTableName);
+    }
+    // Grab the applicationTableName argument
+    String applicationTableName = commandLine.getOptionValue(
+        APP_TABLE_NAME_SHORT);
+    if (StringUtils.isNotBlank(applicationTableName)) {
+      hbaseConf.set(ApplicationTable.TABLE_NAME_CONF_NAME,
+          applicationTableName);
+    }
+
+    List<Exception> exceptions = new ArrayList<>();
+    try {
+      boolean skipExisting
+          = commandLine.hasOption(SKIP_EXISTING_TABLE_OPTION_SHORT);
+      if (skipExisting) {
+        LOG.info("Will skip existing tables and continue on htable creation "
+            + "exceptions!");
+      }
+      createAllTables(hbaseConf, skipExisting);
+      LOG.info("Successfully created HBase schema. ");
+    } catch (IOException e) {
+      LOG.error("Error in creating hbase tables: " + e.getMessage());
+      exceptions.add(e);
+    }
+
+    if (exceptions.size() > 0) {
+      LOG.warn("Schema creation finished with the following exceptions");
+      for (Exception e : exceptions) {
+        LOG.warn(e.getMessage());
+      }
+      System.exit(-1);
+    } else {
+      LOG.info("Schema creation finished successfully");
+    }
+  }
+
+  /**
+   * Parse command-line arguments.
+   *
+   * @param args
+   *          command line arguments passed to program.
+   * @return parsed command line.
+   * @throws ParseException if the command-line arguments cannot be parsed.
+   */
+  private static CommandLine parseArgs(String[] args) throws ParseException {
+    Options options = new Options();
+
+    // Input
+    Option o = new Option(ENTITY_TABLE_NAME_SHORT, "entityTableName", true,
+        "entity table name");
+    o.setArgName("entityTableName");
+    o.setRequired(false);
+    options.addOption(o);
+
+    o = new Option(TTL_OPTION_SHORT, "metricsTTL", true,
+        "TTL for metrics column family");
+    o.setArgName("metricsTTL");
+    o.setRequired(false);
+    options.addOption(o);
+
+    o = new Option(APP_TO_FLOW_TABLE_NAME_SHORT, "appToflowTableName", true,
+        "app to flow table name");
+    o.setArgName("appToflowTableName");
+    o.setRequired(false);
+    options.addOption(o);
+
+    o = new Option(APP_TABLE_NAME_SHORT, "applicationTableName", true,
+        "application table name");
+    o.setArgName("applicationTableName");
+    o.setRequired(false);
+    options.addOption(o);
+
+    // Options without an argument
+    // No need to set arg name since we do not need an argument here
+    o = new Option(SKIP_EXISTING_TABLE_OPTION_SHORT, "skipExistingTable",
+        false, "skip existing Hbase tables and continue to create new tables");
+    o.setRequired(false);
+    options.addOption(o);
+
+    CommandLineParser parser = new PosixParser();
+    CommandLine commandLine = null;
+    try {
+      commandLine = parser.parse(options, args);
+    } catch (Exception e) {
+      LOG.error("ERROR: " + e.getMessage() + "\n");
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp(NAME + " ", options, true);
+      System.exit(-1);
+    }
+
+    return commandLine;
+  }
+
+  @VisibleForTesting
+  public static void createAllTables(Configuration hbaseConf,
+      boolean skipExisting) throws IOException {
+
+    Connection conn = null;
+    try {
+      conn = ConnectionFactory.createConnection(hbaseConf);
+      Admin admin = conn.getAdmin();
+      if (admin == null) {
+        throw new IOException("Cannot create table since admin is null");
+      }
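+      // each table below is created in its own try/catch so that, when the
+      // skip option is given, an existing table does not abort creation of
+      // the remaining ones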
+      try {
+        new EntityTable().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+      try {
+        new AppToFlowTable().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+      try {
+        new ApplicationTable().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+      try {
+        new FlowRunTable().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+      try {
+        new FlowActivityTable().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+    } finally {
+      if (conn != null) {
+        conn.close();
+      }
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
new file mode 100644
index 0000000..dde3911
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+
+/**
+ * Identifies fully qualified columns for the {@link ApplicationTable}.
+ */
+public enum ApplicationColumn implements Column<ApplicationTable> {
+
+  /**
+   * App id.
+   */
+  ID(ApplicationColumnFamily.INFO, "id"),
+
+  /**
+   * When the application was created.
+   */
+  CREATED_TIME(ApplicationColumnFamily.INFO, "created_time",
+      new LongConverter()),
+
+  /**
+   * The version of the flow that this app belongs to.
+   */
+  FLOW_VERSION(ApplicationColumnFamily.INFO, "flow_version");
+
+  private final ColumnHelper<ApplicationTable> column;
+  private final ColumnFamily<ApplicationTable> columnFamily;
+  private final String columnQualifier;
+  private final byte[] columnQualifierBytes;
+
+  private ApplicationColumn(ColumnFamily<ApplicationTable> columnFamily,
+      String columnQualifier) {
+    this(columnFamily, columnQualifier, GenericConverter.getInstance());
+  }
+
+  private ApplicationColumn(ColumnFamily<ApplicationTable> columnFamily,
+      String columnQualifier, ValueConverter converter) {
+    this.columnFamily = columnFamily;
+    this.columnQualifier = columnQualifier;
+    // Future-proof by ensuring the right column prefix hygiene.
+    this.columnQualifierBytes =
+        Bytes.toBytes(Separator.SPACE.encode(columnQualifier));
+    this.column = new ColumnHelper<ApplicationTable>(columnFamily, converter);
+  }
+
+  /**
+   * @return the column name value
+   */
+  private String getColumnQualifier() {
+    return columnQualifier;
+  }
+
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<ApplicationTable> tableMutator, Long timestamp,
+      Object inputValue, Attribute... attributes) throws IOException {
+    column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
+        inputValue, attributes);
+  }
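+
+  // Sketch of a typical call, as made by HBaseTimelineWriterImpl (the value
+  // is hypothetical; a null timestamp lets the column helper choose the cell
+  // timestamp):
+  //   ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null,
+  //       1484900000000L);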
+
+  public Object readResult(Result result) throws IOException {
+    return column.readResult(result, columnQualifierBytes);
+  }
+
+  @Override
+  public byte[] getColumnQualifierBytes() {
+    return columnQualifierBytes.clone();
+  }
+
+  @Override
+  public byte[] getColumnFamilyBytes() {
+    return columnFamily.getBytes();
+  }
+
+  @Override
+  public ValueConverter getValueConverter() {
+    return column.getValueConverter();
+  }
+
+  /**
+   * Retrieve an {@link ApplicationColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+   * and only if {@code x.equals(y)} or {@code (x == y == null)}.
+   *
+   * @param columnQualifier Name of the column to retrieve
+   * @return the corresponding {@link ApplicationColumn} or null
+   */
+  public static final ApplicationColumn columnFor(String columnQualifier) {
+
+    // Match column based on value, assume column family matches.
+    for (ApplicationColumn ac : ApplicationColumn.values()) {
+      // Find a match based only on name.
+      if (ac.getColumnQualifier().equals(columnQualifier)) {
+        return ac;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+  /**
+   * Retrieve an {@link ApplicationColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
+   * if and only if {@code a.equals(b) & x.equals(y)} or
+   * {@code (x == y == null)}
+   *
+   * @param columnFamily The columnFamily for which to retrieve the column.
+   * @param name Name of the column to retrieve
+   * @return the corresponding {@link ApplicationColumn} or null if both
+   *         arguments don't match.
+   */
+  public static final ApplicationColumn columnFor(
+      ApplicationColumnFamily columnFamily, String name) {
+
+    for (ApplicationColumn ac : ApplicationColumn.values()) {
+      // Find a match based on column family and name.
+      if (ac.columnFamily.equals(columnFamily)
+          && ac.getColumnQualifier().equals(name)) {
+        return ac;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+}

