http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
new file mode 100644
index 0000000..40cdd2c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Identifies the attributes to be set for puts into the {@link FlowRunTable}.
+ * The numbers used for tagType are prime numbers.
+ */
+public enum AggregationOperation {
+
+  /**
+   * Global minimum of the values seen, e.g. when the flow was started.
+   */
+  GLOBAL_MIN((byte) 71),
+
+  /**
+   * Global maximum of the values seen, e.g. when the flow ended.
+   */
+  GLOBAL_MAX((byte) 73),
+
+  /**
+   * Sum of the metric values of the flow.
+   */
+  SUM((byte) 79),
+
+  /**
+   * Sum of the final values once an application has finished running.
+   */
+  SUM_FINAL((byte) 83),
+
+  /**
+   * Min value as per the latest timestamp
+   * seen for a given app.
+   */
+  LATEST_MIN((byte) 89),
+
+  /**
+   * Max value as per the latest timestamp
+   * seen for a given app.
+   */
+  LATEST_MAX((byte) 97);
+
+  private byte tagType;
+  private byte[] inBytes;
+
+  private AggregationOperation(byte tagType) {
+    this.tagType = tagType;
+    this.inBytes = Bytes.toBytes(this.name());
+  }
+
+  public Attribute getAttribute() {
+    return new Attribute(this.name(), this.inBytes);
+  }
+
+  public byte getTagType() {
+    return tagType;
+  }
+
+  public byte[] getInBytes() {
+    return this.inBytes.clone();
+  }
+
+  /**
+   * Returns the AggregationOperation enum that represents the given string.
+   *
+   * @param aggOpStr Aggregation operation as a string.
+   * @return the corresponding AggregationOperation, or null if none matches.
+   */
+  public static AggregationOperation getAggregationOperation(String aggOpStr) {
+    for (AggregationOperation aggOp : AggregationOperation.values()) {
+      if (aggOp.name().equals(aggOpStr)) {
+        return aggOp;
+      }
+    }
+    return null;
+  }
+
+}
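
A minimal usage sketch (the demo class below is hypothetical; it assumes the
classes added in this patch are on the classpath). It shows the string-based
lookup and the Attribute an operation produces for puts:

import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;

public class AggregationOperationDemo {
  public static void main(String[] args) {
    // Lookup is by enum name; unknown names yield null rather than throwing.
    AggregationOperation op =
        AggregationOperation.getAggregationOperation("SUM");
    System.out.println(op.name() + " tagType=" + op.getTagType()); // SUM tagType=79

    // The attribute carries the enum name and its UTF-8 bytes; the column
    // helpers attach it to HBase puts so the tag survives to the server side.
    Attribute attr = op.getAttribute();
    System.out.println(attr.getName()); // SUM

    System.out.println(
        AggregationOperation.getAggregationOperation("NO_SUCH_OP")); // null
  }
}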

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java
new file mode 100644
index 0000000..d3de518
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+/**
+ * Defines the attribute tuple to be set for puts into the {@link FlowRunTable}.
+ */
+public class Attribute {
+  private final String name;
+  private final byte[] value;
+
+  public Attribute(String name, byte[] value) {
+    this.name = name;
+    this.value = value.clone();
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public byte[] getValue() {
+    return value.clone();
+  }
+}
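
A short sketch (hypothetical demo class) illustrating the defensive copies
that Attribute makes on the way in and on the way out:

import java.util.Arrays;

import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;

public class AttributeCopyDemo {
  public static void main(String[] args) {
    byte[] value = {1, 2, 3};
    Attribute attr = new Attribute("metric", value);

    // The constructor clones the input array, so mutating the caller's copy
    // afterwards does not change the attribute.
    value[0] = 99;
    System.out.println(Arrays.toString(attr.getValue())); // [1, 2, 3]

    // getValue() also returns a clone, so callers cannot mutate the stored
    // bytes through the returned array.
    attr.getValue()[0] = 99;
    System.out.println(Arrays.toString(attr.getValue())); // [1, 2, 3]
  }
}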

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java
new file mode 100644
index 0000000..f9eb5b4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents the flow activity table column families.
+ */
+public enum FlowActivityColumnFamily
+    implements ColumnFamily<FlowActivityTable> {
+
+  /**
+   * Info column family houses known columns, specifically ones included in
+   * columnfamily filters.
+   */
+  INFO("i");
+
+  /**
+   * Byte representation of this column family.
+   */
+  private final byte[] bytes;
+
+  /**
+   * @param value
+   *          create a column family with this name. Must be lower case and
+   *          without spaces.
+   */
+  private FlowActivityColumnFamily(String value) {
+    // column families should be lower case and not contain any spaces.
+    this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
+  }
+
+  public byte[] getBytes() {
+    return Bytes.copy(bytes);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
new file mode 100644
index 0000000..71c3d90
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
+
+/**
+ * Identifies partially qualified columns for the {@link FlowActivityTable}.
+ */
+public enum FlowActivityColumnPrefix
+    implements ColumnPrefix<FlowActivityTable> {
+
+  /**
+   * To store run ids of the flows.
+   */
+  RUN_ID(FlowActivityColumnFamily.INFO, "r", null);
+
+  private final ColumnHelper<FlowActivityTable> column;
+  private final ColumnFamily<FlowActivityTable> columnFamily;
+
+  /**
+   * Can be null for those cases where the provided column qualifier is the
+   * entire column name.
+   */
+  private final String columnPrefix;
+  private final byte[] columnPrefixBytes;
+
+  private final AggregationOperation aggOp;
+
+  /**
+   * Private constructor, meant to be used by the enum definition.
+   *
+   * @param columnFamily
+   *          that this column is stored in.
+   * @param columnPrefix
+   *          for this column.
+   */
+  private FlowActivityColumnPrefix(
+      ColumnFamily<FlowActivityTable> columnFamily, String columnPrefix,
+      AggregationOperation aggOp) {
+    this(columnFamily, columnPrefix, aggOp, false);
+  }
+
+  private FlowActivityColumnPrefix(
+      ColumnFamily<FlowActivityTable> columnFamily, String columnPrefix,
+      AggregationOperation aggOp, boolean compoundColQual) {
+    column = new ColumnHelper<FlowActivityTable>(columnFamily);
+    this.columnFamily = columnFamily;
+    this.columnPrefix = columnPrefix;
+    if (columnPrefix == null) {
+      this.columnPrefixBytes = null;
+    } else {
+      // Future-proof by ensuring the right column prefix hygiene.
+      this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE
+          .encode(columnPrefix));
+    }
+    this.aggOp = aggOp;
+  }
+
+  /**
+   * @return the column name value
+   */
+  public String getColumnPrefix() {
+    return columnPrefix;
+  }
+
+  @Override
+  public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
+    return ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifierPrefix);
+  }
+
+  @Override
+  public byte[] getColumnPrefixBytes(String qualifierPrefix) {
+    return ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifierPrefix);
+  }
+
+  public byte[] getColumnPrefixBytes() {
+    return columnPrefixBytes.clone();
+  }
+
+  @Override
+  public byte[] getColumnFamilyBytes() {
+    return columnFamily.getBytes();
+  }
+
+  @Override
+  public ValueConverter getValueConverter() {
+    return column.getValueConverter();
+  }
+
+  public AggregationOperation getAttribute() {
+    return aggOp;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, byte[], java.lang.Long, java.lang.Object,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
+   */
+  @Override
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<FlowActivityTable> tableMutator, byte[] qualifier,
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier = getColumnPrefixBytes(qualifier);
+    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
+        attributes, this.aggOp);
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        combinedAttributes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
+   */
+  public Object readResult(Result result, String qualifier) throws IOException {
+    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifier);
+    return column.readResult(result, columnQualifier);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResults(org.apache.hadoop.hbase.client.Result,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
+   */
+  public <K> Map<K, Object> readResults(Result result,
+      KeyConverter<K> keyConverter) throws IOException {
+    return column.readResults(result, columnPrefixBytes, keyConverter);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
+   */
+  public <K, V> NavigableMap<K, NavigableMap<Long, V>>
+      readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter)
+      throws IOException {
+    return column.readResultsWithTimestamps(result, columnPrefixBytes,
+        keyConverter);
+  }
+
+  /**
+   * Retrieve an {@link FlowActivityColumnPrefix} given a name, or null if there
+   * is no match. The following holds true: {@code columnFor(x) == columnFor(y)}
+   * if and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnPrefix
+   *          Name of the column to retrieve
+   * @return the corresponding {@link FlowActivityColumnPrefix} or null
+   */
+  public static final FlowActivityColumnPrefix columnFor(String columnPrefix) {
+
+    // Match column based on value, assume column family matches.
+    for (FlowActivityColumnPrefix flowActivityColPrefix :
+        FlowActivityColumnPrefix.values()) {
+      // Find a match based only on name.
+      if (flowActivityColPrefix.getColumnPrefix().equals(columnPrefix)) {
+        return flowActivityColPrefix;
+      }
+    }
+    // Default to null
+    return null;
+  }
+
+  /**
+   * Retrieve an {@link FlowActivityColumnPrefix} given a name, or null if there
+   * is no match. The following holds true:
+   * {@code columnFor(a,x) == columnFor(b,y)} if and only if
+   * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)}
+   *
+   * @param columnFamily
+   *          The columnFamily for which to retrieve the column.
+   * @param columnPrefix
+   *          Name of the column to retrieve
+   * @return the corresponding {@link FlowActivityColumnPrefix} or null if both
+   *         arguments don't match.
+   */
+  public static final FlowActivityColumnPrefix columnFor(
+      FlowActivityColumnFamily columnFamily, String columnPrefix) {
+
+    // TODO: needs unit test to confirm and need to update javadoc to explain
+    // null prefix case.
+
+    for (FlowActivityColumnPrefix flowActivityColumnPrefix :
+        FlowActivityColumnPrefix.values()) {
+      // Find a match based on column family and name.
+      if (flowActivityColumnPrefix.columnFamily.equals(columnFamily)
+          && (((columnPrefix == null) && (flowActivityColumnPrefix
+              .getColumnPrefix() == null)) || (flowActivityColumnPrefix
+              .getColumnPrefix().equals(columnPrefix)))) {
+        return flowActivityColumnPrefix;
+      }
+    }
+    // Default to null
+    return null;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
+   */
+  @Override
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<FlowActivityTable> tableMutator, String qualifier,
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier = getColumnPrefixBytes(qualifier);
+    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
+        attributes, this.aggOp);
+    column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
+        combinedAttributes);
+  }
+}
\ No newline at end of file
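
A brief sketch (hypothetical demo class) of the lookups and qualifier helpers
above; per the flow activity table layout shown later in this patch, run id
cells are stored under qualifiers of the form r!runid:

import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;

public class FlowActivityColumnPrefixDemo {
  public static void main(String[] args) {
    // Name-only lookup: "r" resolves to RUN_ID, anything else to null.
    System.out.println(FlowActivityColumnPrefix.columnFor("r")); // RUN_ID
    System.out.println(FlowActivityColumnPrefix.columnFor("x")); // null

    // Family-qualified lookup must match both the family and the prefix.
    System.out.println(FlowActivityColumnPrefix.columnFor(
        FlowActivityColumnFamily.INFO, "r")); // RUN_ID

    // Builds the compound qualifier (prefix + separator + qualifier) used
    // when storing a run id column.
    byte[] qualifier =
        FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes("runid1");
    System.out.println(qualifier.length);
  }
}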

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
new file mode 100644
index 0000000..d10608a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+
+/**
+ * Represents a rowkey for the flow activity table.
+ */
+public class FlowActivityRowKey {
+
+  private final String clusterId;
+  private final Long dayTs;
+  private final String userId;
+  private final String flowName;
+  private final KeyConverter<FlowActivityRowKey> flowActivityRowKeyConverter =
+      new FlowActivityRowKeyConverter();
+
+  /**
+   * @param clusterId identifying the cluster
+   * @param dayTs to be converted to the top of the day timestamp
+   * @param userId identifying user
+   * @param flowName identifying the flow
+   */
+  public FlowActivityRowKey(String clusterId, Long dayTs, String userId,
+      String flowName) {
+    this(clusterId, dayTs, userId, flowName, true);
+  }
+
+  /**
+   * @param clusterId identifying the cluster
+   * @param timestamp when the flow activity happened. May be converted to the
+   *          top of the day depending on the convertDayTsToTopOfDay argument.
+   * @param userId identifying user
+   * @param flowName identifying the flow
+   * @param convertDayTsToTopOfDay if true and timestamp isn't null, then
+   *          timestamp will be converted to the top-of-the day timestamp
+   */
+  protected FlowActivityRowKey(String clusterId, Long timestamp, String userId,
+      String flowName, boolean convertDayTsToTopOfDay) {
+    this.clusterId = clusterId;
+    if (convertDayTsToTopOfDay && (timestamp != null)) {
+      this.dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(timestamp);
+    } else {
+      this.dayTs = timestamp;
+    }
+    this.userId = userId;
+    this.flowName = flowName;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public Long getDayTimestamp() {
+    return dayTs;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getFlowName() {
+    return flowName;
+  }
+
+  /**
+   * Constructs a row key for the flow activity table as follows:
+   * {@code clusterId!dayTimestamp!user!flowName}.
+   *
+   * @return byte array for the row key
+   */
+  public byte[] getRowKey() {
+    return flowActivityRowKeyConverter.encode(this);
+  }
+
+  /**
+   * Given the raw row key as bytes, returns the row key as an object.
+   *
+   * @param rowKey Byte representation of row key.
+   * @return A <cite>FlowActivityRowKey</cite> object.
+   */
+  public static FlowActivityRowKey parseRowKey(byte[] rowKey) {
+    return new FlowActivityRowKeyConverter().decode(rowKey);
+  }
+
+  /**
+   * Encodes and decodes row keys for the flow activity table. The row key is
+   * of the form clusterId!dayTimestamp!user!flowName. dayTimestamp (the top of
+   * the day timestamp) is a long and the rest are strings.
+   */
+  private static final class FlowActivityRowKeyConverter implements
+      KeyConverter<FlowActivityRowKey> {
+
+    private FlowActivityRowKeyConverter() {
+    }
+
+    /**
+     * The flow activity row key is of the form
+     * clusterId!dayTimestamp!user!flowName with each segment separated by !.
+     * The sizes below indicate the sizes of each of these segments in
+     * sequence. clusterId, user and flowName are strings. Top of the day
+     * timestamp is a long hence 8 bytes in size. Strings are variable in size
+     * (i.e. they end whenever separator is encountered). This is used while
+     * decoding and helps in determining where to split.
+     */
+    private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
+        Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE };
+
+    /*
+     * (non-Javadoc)
+     *
+     * Encodes FlowActivityRowKey object into a byte array with each
+     * component/field in FlowActivityRowKey separated by Separator#QUALIFIERS.
+     * This leads to a flow activity table row key of the form
+     * clusterId!dayTimestamp!user!flowName. If dayTimestamp in the passed
+     * FlowActivityRowKey object is null and clusterId is not null, then this
+     * returns a row key prefix of the form clusterId!, and if userId in
+     * FlowActivityRowKey is null (and the fields preceding it, i.e. clusterId
+     * and dayTimestamp, are not null), this returns a row key prefix of the
+     * form clusterId!dayTimeStamp!. dayTimestamp is inverted while encoding as
+     * it helps maintain a descending order for row keys in the flow activity
+     * table.
+     *
+     * @see org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#encode(java.lang.Object)
+     */
+    @Override
+    public byte[] encode(FlowActivityRowKey rowKey) {
+      if (rowKey.getDayTimestamp() == null) {
+        return Separator.QUALIFIERS.join(Separator.encode(
+            rowKey.getClusterId(), Separator.SPACE, Separator.TAB,
+            Separator.QUALIFIERS), Separator.EMPTY_BYTES);
+      }
+      if (rowKey.getUserId() == null) {
+        return Separator.QUALIFIERS.join(Separator.encode(
+            rowKey.getClusterId(), Separator.SPACE, Separator.TAB,
+            Separator.QUALIFIERS), Bytes.toBytes(LongConverter
+            .invertLong(rowKey.getDayTimestamp())), Separator.EMPTY_BYTES);
+      }
+      return Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
+          Separator.SPACE, Separator.TAB, Separator.QUALIFIERS), Bytes
+          .toBytes(LongConverter.invertLong(rowKey.getDayTimestamp())),
+          Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
+              Separator.QUALIFIERS), Separator.encode(rowKey.getFlowName(),
+              Separator.SPACE, Separator.TAB, Separator.QUALIFIERS));
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see
+     * org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#decode(byte[])
+     */
+    @Override
+    public FlowActivityRowKey decode(byte[] rowKey) {
+      byte[][] rowKeyComponents =
+          Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+      if (rowKeyComponents.length != 4) {
+        throw new IllegalArgumentException("the row key is not valid for "
+            + "a flow activity");
+      }
+      String clusterId =
+          Separator.decode(Bytes.toString(rowKeyComponents[0]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      Long dayTs = LongConverter.invertLong(Bytes.toLong(rowKeyComponents[1]));
+      String userId =
+          Separator.decode(Bytes.toString(rowKeyComponents[2]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      String flowName =
+          Separator.decode(Bytes.toString(rowKeyComponents[3]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      return new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
+    }
+  }
+}
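
A round-trip sketch (hypothetical demo class) of the encode/decode path: the
public constructor rounds the timestamp down to the top of the day and the
converter inverts it so newer days sort first:

import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;

public class FlowActivityRowKeyDemo {
  public static void main(String[] args) {
    FlowActivityRowKey key = new FlowActivityRowKey(
        "cluster1", System.currentTimeMillis(), "user1", "flow_sls");
    byte[] rowKey = key.getRowKey();

    // Decoding the bytes recovers all four components.
    FlowActivityRowKey parsed = FlowActivityRowKey.parseRowKey(rowKey);
    System.out.println(parsed.getClusterId());    // cluster1
    System.out.println(parsed.getDayTimestamp()); // top-of-the-day timestamp
    System.out.println(parsed.getUserId());       // user1
    System.out.println(parsed.getFlowName());     // flow_sls
  }
}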

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyPrefix.java
new file mode 100644
index 0000000..eb88e54
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyPrefix.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
+
+/**
+ * A prefix partial rowkey for flow activities.
+ */
+public class FlowActivityRowKeyPrefix extends FlowActivityRowKey implements
+    RowKeyPrefix<FlowActivityRowKey> {
+
+  /**
+   * Constructs a row key prefix for the flow activity table as follows:
+   * {@code clusterId!dayTimestamp!}.
+   *
+   * @param clusterId Cluster Id.
+   * @param dayTs Start of the day timestamp.
+   */
+  public FlowActivityRowKeyPrefix(String clusterId, Long dayTs) {
+    super(clusterId, dayTs, null, null, false);
+  }
+
+  /**
+   * Constructs a row key prefix for the flow activity table as follows:
+   * {@code clusterId!}.
+   *
+   * @param clusterId identifying the cluster
+   */
+  public FlowActivityRowKeyPrefix(String clusterId) {
+    super(clusterId, null, null, null, false);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.application.
+   * RowKeyPrefix#getRowKeyPrefix()
+   */
+  public byte[] getRowKeyPrefix() {
+    return super.getRowKey();
+  }
+
+}
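
A sketch (hypothetical demo class, assuming an HBase 1.x client) of how such
a prefix can seed a prefix scan over the flow activity table:

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix;

public class FlowActivityScanDemo {
  public static void main(String[] args) {
    // All flow activity rows for one cluster: row key prefix "cluster1!".
    byte[] prefix = new FlowActivityRowKeyPrefix("cluster1").getRowKeyPrefix();
    Scan scan = new Scan();
    scan.setRowPrefixFilter(prefix);
    System.out.println(scan);
  }
}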

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java
new file mode 100644
index 0000000..8a0430c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+
+/**
+ * The flow activity table has column family info.
+ * It stores the daily activity record for flows and
+ * is useful as a quick lookup of what flows were
+ * running on a given day.
+ *
+ * Example flow activity table record:
+ *
+ * <pre>
+ * |-------------------------------------------|
+ * |  Row key   | Column Family                |
+ * |            | info                         |
+ * |-------------------------------------------|
+ * | clusterId! | r!runid1:version1            |
+ * | inv Top of |                              |
+ * | Day!       | r!runid2:version7            |
+ * | userName!  |                              |
+ * | flowName   |                              |
+ * |-------------------------------------------|
+ * </pre>
+ */
+public class FlowActivityTable extends BaseTable<FlowActivityTable> {
+  /** flow activity table prefix. */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowactivity";
+
+  /** config param name that specifies the flowactivity table name. */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /** default value for flowactivity table name. */
+  public static final String DEFAULT_TABLE_NAME =
+      "timelineservice.flowactivity";
+
+  private static final Log LOG = LogFactory.getLog(FlowActivityTable.class);
+
+  /** default max number of versions. */
+  public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE;
+
+  public FlowActivityTable() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
+   * (org.apache.hadoop.hbase.client.Admin,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    HTableDescriptor flowActivityTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor infoCF =
+        new HColumnDescriptor(FlowActivityColumnFamily.INFO.getBytes());
+    infoCF.setBloomFilterType(BloomType.ROWCOL);
+    infoCF.setMinVersions(1);
+    infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+    flowActivityTableDescp.addFamily(infoCF);
+
+    // TODO: figure the split policy before running in production
+    admin.createTable(flowActivityTableDescp);
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+}
\ No newline at end of file
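
A sketch (hypothetical demo class, standard HBase 1.x client API) of creating
the table; note that createTable above deliberately throws if the table
already exists:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;

public class CreateFlowActivityTableDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Optional: override the default table name
    // ("timelineservice.flowactivity") via the config knob defined above.
    conf.set(FlowActivityTable.TABLE_NAME_CONF_NAME,
        "timelineservice.flowactivity");
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin()) {
      new FlowActivityTable().createTable(admin, conf);
    }
  }
}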

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
new file mode 100644
index 0000000..2e7a9d8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
+
+/**
+ * Identifies fully qualified columns for the {@link FlowRunTable}.
+ */
+public enum FlowRunColumn implements Column<FlowRunTable> {
+
+  /**
+   * When the flow was started. This is the minimum of currently known
+   * application start times.
+   */
+  MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time",
+      AggregationOperation.GLOBAL_MIN, new LongConverter()),
+
+  /**
+   * When the flow ended. This is the maximum of currently known application end
+   * times.
+   */
+  MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time",
+      AggregationOperation.GLOBAL_MAX, new LongConverter()),
+
+  /**
+   * The version of the flow that this flow run belongs to.
+   */
+  FLOW_VERSION(FlowRunColumnFamily.INFO, "flow_version", null);
+
+  private final ColumnHelper<FlowRunTable> column;
+  private final ColumnFamily<FlowRunTable> columnFamily;
+  private final String columnQualifier;
+  private final byte[] columnQualifierBytes;
+  private final AggregationOperation aggOp;
+
+  private FlowRunColumn(ColumnFamily<FlowRunTable> columnFamily,
+      String columnQualifier, AggregationOperation aggOp) {
+    this(columnFamily, columnQualifier, aggOp,
+        GenericConverter.getInstance());
+  }
+
+  private FlowRunColumn(ColumnFamily<FlowRunTable> columnFamily,
+      String columnQualifier, AggregationOperation aggOp,
+      ValueConverter converter) {
+    this.columnFamily = columnFamily;
+    this.columnQualifier = columnQualifier;
+    this.aggOp = aggOp;
+    // Future-proof by ensuring the right column prefix hygiene.
+    this.columnQualifierBytes = Bytes.toBytes(Separator.SPACE
+        .encode(columnQualifier));
+    this.column = new ColumnHelper<FlowRunTable>(columnFamily, converter);
+  }
+
+  /**
+   * @return the column name value
+   */
+  private String getColumnQualifier() {
+    return columnQualifier;
+  }
+
+  @Override
+  public byte[] getColumnQualifierBytes() {
+    return columnQualifierBytes.clone();
+  }
+
+  @Override
+  public byte[] getColumnFamilyBytes() {
+    return columnFamily.getBytes();
+  }
+
+  public AggregationOperation getAggregationOperation() {
+    return aggOp;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.Column#store
+   * (byte[], org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.Long, java.lang.Object,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
+   */
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<FlowRunTable> tableMutator, Long timestamp,
+      Object inputValue, Attribute... attributes) throws IOException {
+
+    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
+        attributes, aggOp);
+    column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
+        inputValue, combinedAttributes);
+  }
+
+  public Object readResult(Result result) throws IOException {
+    return column.readResult(result, columnQualifierBytes);
+  }
+
+  /**
+   * Retrieve an {@link FlowRunColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+   * and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnQualifier
+   *          Name of the column to retrieve
+   * @return the corresponding {@link FlowRunColumn} or null
+   */
+  public static final FlowRunColumn columnFor(String columnQualifier) {
+
+    // Match column based on value, assume column family matches.
+    for (FlowRunColumn ec : FlowRunColumn.values()) {
+      // Find a match based only on name.
+      if (ec.getColumnQualifier().equals(columnQualifier)) {
+        return ec;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+  @Override
+  public ValueConverter getValueConverter() {
+    return column.getValueConverter();
+  }
+
+  /**
+   * Retrieve an {@link FlowRunColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
+   * if and only if {@code a.equals(b) & x.equals(y)} or
+   * {@code (x == y == null)}
+   *
+   * @param columnFamily
+   *          The columnFamily for which to retrieve the column.
+   * @param name
+   *          Name of the column to retrieve
+   * @return the corresponding {@link FlowRunColumn} or null if both arguments
+   *         don't match.
+   */
+  public static final FlowRunColumn columnFor(FlowRunColumnFamily columnFamily,
+      String name) {
+
+    for (FlowRunColumn ec : FlowRunColumn.values()) {
+      // Find a match based on column family and name.
+      if (ec.columnFamily.equals(columnFamily)
+          && ec.getColumnQualifier().equals(name)) {
+        return ec;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+}
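
A small sketch (hypothetical demo class) showing how each column carries the
aggregation operation that is applied to its cells on the server side:

import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;

public class FlowRunColumnDemo {
  public static void main(String[] args) {
    FlowRunColumn col = FlowRunColumn.columnFor("min_start_time");
    System.out.println(col);                           // MIN_START_TIME
    System.out.println(col.getAggregationOperation()); // GLOBAL_MIN

    // FLOW_VERSION is stored as-is and carries no aggregation operation.
    System.out.println(FlowRunColumn.columnFor("flow_version")
        .getAggregationOperation()); // null
  }
}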

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java
new file mode 100644
index 0000000..8faf5f8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents the flow run table column families.
+ */
+public enum FlowRunColumnFamily implements ColumnFamily<FlowRunTable> {
+
+  /**
+   * Info column family houses known columns, specifically ones included in
+   * columnfamily filters.
+   */
+  INFO("i");
+
+  /**
+   * Byte representation of this column family.
+   */
+  private final byte[] bytes;
+
+  /**
+   * @param value
+   *          create a column family with this name. Must be lower case and
+   *          without spaces.
+   */
+  private FlowRunColumnFamily(String value) {
+    // column families should be lower case and not contain any spaces.
+    this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
+  }
+
+  public byte[] getBytes() {
+    return Bytes.copy(bytes);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
new file mode 100644
index 0000000..e74282a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
+
+/**
+ * Identifies partially qualified columns for the {@link FlowRunTable}.
+ */
+public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
+
+  /**
+   * To store flow run metric values.
+   */
+  METRIC(FlowRunColumnFamily.INFO, "m", null, new LongConverter());
+
+  private final ColumnHelper<FlowRunTable> column;
+  private final ColumnFamily<FlowRunTable> columnFamily;
+
+  /**
+   * Can be null for those cases where the provided column qualifier is the
+   * entire column name.
+   */
+  private final String columnPrefix;
+  private final byte[] columnPrefixBytes;
+
+  private final AggregationOperation aggOp;
+
+  /**
+   * Private constructor, meant to be used by the enum definition.
+   *
+   * @param columnFamily that this column is stored in.
+   * @param columnPrefix for this column.
+   */
+  private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
+      String columnPrefix, AggregationOperation fra, ValueConverter converter) {
+    this(columnFamily, columnPrefix, fra, converter, false);
+  }
+
+  private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
+      String columnPrefix, AggregationOperation fra, ValueConverter converter,
+      boolean compoundColQual) {
+    column = new ColumnHelper<FlowRunTable>(columnFamily, converter);
+    this.columnFamily = columnFamily;
+    this.columnPrefix = columnPrefix;
+    if (columnPrefix == null) {
+      this.columnPrefixBytes = null;
+    } else {
+      // Future-proof by ensuring the right column prefix hygiene.
+      this.columnPrefixBytes =
+          Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
+    }
+    this.aggOp = fra;
+  }
+
+  /**
+   * @return the column name value
+   */
+  public String getColumnPrefix() {
+    return columnPrefix;
+  }
+
+  public byte[] getColumnPrefixBytes() {
+    return columnPrefixBytes.clone();
+  }
+
+  @Override
+  public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
+    return ColumnHelper.getColumnQualifier(this.columnPrefixBytes,
+        qualifierPrefix);
+  }
+
+  @Override
+  public byte[] getColumnPrefixBytes(String qualifierPrefix) {
+    return ColumnHelper.getColumnQualifier(this.columnPrefixBytes,
+        qualifierPrefix);
+  }
+
+  @Override
+  public byte[] getColumnFamilyBytes() {
+    return columnFamily.getBytes();
+  }
+
+  public AggregationOperation getAttribute() {
+    return aggOp;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+   */
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<FlowRunTable> tableMutator, String qualifier,
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
+
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier = getColumnPrefixBytes(qualifier);
+    Attribute[] combinedAttributes =
+        TimelineStorageUtils.combineAttributes(attributes, this.aggOp);
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        combinedAttributes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+   */
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<FlowRunTable> tableMutator, byte[] qualifier,
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
+
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier = getColumnPrefixBytes(qualifier);
+    Attribute[] combinedAttributes =
+        TimelineStorageUtils.combineAttributes(attributes, this.aggOp);
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        combinedAttributes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
+   */
+  public Object readResult(Result result, String qualifier) throws IOException {
+    byte[] columnQualifier =
+        ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+    return column.readResult(result, columnQualifier);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResults(org.apache.hadoop.hbase.client.Result,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
+   */
+  public <K> Map<K, Object> readResults(Result result,
+      KeyConverter<K> keyConverter) throws IOException {
+    return column.readResults(result, columnPrefixBytes, keyConverter);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
+   */
+  public <K, V> NavigableMap<K, NavigableMap<Long, V>>
+      readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter)
+      throws IOException {
+    return column.readResultsWithTimestamps(result, columnPrefixBytes,
+        keyConverter);
+  }
+
+  /**
+   * Retrieve an {@link FlowRunColumnPrefix} given a name, or null if there is
+   * no match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+   * and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnPrefix Name of the column to retrieve
+   * @return the corresponding {@link FlowRunColumnPrefix} or null
+   */
+  public static final FlowRunColumnPrefix columnFor(String columnPrefix) {
+
+    // Match column based on value, assume column family matches.
+    for (FlowRunColumnPrefix frcp : FlowRunColumnPrefix.values()) {
+      // Find a match based only on name.
+      if (frcp.getColumnPrefix().equals(columnPrefix)) {
+        return frcp;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+  @Override
+  public ValueConverter getValueConverter() {
+    return column.getValueConverter();
+  }
+
+  /**
+   * Retrieve a {@link FlowRunColumnPrefix} given a name, or null if there is
+   * no match. The following holds true:
+   * {@code columnFor(a,x) == columnFor(b,y)} if and only if
+   * {@code (x == y == null)} or {@code a.equals(b) && x.equals(y)}.
+   *
+   * @param columnFamily The columnFamily for which to retrieve the column.
+   * @param columnPrefix Name of the column to retrieve
+   * @return the corresponding {@link FlowRunColumnPrefix} or null if no
+   *         match is found.
+   */
+  public static final FlowRunColumnPrefix columnFor(
+      FlowRunColumnFamily columnFamily, String columnPrefix) {
+
+    // TODO: needs unit test to confirm and need to update javadoc to explain
+    // null prefix case.
+
+    for (FlowRunColumnPrefix frcp : FlowRunColumnPrefix.values()) {
+      // Find a match based on column family and on name; compare the
+      // prefixes null-safely so a null prefix cannot trigger an NPE.
+      if (frcp.columnFamily.equals(columnFamily)
+          && ((columnPrefix == null) ? (frcp.getColumnPrefix() == null)
+              : columnPrefix.equals(frcp.getColumnPrefix()))) {
+        return frcp;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+}
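
As a point of reference, a minimal reader-side sketch of how this enum can
be combined with readResults. The "m" prefix string and the
StringKeyConverter key converter are assumptions drawn from elsewhere in
this patch, not from this excerpt:

    // Resolve the metrics prefix by name, then read all metric columns of
    // a row. columnFor may return null, so guard before reading.
    FlowRunColumnPrefix metrics = FlowRunColumnPrefix.columnFor("m");
    if (metrics != null) {
      Map<String, Object> metricValues =
          metrics.readResults(result, new StringKeyConverter());
    }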

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
new file mode 100644
index 0000000..a9dcfaa
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
+
+/**
+ * Coprocessor for flow run table.
+ */
+public class FlowRunCoprocessor extends BaseRegionObserver {
+
+  private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class);
+  private boolean isFlowRunRegion = false;
+
+  private Region region;
+  /**
+   * Generates a timestamp that is unique per row within a region; the
+   * generator itself is per region.
+   */
+  private final TimestampGenerator timestampGenerator =
+      new TimestampGenerator();
+
+  @Override
+  public void start(CoprocessorEnvironment e) throws IOException {
+    if (e instanceof RegionCoprocessorEnvironment) {
+      RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
+      this.region = env.getRegion();
+      isFlowRunRegion = TimelineStorageUtils.isFlowRunTable(
+          region.getRegionInfo(), env.getConfiguration());
+    }
+  }
+
+  public boolean isFlowRunRegion() {
+    return isFlowRunRegion;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * This method adds the tags onto the cells in the Put. It is presumed that
+   * all the cells in one Put have the same set of Tags. The existing cell
+   * timestamp is overwritten for non-metric cells, and each such cell gets
+   * a new, unique timestamp generated by {@link TimestampGenerator}.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#prePut(org.apache
+   * .hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Put,
+   * org.apache.hadoop.hbase.regionserver.wal.WALEdit,
+   * org.apache.hadoop.hbase.client.Durability)
+   */
+  @Override
+  public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
+      WALEdit edit, Durability durability) throws IOException {
+    if (!isFlowRunRegion) {
+      return;
+    }
+    Map<String, byte[]> attributes = put.getAttributesMap();
+
+    // Assumption is that all the cells in a put are the same operation.
+    List<Tag> tags = new ArrayList<>();
+    if ((attributes != null) && (attributes.size() > 0)) {
+      for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
+        Tag t = TimelineStorageUtils.getTagFromAttribute(attribute);
+        tags.add(t);
+      }
+      byte[] tagByteArray = Tag.fromList(tags);
+      NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>(
+          Bytes.BYTES_COMPARATOR);
+      for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap()
+          .entrySet()) {
+        List<Cell> newCells = new ArrayList<>(entry.getValue().size());
+        for (Cell cell : entry.getValue()) {
+          // for each cell in the put add the tags
+          // Assumption is that all the cells in
+          // one put are the same operation
+          // also, get a unique cell timestamp for non-metric cells
+          // this way we don't inadvertently overwrite cell versions
+          long cellTimestamp = getCellTimestamp(cell.getTimestamp(), tags);
+          newCells.add(CellUtil.createCell(CellUtil.cloneRow(cell),
+              CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
+              cellTimestamp, KeyValue.Type.Put, CellUtil.cloneValue(cell),
+              tagByteArray));
+        }
+        newFamilyMap.put(entry.getKey(), newCells);
+      } // for each entry
+      // Update the family map for the Put
+      put.setFamilyCellMap(newFamilyMap);
+    }
+  }
+
+  /**
+   * Determines whether the current cell's timestamp is to be used or a new
+   * unique cell timestamp is to be generated. This avoids inadvertently
+   * overwriting cells when writes come in very fast. For metric cells,
+   * however, the cell timestamp signifies the metric timestamp, so it must
+   * not be overwritten.
+   *
+   * @param timestamp the timestamp already set on the cell
+   * @param tags the tags attached to the cell
+   * @return cell timestamp
+   */
+  private long getCellTimestamp(long timestamp, List<Tag> tags) {
+    // if ts not set (hbase sets to HConstants.LATEST_TIMESTAMP by default)
+    // then use the generator
+    if (timestamp == HConstants.LATEST_TIMESTAMP) {
+      return timestampGenerator.getUniqueTimestamp();
+    } else {
+      return timestamp;
+    }
+  }
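
To make the timestamp rule concrete, a hedged write-side sketch; the
family, qualifier and value names are illustrative only:

    // A cell added without an explicit timestamp carries
    // HConstants.LATEST_TIMESTAMP, so getCellTimestamp() replaces it with
    // a unique generated value; an explicit metric timestamp is preserved.
    Put put = new Put(rowKey);
    put.addColumn(family, qualifier, value);                // generated ts
    put.addColumn(family, metricQual, 1392995080000L, metricValue); // kept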
+
+  /*
+   * (non-Javadoc)
+   *
+   * Creates a {@link FlowScanner} Scan so that it can correctly process the
+   * contents of {@link FlowRunTable}.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preGetOp(org.apache
+   * .hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Get, java.util.List)
+   */
+  @Override
+  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
+      Get get, List<Cell> results) throws IOException {
+    if (!isFlowRunRegion) {
+      return;
+    }
+
+    Scan scan = new Scan(get);
+    scan.setMaxVersions();
+    RegionScanner scanner = null;
+    try {
+      scanner = new FlowScanner(e.getEnvironment(), scan,
+          region.getScanner(scan), FlowScannerOperation.READ);
+      scanner.next(results);
+      e.bypass();
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Ensures that max versions are set for the Scan so that metrics can be
+   * correctly aggregated and min/max can be correctly determined.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preScannerOpen(org
+   * .apache.hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Scan,
+   * org.apache.hadoop.hbase.regionserver.RegionScanner)
+   */
+  @Override
+  public RegionScanner preScannerOpen(
+      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
+      RegionScanner scanner) throws IOException {
+
+    if (isFlowRunRegion) {
+      // set max versions for scan to see all
+      // versions to aggregate for metrics
+      scan.setMaxVersions();
+    }
+    return scanner;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Creates a {@link FlowScanner} Scan so that it can correctly process the
+   * contents of {@link FlowRunTable}.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#postScannerOpen(
+   * org.apache.hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Scan,
+   * org.apache.hadoop.hbase.regionserver.RegionScanner)
+   */
+  @Override
+  public RegionScanner postScannerOpen(
+      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
+      RegionScanner scanner) throws IOException {
+    if (!isFlowRunRegion) {
+      return scanner;
+    }
+    return new FlowScanner(e.getEnvironment(), scan,
+        scanner, FlowScannerOperation.READ);
+  }
+
+  @Override
+  public InternalScanner preFlush(
+      ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      InternalScanner scanner) throws IOException {
+    if (!isFlowRunRegion) {
+      return scanner;
+    }
+    if (LOG.isDebugEnabled()) {
+      if (store != null) {
+        LOG.debug("preFlush store = " + store.getColumnFamilyName()
+            + " flushableSize=" + store.getFlushableSize()
+            + " flushedCellsCount=" + store.getFlushedCellsCount()
+            + " compactedCellsCount=" + store.getCompactedCellsCount()
+            + " majorCompactedCellsCount="
+            + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
+            + store.getMemstoreFlushSize() + " memstoreSize="
+            + store.getMemStoreSize() + " size=" + store.getSize()
+            + " storeFilesCount=" + store.getStorefilesCount());
+      }
+    }
+    return new FlowScanner(c.getEnvironment(), scanner,
+        FlowScannerOperation.FLUSH);
+  }
+
+  @Override
+  public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
+      Store store, StoreFile resultFile) {
+    if (!isFlowRunRegion) {
+      return;
+    }
+    if (LOG.isDebugEnabled()) {
+      if (store != null) {
+        LOG.debug("postFlush store = " + store.getColumnFamilyName()
+            + " flushableSize=" + store.getFlushableSize()
+            + " flushedCellsCount=" + store.getFlushedCellsCount()
+            + " compactedCellsCount=" + store.getCompactedCellsCount()
+            + " majorCompactedCellsCount="
+            + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
+            + store.getMemstoreFlushSize() + " memstoreSize="
+            + store.getMemStoreSize() + " size=" + store.getSize()
+            + " storeFilesCount=" + store.getStorefilesCount());
+      }
+    }
+  }
+
+  @Override
+  public InternalScanner preCompact(
+      ObserverContext<RegionCoprocessorEnvironment> e, Store store,
+      InternalScanner scanner, ScanType scanType, CompactionRequest request)
+      throws IOException {
+
+    if (!isFlowRunRegion) {
+      return scanner;
+    }
+    FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION;
+    if (request != null) {
+      requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
+          : FlowScannerOperation.MINOR_COMPACTION);
+      LOG.info("Compactionrequest= " + request.toString() + " "
+          + requestOp.toString() + " RegionName=" + e.getEnvironment()
+              .getRegion().getRegionInfo().getRegionNameAsString());
+    }
+    return new FlowScanner(e.getEnvironment(), scanner, requestOp);
+  }
+}
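
Because the coprocessor intercepts reads on the server side, callers need
nothing special; a hedged client sketch (connection setup and row key
construction are illustrative):

    // A plain Get against the flow run table; preGetOp() above swaps in a
    // FlowScanner, so the caller receives aggregated metric values.
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table =
             conn.getTable(TableName.valueOf("timelineservice.flowrun"))) {
      Result result = table.get(new Get(rowKey));
    }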

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
new file mode 100644
index 0000000..8fda9a8
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents a rowkey for the flow run table.
+ */
+public class FlowRunRowKey {
+  private final String clusterId;
+  private final String userId;
+  private final String flowName;
+  private final Long flowRunId;
+  private final FlowRunRowKeyConverter flowRunRowKeyConverter =
+      new FlowRunRowKeyConverter();
+
+  public FlowRunRowKey(String clusterId, String userId, String flowName,
+      Long flowRunId) {
+    this.clusterId = clusterId;
+    this.userId = userId;
+    this.flowName = flowName;
+    this.flowRunId = flowRunId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getFlowName() {
+    return flowName;
+  }
+
+  public Long getFlowRunId() {
+    return flowRunId;
+  }
+
+  /**
+   * Constructs a row key for the flow run table as follows:
+   * {clusterId!userId!flowName!Inverted Flow Run Id}.
+   *
+   * @return byte array with the row key
+   */
+  public byte[] getRowKey() {
+    return flowRunRowKeyConverter.encode(this);
+  }
+
+
+  /**
+   * Given the raw row key as bytes, returns the row key as an object.
+   *
+   * @param rowKey Byte representation of row key.
+   * @return A <cite>FlowRunRowKey</cite> object.
+   */
+  public static FlowRunRowKey parseRowKey(byte[] rowKey) {
+    return new FlowRunRowKeyConverter().decode(rowKey);
+  }
+
+  /**
+   * Returns the flow run row key as a verbose String.
+   * @return String representation of the row key.
+   */
+  @Override
+  public String toString() {
+    StringBuilder flowKeyStr = new StringBuilder();
+    flowKeyStr.append("{clusterId=" + clusterId);
+    flowKeyStr.append(" userId=" + userId);
+    flowKeyStr.append(" flowName=" + flowName);
+    flowKeyStr.append(" flowRunId=");
+    flowKeyStr.append(flowRunId);
+    flowKeyStr.append("}");
+    return flowKeyStr.toString();
+  }
+
+  /**
+   * Encodes and decodes row keys for the flow run table.
+   * The row key is of the form: clusterId!userId!flowName!flowRunId.
+   * flowRunId is a long and the rest are strings.
+   */
+  private static final class FlowRunRowKeyConverter implements
+      KeyConverter<FlowRunRowKey> {
+
+    private FlowRunRowKeyConverter() {
+    }
+
+    /**
+     * The flow run row key is of the form clusterId!userId!flowName!flowrunId
+     * with each segment separated by !. The sizes below indicate sizes of each
+     * one of these segments in sequence. clusterId, userId and flowName are
+     * strings. flowrunId is a long hence 8 bytes in size. Strings are variable
+     * in size (i.e. end whenever separator is encountered). This is used while
+     * decoding and helps in determining where to split.
+     */
+    private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
+        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG };
+
+    /*
+     * (non-Javadoc)
+     *
+     * Encodes FlowRunRowKey object into a byte array with each component/field
+     * in FlowRunRowKey separated by Separator#QUALIFIERS. This leads to a
+     * flow run row key of the form clusterId!userId!flowName!flowRunId. If
+     * flowRunId in the passed FlowRunRowKey object is null (and the fields
+     * preceding it, i.e. clusterId, userId and flowName, are not null), this
+     * returns a row key prefix of the form clusterId!userId!flowName!
+     * flowRunId is inverted while encoding as it helps maintain a descending
+     * order for flow keys in the flow run table.
+     *
+     * @see
+     * org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#encode(java.lang.Object)
+     */
+    @Override
+    public byte[] encode(FlowRunRowKey rowKey) {
+      byte[] first =
+          Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
+              Separator.SPACE, Separator.TAB, Separator.QUALIFIERS), Separator
+              .encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
+                  Separator.QUALIFIERS), Separator.encode(rowKey.getFlowName(),
+              Separator.SPACE, Separator.TAB, Separator.QUALIFIERS));
+      if (rowKey.getFlowRunId() == null) {
+        return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES);
+      } else {
+        // Note that flowRunId is a long, so we can't encode all the
+        // components at the same time.
+        byte[] second =
+            Bytes.toBytes(LongConverter.invertLong(rowKey.getFlowRunId()));
+        return Separator.QUALIFIERS.join(first, second);
+      }
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Decodes a flow run row key of the form
+     * clusterId!userId!flowName!flowRunId represented in byte format and
+     * converts it into a FlowRunRowKey object. flowRunId is inverted while
+     * decoding as it was inverted while encoding.
+     *
+     * @see
+     * org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#decode(byte[])
+     */
+    @Override
+    public FlowRunRowKey decode(byte[] rowKey) {
+      byte[][] rowKeyComponents =
+          Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+      if (rowKeyComponents.length != 4) {
+        throw new IllegalArgumentException("the row key is not valid for "
+            + "a flow run");
+      }
+      String clusterId =
+          Separator.decode(Bytes.toString(rowKeyComponents[0]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      String userId =
+          Separator.decode(Bytes.toString(rowKeyComponents[1]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      String flowName =
+          Separator.decode(Bytes.toString(rowKeyComponents[2]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      Long flowRunId =
+          LongConverter.invertLong(Bytes.toLong(rowKeyComponents[3]));
+      return new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
+    }
+  }
+}
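
A round-trip sketch of the encoding described above (the component values
are illustrative only):

    // Encode, then decode: all four components survive the round trip.
    // flowRunId is stored inverted, so larger (newer) run ids sort first.
    FlowRunRowKey key =
        new FlowRunRowKey("cluster1", "user1", "flow_name", 1002345678919L);
    byte[] encoded = key.getRowKey();
    FlowRunRowKey decoded = FlowRunRowKey.parseRowKey(encoded);
    // decoded.getFlowRunId() is again 1002345678919L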

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java
new file mode 100644
index 0000000..23ebc66
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
+
+/**
+ * Represents a partial rowkey (without the flowRunId) for the flow run table.
+ */
+public class FlowRunRowKeyPrefix extends FlowRunRowKey implements
+    RowKeyPrefix<FlowRunRowKey> {
+
+  /**
+   * Constructs a row key prefix for the flow run table as follows:
+   * {@code clusterId!userId!flowName!}.
+   *
+   * @param clusterId identifying the cluster
+   * @param userId identifying the user
+   * @param flowName identifying the flow
+   */
+  public FlowRunRowKeyPrefix(String clusterId, String userId,
+      String flowName) {
+    super(clusterId, userId, flowName, null);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.application.
+   * RowKeyPrefix#getRowKeyPrefix()
+   */
+  public byte[] getRowKeyPrefix() {
+    // We know we're a FlowRunRowKey with a null flowRunId, so we can simply
+    // delegate.
+    return super.getRowKey();
+  }
+
+}
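
A hedged sketch of how such a prefix bounds a scan over every run of one
flow (the names are illustrative; setRowPrefixFilter is a standard HBase
Scan method):

    // Scan all runs of a flow; rows come back newest run first because
    // flowRunId is inverted in the row key.
    byte[] prefix = new FlowRunRowKeyPrefix("cluster1", "user1", "flow_name")
        .getRowKeyPrefix();
    Scan scan = new Scan().setRowPrefixFilter(prefix);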

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
new file mode 100644
index 0000000..547bef0
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+
+/**
+ * The flow run table has a single column family, info, which stores
+ * per-flow-run information aggregated across applications.
+ *
+ * Metrics are also stored in the info column family.
+ *
+ * Example flow run table record:
+ *
+ * <pre>
+ * flow_run table
+ * |-------------------------------------------|
+ * |  Row key   | Column Family                |
+ * |            | info                         |
+ * |-------------------------------------------|
+ * | clusterId! | flow_version:version7        |
+ * | userName!  |                              |
+ * | flowName!  | running_apps:1               |
+ * | flowRunId  |                              |
+ * |            | min_start_time:1392995080000 |
+ * |            | #0:""                        |
+ * |            |                              |
+ * |            | min_start_time:1392995081012 |
+ * |            | #0:appId2                    |
+ * |            |                              |
+ * |            | min_start_time:1392993083210 |
+ * |            | #0:appId3                    |
+ * |            |                              |
+ * |            |                              |
+ * |            | max_end_time:1392993084018   |
+ * |            | #0:""                        |
+ * |            |                              |
+ * |            |                              |
+ * |            | m!mapInputRecords:127        |
+ * |            | #0:""                        |
+ * |            |                              |
+ * |            | m!mapInputRecords:31         |
+ * |            | #2:appId2                    |
+ * |            |                              |
+ * |            | m!mapInputRecords:37         |
+ * |            | #1:appId3                    |
+ * |            |                              |
+ * |            |                              |
+ * |            | m!mapOutputRecords:181       |
+ * |            | #0:""                        |
+ * |            |                              |
+ * |            | m!mapOutputRecords:37        |
+ * |            | #1:appId3                    |
+ * |            |                              |
+ * |            |                              |
+ * |-------------------------------------------|
+ * </pre>
+ */
+public class FlowRunTable extends BaseTable<FlowRunTable> {
+  /** entity prefix. */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowrun";
+
+  /** config param name that specifies the flowrun table name. */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /** default value for flowrun table name. */
+  public static final String DEFAULT_TABLE_NAME = "timelineservice.flowrun";
+
+  private static final Log LOG = LogFactory.getLog(FlowRunTable.class);
+
+  /** default max number of versions. */
+  public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE;
+
+  public FlowRunTable() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
+   * (org.apache.hadoop.hbase.client.Admin,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    HTableDescriptor flowRunTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor infoCF =
+        new HColumnDescriptor(FlowRunColumnFamily.INFO.getBytes());
+    infoCF.setBloomFilterType(BloomType.ROWCOL);
+    infoCF.setMinVersions(1);
+    infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+    flowRunTableDescp.addFamily(infoCF);
+
+    // TODO: figure the split policy
+    flowRunTableDescp.addCoprocessor(FlowRunCoprocessor.class
+        .getCanonicalName());
+    admin.createTable(flowRunTableDescp);
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+}
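
A minimal administrative sketch of creating this table (assumes an Admin
handle and that BaseTable#getTableName(Configuration) is visible to the
caller); since createTable above refuses to overwrite an existing table,
guard with tableExists first:

    FlowRunTable flowRunTable = new FlowRunTable();
    if (!admin.tableExists(flowRunTable.getTableName(hbaseConf))) {
      flowRunTable.createTable(admin, hbaseConf);
    }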

