http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnRWHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnRWHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnRWHelper.java new file mode 100644 index 0000000..a8e5149 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnRWHelper.java @@ -0,0 +1,487 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; + +/** + * A set of utility functions that read from or write to a column. + * This class is meant to be used only by explicit Columns, + * and is not meant to be used directly by clients for writes. + */ +public final class ColumnRWHelper { + private static final Logger LOG = + LoggerFactory.getLogger(ColumnRWHelper.class); + + private ColumnRWHelper() { + } + + /** + * Figures out the cell timestamp used in the Put for storing. + * Will supplement the timestamp if required. Typically done for the flow run + * table. If we supplement the timestamp, we left-shift the timestamp and + * supplement it with the app id so that there are no collisions in the flow + * run table's cells.
+ */ + private static long getPutTimestamp( + Long timestamp, boolean supplementTs, Attribute[] attributes) { + if (timestamp == null) { + timestamp = System.currentTimeMillis(); + } + if (!supplementTs) { + return timestamp; + } else { + String appId = getAppIdFromAttributes(attributes); + return TimestampGenerator.getSupplementedTimestamp( + timestamp, appId); + } + } + + private static String getAppIdFromAttributes(Attribute[] attributes) { + if (attributes == null) { + return null; + } + String appId = null; + for (Attribute attribute : attributes) { + if (AggregationCompactionDimension.APPLICATION_ID.toString().equals( + attribute.getName())) { + appId = Bytes.toString(attribute.getValue()); + } + } + return appId; + }
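[For intuition, a minimal sketch of the supplemented-timestamp round trip described above; it assumes only the TimestampGenerator methods referenced in this file and a hypothetical application id, and is an illustration, not part of the patch:

    // Writer side: the wall-clock time is left-shifted (multiplied by
    // TS_MULTIPLIER) and an app-id-derived part is folded in, so writers for
    // different apps cannot collide on the same cell timestamp.
    long writeTs = System.currentTimeMillis();
    long cellTs = TimestampGenerator.getSupplementedTimestamp(
        writeTs, "application_1111111111_0001");
    // Reader side: truncation undoes the supplement and recovers writeTs.
    long readTs = TimestampGenerator.getTruncatedTimestamp(cellTs); // == writeTs
]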
+ + /** + * Sends a Mutation to the table. The mutations will be buffered and sent over + * the wire as part of a batch. + * + * @param rowKey + * identifying the row to write. Nothing gets written when null. + * @param tableMutator + * used to modify the underlying HBase table + * @param column the column that is to be modified + * @param timestamp + * version timestamp. When null the current timestamp multiplied with + * TimestampGenerator.TS_MULTIPLIER and added with last 3 digits of + * app id will be used + * @param inputValue + * the value to write to the rowKey and column qualifier. Nothing + * gets written when null. + * @param attributes Attributes to be set for HBase Put. + * @throws IOException if any problem occurs during store operation (sending + * mutation to table). + */ + public static void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator, + Column<?> column, Long timestamp, + Object inputValue, Attribute... attributes) + throws IOException { + store(rowKey, tableMutator, column.getColumnFamilyBytes(), + column.getColumnQualifierBytes(), timestamp, + column.supplementCellTimestamp(), inputValue, + column.getValueConverter(), + column.getCombinedAttrsWithAggr(attributes)); + } + + /** + * Sends a Mutation to the table. The mutations will be buffered and sent over + * the wire as part of a batch. + * + * @param rowKey + * identifying the row to write. Nothing gets written when null. + * @param tableMutator + * used to modify the underlying HBase table + * @param columnFamilyBytes + * column family to which the value is written + * @param columnQualifier + * column qualifier. Nothing gets written when null. + * @param timestamp + * version timestamp. When null the current timestamp multiplied with + * TimestampGenerator.TS_MULTIPLIER and added with last 3 digits of + * app id will be used + * @param supplementTs + * whether the cell timestamp should be supplemented with the app id + * @param inputValue + * the value to write to the rowKey and column qualifier. Nothing + * gets written when null. + * @param converter + * used to encode the value before it is written + * @param attributes Attributes to be set for HBase Put. + * @throws IOException if any problem occurs during store operation (sending + * mutation to table). + */ + public static void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator, + byte[] columnFamilyBytes, byte[] columnQualifier, Long timestamp, + boolean supplementTs, Object inputValue, ValueConverter converter, + Attribute... attributes) throws IOException { + if ((rowKey == null) || (columnQualifier == null) || (inputValue == null)) { + return; + } + Put p = new Put(rowKey); + timestamp = getPutTimestamp(timestamp, supplementTs, attributes); + p.addColumn(columnFamilyBytes, columnQualifier, timestamp, + converter.encodeValue(inputValue)); + if ((attributes != null) && (attributes.length > 0)) { + for (Attribute attribute : attributes) { + p.setAttribute(attribute.getName(), attribute.getValue()); + } + } + tableMutator.mutate(p); + } + + /** + * Get the latest version of this specified column. Note: this call clones the + * value content of the hosting {@link org.apache.hadoop.hbase.Cell Cell}. + * + * @param result from which to read the value. Cannot be null + * @param columnFamilyBytes column family from which to read + * @param columnQualifierBytes referring to the column to be read. + * @param converter used to decode the cell value + * @return latest version of the specified column of whichever object was + * written. + * @throws IOException if any problem occurs while reading result. + */ + public static Object readResult(Result result, byte[] columnFamilyBytes, + byte[] columnQualifierBytes, ValueConverter converter) + throws IOException { + if (result == null || columnQualifierBytes == null) { + return null; + } + + // Would have preferred to be able to use getValueAsByteBuffer and get a + // ByteBuffer to avoid copy, but GenericObjectMapper doesn't seem to like + // that. + byte[] value = result.getValue(columnFamilyBytes, columnQualifierBytes); + return converter.decodeValue(value); + } + + /** + * Get the latest version of this specified column. Note: this call clones the + * value content of the hosting {@link org.apache.hadoop.hbase.Cell Cell}. + * + * @param result from which to read the value. Cannot be null + * @param column the column that the result can be parsed to + * @return latest version of the specified column of whichever object was + * written. + * @throws IOException if any problem occurs while reading result. + */ + public static Object readResult(Result result, Column<?> column) + throws IOException { + return readResult(result, column.getColumnFamilyBytes(), + column.getColumnQualifierBytes(), column.getValueConverter()); + } + + /** + * Get the latest version of this specified column. Note: this call clones the + * value content of the hosting {@link org.apache.hadoop.hbase.Cell Cell}. + * + * @param result Cannot be null + * @param columnPrefix column prefix to read from + * @param qualifier column qualifier. Nothing gets read when null. + * @return result object (can be cast to whatever object was written to) or + * null when specified column qualifier for this prefix doesn't exist + * in the result. + * @throws IOException if there is any exception encountered while reading + * result. + */ + public static Object readResult(Result result, ColumnPrefix<?> columnPrefix, + String qualifier) throws IOException { + byte[] columnQualifier = ColumnHelper.getColumnQualifier( + columnPrefix.getColumnPrefixInBytes(), qualifier); + + return readResult( + result, columnPrefix.getColumnFamilyBytes(), + columnQualifier, columnPrefix.getValueConverter()); + } + + /** + * Reads the latest values of all columns that share the given prefix. + * + * @param <K> identifies the type of key converter. + * @param result from which to read columns. + * @param columnPrefix column prefix from which to read + * @param keyConverter used to convert column bytes to the appropriate key + * type. + * @return the latest values of columns in the column family with this prefix + * (or all of them if the prefix value is null).
+ * @throws IOException if there is any exception encountered while reading + * results. + */ + public static <K> Map<K, Object> readResults(Result result, + ColumnPrefix<?> columnPrefix, KeyConverter<K> keyConverter) + throws IOException { + return readResults(result, + columnPrefix.getColumnFamilyBytes(), + columnPrefix.getColumnPrefixInBytes(), + keyConverter, columnPrefix.getValueConverter()); + } + + /** + * @param result from which to read data with timestamps. + * @param columnPrefix column prefix from which to read. + * @param <K> identifies the type of key converter. + * @param <V> the type of the values. The values will be cast into that type. + * @param keyConverter used to convert column bytes to the appropriate key + * type. + * @return the cell values at each respective time in the form + * {@literal {idA={timestamp1->value1}, idA={timestamp2->value2}, + * idB={timestamp3->value3}, idC={timestamp1->value4}}} + * @throws IOException if there is any exception encountered while reading + * result. + */ + public static <K, V> NavigableMap<K, NavigableMap<Long, V>> + readResultsWithTimestamps(Result result, ColumnPrefix<?> columnPrefix, + KeyConverter<K> keyConverter) throws IOException { + return readResultsWithTimestamps(result, + columnPrefix.getColumnFamilyBytes(), + columnPrefix.getColumnPrefixInBytes(), + keyConverter, columnPrefix.getValueConverter(), + columnPrefix.supplementCellTimeStamp()); + } + + /** + * @param result from which to read data with timestamps + * @param columnFamilyBytes column family from which to read + * @param columnPrefixBytes optional prefix to limit columns. If null all + * columns are returned. + * @param <K> identifies the type of column name (indicated by the type of the + * key converter). + * @param <V> the type of the values. The values will be cast into that type. + * @param keyConverter used to convert column bytes to the appropriate key + * type. + * @param valueConverter used to decode the cell values + * @param supplementTs whether the cell timestamps were supplemented on write + * and must be truncated back on read + * @return the cell values at each respective time in the form + * {@literal {idA={timestamp1->value1}, idA={timestamp2->value2}, + * idB={timestamp3->value3}, idC={timestamp1->value4}}} + * @throws IOException if any problem occurs while reading results. + */ + @SuppressWarnings("unchecked") + public static <K, V> NavigableMap<K, NavigableMap<Long, V>> + readResultsWithTimestamps(Result result, byte[] columnFamilyBytes, + byte[] columnPrefixBytes, KeyConverter<K> keyConverter, + ValueConverter valueConverter, boolean supplementTs) + throws IOException { + + NavigableMap<K, NavigableMap<Long, V>> results = new TreeMap<>(); + + if (result != null) { + NavigableMap< + byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> resultMap = + result.getMap(); + + NavigableMap<byte[], NavigableMap<Long, byte[]>> columnCellMap = + resultMap.get(columnFamilyBytes); + // could be that there is no such column family.
+ if (columnCellMap != null) { + for (Map.Entry<byte[], NavigableMap<Long, byte[]>> entry : columnCellMap + .entrySet()) { + K converterColumnKey = null; + if (columnPrefixBytes == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("null prefix was specified; returning all columns"); + } + try { + converterColumnKey = keyConverter.decode(entry.getKey()); + } catch (IllegalArgumentException iae) { + LOG.error("Illegal column found, skipping this column.", iae); + continue; + } + } else { + // A non-null prefix means columns are actually of the form + // prefix!columnNameRemainder + byte[][] columnNameParts = + Separator.QUALIFIERS.split(entry.getKey(), 2); + byte[] actualColumnPrefixBytes = columnNameParts[0]; + if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes) + && columnNameParts.length == 2) { + try { + // This is the prefix that we want + converterColumnKey = keyConverter.decode(columnNameParts[1]); + } catch (IllegalArgumentException iae) { + LOG.error("Illegal column found, skipping this column.", iae); + continue; + } + } + } + + // If this column has the prefix we want + if (converterColumnKey != null) { + NavigableMap<Long, V> cellResults = new TreeMap<>(); + NavigableMap<Long, byte[]> cells = entry.getValue(); + if (cells != null) { + for (Map.Entry<Long, byte[]> cell : cells.entrySet()) { + V value = + (V) valueConverter.decodeValue(cell.getValue()); + Long ts = supplementTs ? TimestampGenerator. + getTruncatedTimestamp(cell.getKey()) : cell.getKey(); + cellResults.put(ts, value); + } + } + results.put(converterColumnKey, cellResults); + } + } // for entry : columnCellMap + } // if columnCellMap != null + } // if result != null + return results; + }
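[To see the shape of the value readResultsWithTimestamps produces, a sketch for a metrics column family; EntityColumnPrefix.METRIC and a KeyConverter<String> named stringKeyConverter are assumed here purely for illustration, not part of the patch:

    // Each column under the prefix becomes one key; each cell version becomes
    // one (timestamp -> decoded value) entry, with supplemented timestamps
    // already truncated back to wall-clock time by the loop above.
    NavigableMap<String, NavigableMap<Long, Number>> metrics =
        ColumnRWHelper.readResultsWithTimestamps(
            result, EntityColumnPrefix.METRIC, stringKeyConverter);
    // e.g. {"MEMORY"={1425016502000=2048, 1425016503000=2560},
    //       "CPU"={1425016502000=33}}
]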
+ + /** + * @param <K> identifies the type of column name (indicated by the type of the + * key converter). + * @param result from which to read columns + * @param columnFamilyBytes column family from which to read + * @param columnPrefixBytes optional prefix to limit columns. If null all + * columns are returned. + * @param keyConverter used to convert column bytes to the appropriate key + * type. + * @param valueConverter used to decode the cell values + * @return the latest values of columns in the column family, keyed by the + * column name decoded by the key converter (with the prefix stripped + * when a non-null prefix is supplied). + * @throws IOException if any problem occurs while reading results. + */ + public static <K> Map<K, Object> readResults(Result result, + byte[] columnFamilyBytes, byte[] columnPrefixBytes, + KeyConverter<K> keyConverter, ValueConverter valueConverter) + throws IOException { + Map<K, Object> results = new HashMap<K, Object>(); + + if (result != null) { + Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes); + for (Map.Entry<byte[], byte[]> entry : columns.entrySet()) { + byte[] columnKey = entry.getKey(); + if (columnKey != null && columnKey.length > 0) { + + K converterColumnKey = null; + if (columnPrefixBytes == null) { + try { + converterColumnKey = keyConverter.decode(columnKey); + } catch (IllegalArgumentException iae) { + LOG.error("Illegal column found, skipping this column.", iae); + continue; + } + } else { + // A non-null prefix means columns are actually of the form + // prefix!columnNameRemainder + byte[][] columnNameParts = Separator.QUALIFIERS.split(columnKey, 2); + if (columnNameParts.length > 0) { + byte[] actualColumnPrefixBytes = columnNameParts[0]; + // If this is the prefix that we want + if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes) + && columnNameParts.length == 2) { + try { + converterColumnKey = keyConverter.decode(columnNameParts[1]); + } catch (IllegalArgumentException iae) { + LOG.error("Illegal column found, skipping this column.", iae); + continue; + } + } + } + } // if-else + + // If the columnPrefix is null (we want all columns), or the actual + // prefix matches the given prefix, we want this column + if (converterColumnKey != null) { + Object value = valueConverter.decodeValue(entry.getValue()); + // store the decoded value against the decoded column key + results.put(converterColumnKey, value); + } + } + } // for entry + } + return results; + } + + /** + * Sends a Mutation to the table. The mutations will be buffered and sent over + * the wire as part of a batch. + * + * @param rowKey identifying the row to write. Nothing gets written when null. + * @param tableMutator used to modify the underlying HBase table. Caller is + * responsible to pass a mutator for the table that actually has this + * column. + * @param columnPrefix column prefix for the column that is being stored + * @param qualifier column qualifier. Nothing gets written when null. + * @param timestamp version timestamp. When null the server timestamp will be + * used. + * @param attributes attributes for the mutation that are used by the + * coprocessor to set/read the cell tags. + * @param inputValue the value to write to the rowKey and column qualifier. + * Nothing gets written when null. + * @throws IOException if there is any exception encountered while doing + * store operation (sending mutation to the table). + */ + public static void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator, + ColumnPrefix<?> columnPrefix, byte[] qualifier, Long timestamp, + Object inputValue, Attribute... attributes) throws IOException { + // Null check + if (qualifier == null) { + throw new IOException("Cannot store column with null qualifier in " + + tableMutator.getName().getNameAsString()); + } + + byte[] columnQualifier = columnPrefix.getColumnPrefixBytes(qualifier); + Attribute[] combinedAttributes = + columnPrefix.getCombinedAttrsWithAggr(attributes); + + store(rowKey, tableMutator, columnPrefix.getColumnFamilyBytes(), + columnQualifier, timestamp, columnPrefix.supplementCellTimeStamp(), + inputValue, columnPrefix.getValueConverter(), combinedAttributes); + } + + /** + * Sends a Mutation to the table.
The mutations will be buffered and sent over + * the wire as part of a batch. + * + * @param rowKey identifying the row to write. Nothing gets written when null. + * @param tableMutator used to modify the underlying HBase table. Caller is + * responsible to pass a mutator for the table that actually has this + * column. + * @param columnPrefix column prefix for the column that is being stored + * @param qualifier column qualifier. Nothing gets written when null. + * @param timestamp version timestamp. When null the server timestamp will be + * used. + * @param attributes attributes for the mutation that are used by the + * coprocessor to set/read the cell tags. + * @param inputValue the value to write to the rowKey and column qualifier. + * Nothing gets written when null. + * @throws IOException if there is any exception encountered while doing + * store operation (sending mutation to the table). + */ + public static void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator, + ColumnPrefix<?> columnPrefix, String qualifier, Long timestamp, + Object inputValue, Attribute... attributes) throws IOException { + // Null check + if (qualifier == null) { + throw new IOException("Cannot store column with null qualifier in " + + tableMutator.getName().getNameAsString()); + } + + byte[] columnQualifier = columnPrefix.getColumnPrefixBytes(qualifier); + Attribute[] combinedAttributes = + columnPrefix.getCombinedAttrsWithAggr(attributes); + + store(rowKey, tableMutator, columnPrefix.getColumnFamilyBytes(), + columnQualifier, timestamp, columnPrefix.supplementCellTimeStamp(), + inputValue, columnPrefix.getValueConverter(), combinedAttributes); + } +}
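[Taken together, a short usage sketch of the helper; the EntityColumn.CREATED_TIME column and the rowKey, entityTableMutator and result objects are assumed for illustration and are not part of the patch:

    // Write: the Column object supplies family, qualifier, converter and
    // aggregation attributes; ColumnRWHelper builds the Put and hands it to
    // the buffered mutator, which batches it over the wire.
    ColumnRWHelper.store(rowKey, entityTableMutator,
        EntityColumn.CREATED_TIME, null /* null => current time */,
        1425016502000L);

    // Read: the same Column object locates and decodes the latest cell.
    Object createdTime =
        ColumnRWHelper.readResult(result, EntityColumn.CREATED_TIME);
]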
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java new file mode 100644 index 0000000..f4cd6fb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Arrays; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Query; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A collection of utility functions used by the HBase TimelineService backend. + */ +public final class HBaseTimelineStorageUtils { + private static final Logger LOG = + LoggerFactory.getLogger(HBaseTimelineStorageUtils.class); + + private HBaseTimelineStorageUtils() { + } + + /** + * @param conf YARN configuration. Used to see if there is an explicit config + * pointing to the HBase config file to read. It should not be null + * or a NullPointerException will be thrown. + * @return a configuration with the HBase configuration from the classpath, + * optionally overwritten by the timeline service configuration URL if + * specified. + * @throws MalformedURLException if a timeline service HBase configuration URL + * is specified but is a malformed URL.
+ */ + public static Configuration getTimelineServiceHBaseConf(Configuration conf) + throws MalformedURLException { + if (conf == null) { + throw new NullPointerException("conf must not be null"); + } + + Configuration hbaseConf; + String timelineServiceHBaseConfFileURL = + conf.get(YarnConfiguration.TIMELINE_SERVICE_HBASE_CONFIGURATION_FILE); + if (timelineServiceHBaseConfFileURL != null + && timelineServiceHBaseConfFileURL.length() > 0) { + LOG.info("Using hbase configuration at " + + timelineServiceHBaseConfFileURL); + // create a clone so that we don't mess with our input one + hbaseConf = new Configuration(conf); + Configuration plainHBaseConf = new Configuration(false); + URL hbaseSiteXML = new URL(timelineServiceHBaseConfFileURL); + plainHBaseConf.addResource(hbaseSiteXML); + HBaseConfiguration.merge(hbaseConf, plainHBaseConf); + } else { + // default to what is on the classpath + hbaseConf = HBaseConfiguration.create(conf); + } + return hbaseConf; + } + + /** + * Given a row key prefix stored in a byte array, return a byte array for its + * immediate next row key. + * + * @param rowKeyPrefix The provided row key prefix, represented in an array. + * @return the closest next row key of the provided row key. + */ + public static byte[] calculateTheClosestNextRowKeyForPrefix( + byte[] rowKeyPrefix) { + // Essentially we are treating it like an 'unsigned very very long' and + // doing +1 manually. + // Search for the place where the trailing 0xFFs start + int offset = rowKeyPrefix.length; + while (offset > 0) { + if (rowKeyPrefix[offset - 1] != (byte) 0xFF) { + break; + } + offset--; + } + + if (offset == 0) { + // We got a 0xFFFF... (only FFs) stopRow value which is + // the last possible prefix before the end of the table. + // So set it to stop at the 'end of the table' + return HConstants.EMPTY_END_ROW; + } + + // Copy the right length of the original + byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset); + // And increment the last one + newStopRow[newStopRow.length - 1]++; + return newStopRow; + } + + /** + * Sets the metrics time range on the given query. Note that HBase time + * ranges are half-open, [minStamp, maxStamp), which is why 1 is added to the + * inclusive tsEnd below. + * + * @param query the Get/Scan to restrict. + * @param metricsCf metrics column family bytes. + * @param tsBegin begin of the time range (inclusive). + * @param tsEnd end of the time range (inclusive). + */ + public static void setMetricsTimeRange(Query query, byte[] metricsCf, + long tsBegin, long tsEnd) { + if (tsBegin != 0 || tsEnd != Long.MAX_VALUE) { + query.setColumnFamilyTimeRange(metricsCf, + tsBegin, ((tsEnd == Long.MAX_VALUE) ? Long.MAX_VALUE : (tsEnd + 1))); + } + } +}
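[The increment-with-carry in calculateTheClosestNextRowKeyForPrefix is easiest to see on concrete prefixes; an illustration, not part of the patch:

    // {0x01, 0x02} -> {0x01, 0x03}: last byte is not 0xFF, so plain +1.
    byte[] next1 = HBaseTimelineStorageUtils
        .calculateTheClosestNextRowKeyForPrefix(new byte[] {0x01, 0x02});
    // {0x01, 0xFF} -> {0x02}: the trailing 0xFF is dropped and the carry
    // increments the preceding byte.
    byte[] next2 = HBaseTimelineStorageUtils
        .calculateTheClosestNextRowKeyForPrefix(new byte[] {0x01, (byte) 0xFF});
    // {0xFF} -> HConstants.EMPTY_END_ROW: an all-0xFF prefix has no successor,
    // so the scan runs to the end of the table.
]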
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java new file mode 100644 index 0000000..8e6c259 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Contains the constants used in the context of schema accesses for + * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} + * information. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public final class TimelineHBaseSchemaConstants { + private TimelineHBaseSchemaConstants() { + } + + /** + * Used to create a pre-split for tables starting with a username in the + * prefix. TODO: this may have to become a config variable (string with + * separators) so that different installations can presplit based on their own + * commonly occurring names.
+ */ + private static final byte[][] USERNAME_SPLITS = { + Bytes.toBytes("a"), Bytes.toBytes("ad"), Bytes.toBytes("an"), + Bytes.toBytes("b"), Bytes.toBytes("ca"), Bytes.toBytes("cl"), + Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"), + Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i"), + Bytes.toBytes("j"), Bytes.toBytes("k"), Bytes.toBytes("l"), + Bytes.toBytes("m"), Bytes.toBytes("n"), Bytes.toBytes("o"), + Bytes.toBytes("q"), Bytes.toBytes("r"), Bytes.toBytes("s"), + Bytes.toBytes("se"), Bytes.toBytes("t"), Bytes.toBytes("u"), + Bytes.toBytes("v"), Bytes.toBytes("w"), Bytes.toBytes("x"), + Bytes.toBytes("y"), Bytes.toBytes("z") + }; + + /** + * The split key prefix length; kept as a String so that it can be set + * directly as the KeyPrefixRegionSplitPolicy.prefix_length table attribute. + */ + public static final String USERNAME_SPLIT_KEY_PREFIX_LENGTH = "4"; + + /** + * @return the pre-split keys for tables whose row keys begin with a + * username. + */ + public static byte[][] getUsernameSplits() { + byte[][] kloon = USERNAME_SPLITS.clone(); + // Deep copy. + for (int row = 0; row < USERNAME_SPLITS.length; row++) { + kloon[row] = Bytes.copy(USERNAME_SPLITS[row]); + } + return kloon; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java new file mode 100644 index 0000000..29a07e4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Mutation; + +/** + * To be used to wrap an actual {@link BufferedMutator} in a type-safe manner.
+ * + * @param <T> The class referring to the table to be written to. + */ +public class TypedBufferedMutator<T extends BaseTable<T>> { + + private final BufferedMutator bufferedMutator; + + /** + * @param bufferedMutator the mutator to be wrapped for delegation. Shall not + * be null. + */ + public TypedBufferedMutator(BufferedMutator bufferedMutator) { + this.bufferedMutator = bufferedMutator; + } + + public TableName getName() { + return bufferedMutator.getName(); + } + + public Configuration getConfiguration() { + return bufferedMutator.getConfiguration(); + } + + public void mutate(Mutation mutation) throws IOException { + bufferedMutator.mutate(mutation); + } + + public void mutate(List<? extends Mutation> mutations) throws IOException { + bufferedMutator.mutate(mutations); + } + + public void close() throws IOException { + bufferedMutator.close(); + } + + public void flush() throws IOException { + bufferedMutator.flush(); + } + + public long getWriteBufferSize() { + return bufferedMutator.getWriteBufferSize(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java new file mode 100644 index 0000000..0df5b8a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage.common contains + * a set of utility classes used across backend storage reader and writer. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTableRW.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTableRW.java new file mode 100644 index 0000000..111ae71 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTableRW.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.entity; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Create, read and write to the Entity Table. + */ +public class EntityTableRW extends BaseTableRW<EntityTable> { + /** entity prefix. */ + private static final String PREFIX = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + "entity"; + + /** config param name that specifies the entity table name. */ + public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name"; + + /** + * config param name that specifies the TTL for metrics column family in + * entity table. 
+ */ + private static final String METRICS_TTL_CONF_NAME = PREFIX + + ".table.metrics.ttl"; + + /** + * config param name that specifies max-versions for metrics column family in + * entity table. + */ + private static final String METRICS_MAX_VERSIONS = + PREFIX + ".table.metrics.max-versions"; + + /** default value for entity table name. */ + public static final String DEFAULT_TABLE_NAME = "timelineservice.entity"; + + /** default TTL is 30 days for metrics timeseries. */ + private static final int DEFAULT_METRICS_TTL = 2592000; + + /** default max number of versions. */ + private static final int DEFAULT_METRICS_MAX_VERSIONS = 10000; + + private static final Logger LOG = + LoggerFactory.getLogger(EntityTableRW.class); + + public EntityTableRW() { + super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTableRW# + * createTable(org.apache.hadoop.hbase.client.Admin, + * org.apache.hadoop.conf.Configuration) + */ + public void createTable(Admin admin, Configuration hbaseConf) + throws IOException { + + TableName table = getTableName(hbaseConf); + if (admin.tableExists(table)) { + // do not disable / delete existing table + // similar to the approach taken by map-reduce jobs when + // output directory exists + throw new IOException("Table " + table.getNameAsString() + + " already exists."); + } + + HTableDescriptor entityTableDescp = new HTableDescriptor(table); + HColumnDescriptor infoCF = + new HColumnDescriptor(EntityColumnFamily.INFO.getBytes()); + infoCF.setBloomFilterType(BloomType.ROWCOL); + entityTableDescp.addFamily(infoCF); + + HColumnDescriptor configCF = + new HColumnDescriptor(EntityColumnFamily.CONFIGS.getBytes()); + configCF.setBloomFilterType(BloomType.ROWCOL); + configCF.setBlockCacheEnabled(true); + entityTableDescp.addFamily(configCF); + + HColumnDescriptor metricsCF = + new HColumnDescriptor(EntityColumnFamily.METRICS.getBytes()); + entityTableDescp.addFamily(metricsCF); + metricsCF.setBlockCacheEnabled(true); + // always keep 1 version (the latest) + metricsCF.setMinVersions(1); + metricsCF.setMaxVersions( + hbaseConf.getInt(METRICS_MAX_VERSIONS, DEFAULT_METRICS_MAX_VERSIONS)); + metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME, + DEFAULT_METRICS_TTL)); + entityTableDescp.setRegionSplitPolicyClassName( + "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy"); + entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length", + TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH); + admin.createTable(entityTableDescp, + TimelineHBaseSchemaConstants.getUsernameSplits()); + LOG.info("Status of table creation for " + table.getNameAsString() + "=" + + admin.tableExists(table)); + } + + /** + * @param metricsTTL time to live parameter for the metrics in this table. + * @param hbaseConf configuration in which to set the metrics TTL config + * variable. + */ + public void setMetricsTTL(int metricsTTL, Configuration hbaseConf) { + hbaseConf.setInt(METRICS_TTL_CONF_NAME, metricsTTL); + } + +}
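[Since createTable refuses to overwrite an existing table, an installation would typically tune the configuration once before the first creation. A sketch using the knobs above; the table name value is hypothetical and only standard HBase client calls are assumed (not part of the patch):

    Configuration hbaseConf = HBaseConfiguration.create();
    EntityTableRW entityTable = new EntityTableRW();
    // Hypothetical overrides: a custom table name and a 7-day metrics TTL.
    hbaseConf.set(EntityTableRW.TABLE_NAME_CONF_NAME,
        "prod.timelineservice.entity");
    entityTable.setMetricsTTL(7 * 24 * 60 * 60, hbaseConf);
    try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
         Admin admin = conn.getAdmin()) {
      entityTable.createTable(admin, hbaseConf);  // throws if it already exists
    }
]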
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java new file mode 100644 index 0000000..bb0e331 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage.entity + * contains classes related to the implementation of the entity table.
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage.entity; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTableRW.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTableRW.java new file mode 100644 index 0000000..5b9fe13 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTableRW.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Create, read and write to the FlowActivity Table. + */ +public class FlowActivityTableRW extends BaseTableRW<FlowActivityTable> { + /** flow activity table prefix. */ + private static final String PREFIX = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowactivity"; + + /** config param name that specifies the flowactivity table name. */ + public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name"; + + /** default value for flowactivity table name. */ + public static final String DEFAULT_TABLE_NAME = + "timelineservice.flowactivity"; + + private static final Logger LOG = + LoggerFactory.getLogger(FlowActivityTableRW.class); + + /** default max number of versions. 
*/ + public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE; + + public FlowActivityTableRW() { + super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTableRW# + * createTable(org.apache.hadoop.hbase.client.Admin, + * org.apache.hadoop.conf.Configuration) + */ + public void createTable(Admin admin, Configuration hbaseConf) + throws IOException { + + TableName table = getTableName(hbaseConf); + if (admin.tableExists(table)) { + // do not disable / delete existing table + // similar to the approach taken by map-reduce jobs when + // output directory exists + throw new IOException("Table " + table.getNameAsString() + + " already exists."); + } + + HTableDescriptor flowActivityTableDescp = new HTableDescriptor(table); + HColumnDescriptor infoCF = + new HColumnDescriptor(FlowActivityColumnFamily.INFO.getBytes()); + infoCF.setBloomFilterType(BloomType.ROWCOL); + flowActivityTableDescp.addFamily(infoCF); + infoCF.setMinVersions(1); + infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS); + + // TODO: figure the split policy before running in production + admin.createTable(flowActivityTableDescp); + LOG.info("Status of table creation for " + table.getNameAsString() + "=" + + admin.tableExists(table)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTableRW.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTableRW.java new file mode 100644 index 0000000..61c0734 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTableRW.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Coprocessor; + +/** + * Create, read and write to the FlowRun table. + */ +public class FlowRunTableRW extends BaseTableRW<FlowRunTable> { + /** flow run table prefix. */ + private static final String PREFIX = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowrun"; + + /** config param name that specifies the flowrun table name. */ + public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name"; + + /** default value for flowrun table name. */ + public static final String DEFAULT_TABLE_NAME = "timelineservice.flowrun"; + + private static final Logger LOG = + LoggerFactory.getLogger(FlowRunTableRW.class); + + /** default max number of versions. */ + public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE; + + public FlowRunTableRW() { + super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTableRW# + * createTable(org.apache.hadoop.hbase.client.Admin, + * org.apache.hadoop.conf.Configuration) + */ + public void createTable(Admin admin, Configuration hbaseConf) + throws IOException { + + TableName table = getTableName(hbaseConf); + if (admin.tableExists(table)) { + // do not disable / delete existing table + // similar to the approach taken by map-reduce jobs when + // output directory exists + throw new IOException("Table " + table.getNameAsString() + + " already exists."); + } + + HTableDescriptor flowRunTableDescp = new HTableDescriptor(table); + HColumnDescriptor infoCF = + new HColumnDescriptor(FlowRunColumnFamily.INFO.getBytes()); + infoCF.setBloomFilterType(BloomType.ROWCOL); + flowRunTableDescp.addFamily(infoCF); + infoCF.setMinVersions(1); + infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS); + + // TODO: figure the split policy + String coprocessorJarPathStr = hbaseConf.get( + YarnConfiguration.FLOW_RUN_COPROCESSOR_JAR_HDFS_LOCATION, + YarnConfiguration.DEFAULT_HDFS_LOCATION_FLOW_RUN_COPROCESSOR_JAR); + + Path coprocessorJarPath = new Path(coprocessorJarPathStr); + LOG.info("CoprocessorJarPath=" + coprocessorJarPath.toString()); + flowRunTableDescp.addCoprocessor( + "org.apache.hadoop.yarn.server.timelineservice.storage."
+ + "flow.FlowRunCoprocessor", coprocessorJarPath, + Coprocessor.PRIORITY_USER, null); + admin.createTable(flowRunTableDescp); + LOG.info("Status of table creation for " + table.getNameAsString() + "=" + + admin.tableExists(table)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java new file mode 100644 index 0000000..04963f3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage.flow + * contains classes related to the implementation of the flow-related tables, + * viz. the flow run table and the flow activity table.
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java new file mode 100644 index 0000000..e78db2a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage contains + * classes which define and implement reading and writing to backend storage. 
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
new file mode 100644
index 0000000..0956f1e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTableRW;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnRWHelper;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+
+/**
+ * The base class for reading timeline data from the HBase storage. This class
+ * provides basic support to validate and augment the reader context.
+ */
+public abstract class AbstractTimelineStorageReader {
+
+  private final TimelineReaderContext context;
+  /**
+   * Used to look up the flow context.
+   */
+  private final AppToFlowTableRW appToFlowTable = new AppToFlowTableRW();
+
+  public AbstractTimelineStorageReader(TimelineReaderContext ctxt) {
+    context = ctxt;
+  }
+
+  protected TimelineReaderContext getContext() {
+    return context;
+  }
+
+  /**
+   * Looks up the flow context from the AppToFlow table.
+   *
+   * @param appToFlowRowKey identifies the cluster and app ids.
+   * @param clusterId the cluster id.
+   * @param hbaseConf HBase configuration.
+   * @param conn HBase Connection.
+   * @return flow context information.
+   * @throws IOException if any problem occurs while fetching flow information.
+   */
+  protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey,
+      String clusterId, Configuration hbaseConf, Connection conn)
+      throws IOException {
+    byte[] rowKey = appToFlowRowKey.getRowKey();
+    Get get = new Get(rowKey);
+    Result result = appToFlowTable.getResult(hbaseConf, conn, get);
+    if (result != null && !result.isEmpty()) {
+      Object flowName = ColumnRWHelper.readResult(
+          result, AppToFlowColumnPrefix.FLOW_NAME, clusterId);
+      Object flowRunId = ColumnRWHelper.readResult(
+          result, AppToFlowColumnPrefix.FLOW_RUN_ID, clusterId);
+      Object userId = ColumnRWHelper.readResult(
+          result, AppToFlowColumnPrefix.USER_ID, clusterId);
+      if (flowName == null || userId == null || flowRunId == null) {
+        throw new NotFoundException(
+            "Unable to find the flow context (flow name, flow run id and "
+                + "user id) for clusterId=" + clusterId
+                + ", appId=" + appToFlowRowKey.getAppId());
+      }
+      return new FlowContext((String) userId, (String) flowName,
+          ((Number) flowRunId).longValue());
+    } else {
+      throw new NotFoundException(
+          "Unable to find the flow context (flow name, flow run id and "
+              + "user id) for clusterId=" + clusterId
+              + ", appId=" + appToFlowRowKey.getAppId());
+    }
+  }
+
+  /**
+   * Sets certain parameters to defaults if the values are not provided.
+   *
+   * @param hbaseConf HBase Configuration.
+   * @param conn HBase Connection.
+   * @throws IOException if any exception is encountered while setting params.
+   */
+  protected void augmentParams(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    defaultAugmentParams(hbaseConf, conn);
+  }
+
+  /**
+   * Default behavior for all timeline readers to augment parameters.
+   *
+   * @param hbaseConf HBase Configuration.
+   * @param conn HBase Connection.
+   * @throws IOException if any exception is encountered while setting params.
+   */
+  protected final void defaultAugmentParams(Configuration hbaseConf,
+      Connection conn) throws IOException {
+    // In reality, either all three fields are null or none of them is.
+    if (context.getFlowName() == null || context.getFlowRunId() == null
+        || context.getUserId() == null) {
+      // Get flow context information from the AppToFlow table.
+      AppToFlowRowKey appToFlowRowKey =
+          new AppToFlowRowKey(context.getAppId());
+      FlowContext flowContext =
+          lookupFlowContext(appToFlowRowKey, context.getClusterId(), hbaseConf,
+              conn);
+      context.setFlowName(flowContext.flowName);
+      context.setFlowRunId(flowContext.flowRunId);
+      context.setUserId(flowContext.userId);
+    }
+  }
+
+  /**
+   * Validates the required parameters to read the entities.
+   */
+  protected abstract void validateParams();
+
+  /**
+   * Encapsulates flow context information.
+   */
+  protected static class FlowContext {
+    private final String userId;
+    private final String flowName;
+    private final Long flowRunId;
+
+    public FlowContext(String user, String flowName, Long flowRunId) {
+      this.userId = user;
+      this.flowName = flowName;
+      this.flowRunId = flowRunId;
+    }
+
+    protected String getUserId() {
+      return userId;
+    }
+
+    protected String getFlowName() {
+      return flowName;
+    }
+
+    protected Long getFlowRunId() {
+      return flowRunId;
+    }
+  }
+}
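For reviewers, a minimal sketch of how a concrete reader would use this base class. The subclass name, the readEntity method and the appId check below are hypothetical and not part of this patch; only the validateParams()/augmentParams() lifecycle and the getContext() accessor come from AbstractTimelineStorageReader above.

package org.apache.hadoop.yarn.server.timelineservice.storage.reader;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;

/** Hypothetical subclass, for illustration only. */
class ExampleEntityReader extends AbstractTimelineStorageReader {

  ExampleEntityReader(TimelineReaderContext ctxt) {
    super(ctxt);
  }

  @Override
  protected void validateParams() {
    // Fail fast when the caller did not supply a mandatory field.
    if (getContext().getAppId() == null) {
      throw new IllegalArgumentException("appId shouldn't be null");
    }
  }

  /** Illustrative entry point; a real reader would issue Gets/Scans here. */
  public void readEntity(Configuration hbaseConf, Connection conn)
      throws IOException {
    validateParams();
    // Fills in flow name, flow run id and user id from the AppToFlow table
    // when they are missing from the context (see defaultAugmentParams above).
    augmentParams(hbaseConf, conn);
    // ... read from the entity table using the augmented context ...
  }
}

The split between the abstract validateParams() and the overridable augmentParams() (which defaults to defaultAugmentParams()) lets each reader add its own augmentation while reusing the AppToFlow lookup.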