http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java index 8e806bc..aa2bfda 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java @@ -46,8 +46,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import com.google.common.base.Preconditions; @@ -150,13 +153,13 @@ class ApplicationEntityReader extends GenericEntityReader { EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); // If INFO field has to be retrieved, add a filter for fetching columns // with INFO column prefix. - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) { + if (hasField(fieldsToRetrieve, Field.INFO)) { infoFamilyColsFilter.addFilter( TimelineFilterUtils.createHBaseQualifierFilter( CompareOp.EQUAL, ApplicationColumnPrefix.INFO)); } TimelineFilterList relatesTo = getFilters().getRelatesTo(); - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) { + if (hasField(fieldsToRetrieve, Field.RELATES_TO)) { // If RELATES_TO field has to be retrieved, add a filter for fetching // columns with RELATES_TO column prefix. infoFamilyColsFilter.addFilter( @@ -169,12 +172,11 @@ class ApplicationEntityReader extends GenericEntityReader { // matched after fetching rows from HBase. Set<String> relatesToCols = TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo); - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createFiltersFromColumnQualifiers( - ApplicationColumnPrefix.RELATES_TO, relatesToCols)); + infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( + ApplicationColumnPrefix.RELATES_TO, relatesToCols)); } TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo(); - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { + if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { // If IS_RELATED_TO field has to be retrieved, add a filter for fetching // columns with IS_RELATED_TO column prefix. infoFamilyColsFilter.addFilter( @@ -187,12 +189,11 @@ class ApplicationEntityReader extends GenericEntityReader { // matched after fetching rows from HBase. Set<String> isRelatedToCols = TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo); - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createFiltersFromColumnQualifiers( - ApplicationColumnPrefix.IS_RELATED_TO, isRelatedToCols)); + infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( + ApplicationColumnPrefix.IS_RELATED_TO, isRelatedToCols)); } TimelineFilterList eventFilters = getFilters().getEventFilters(); - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) { + if (hasField(fieldsToRetrieve, Field.EVENTS)) { // If EVENTS field has to be retrieved, add a filter for fetching columns // with EVENT column prefix. infoFamilyColsFilter.addFilter( @@ -205,9 +206,8 @@ class ApplicationEntityReader extends GenericEntityReader { // fetching rows from HBase. Set<String> eventCols = TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters); - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createFiltersFromColumnQualifiers( - ApplicationColumnPrefix.EVENT, eventCols)); + infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( + ApplicationColumnPrefix.EVENT, eventCols)); } return infoFamilyColsFilter; } @@ -222,25 +222,25 @@ class ApplicationEntityReader extends GenericEntityReader { private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) { EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); // Events not required. - if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) { + if (!hasField(fieldsToRetrieve, Field.EVENTS)) { infoColFamilyList.addFilter( TimelineFilterUtils.createHBaseQualifierFilter( CompareOp.NOT_EQUAL, ApplicationColumnPrefix.EVENT)); } // info not required. - if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) { + if (!hasField(fieldsToRetrieve, Field.INFO)) { infoColFamilyList.addFilter( TimelineFilterUtils.createHBaseQualifierFilter( CompareOp.NOT_EQUAL, ApplicationColumnPrefix.INFO)); } // is related to not required. - if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { + if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { infoColFamilyList.addFilter( TimelineFilterUtils.createHBaseQualifierFilter( CompareOp.NOT_EQUAL, ApplicationColumnPrefix.IS_RELATED_TO)); } // relates to not required. - if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) { + if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) { infoColFamilyList.addFilter( TimelineFilterUtils.createHBaseQualifierFilter( CompareOp.NOT_EQUAL, ApplicationColumnPrefix.RELATES_TO)); @@ -308,9 +308,10 @@ class ApplicationEntityReader extends GenericEntityReader { protected Result getResult(Configuration hbaseConf, Connection conn, FilterList filterList) throws IOException { TimelineReaderContext context = getContext(); - byte[] rowKey = - ApplicationRowKey.getRowKey(context.getClusterId(), context.getUserId(), + ApplicationRowKey applicationRowKey = + new ApplicationRowKey(context.getClusterId(), context.getUserId(), context.getFlowName(), context.getFlowRunId(), context.getAppId()); + byte[] rowKey = applicationRowKey.getRowKey(); Get get = new Get(rowKey); get.setMaxVersions(getDataToRetrieve().getMetricsLimit()); if (filterList != null && !filterList.getFilters().isEmpty()) { @@ -345,10 +346,13 @@ class ApplicationEntityReader extends GenericEntityReader { TimelineReaderContext context = getContext(); if (isSingleEntityRead()) { // Get flow context information from AppToFlow table. - if (context.getFlowName() == null || context.getFlowRunId() == null || - context.getUserId() == null) { - FlowContext flowContext = lookupFlowContext( - context.getClusterId(), context.getAppId(), hbaseConf, conn); + if (context.getFlowName() == null || context.getFlowRunId() == null + || context.getUserId() == null) { + AppToFlowRowKey appToFlowRowKey = + new AppToFlowRowKey(context.getClusterId(), context.getAppId()); + FlowContext flowContext = + lookupFlowContext(appToFlowRowKey, + hbaseConf, conn); context.setFlowName(flowContext.getFlowName()); context.setFlowRunId(flowContext.getFlowRunId()); context.setUserId(flowContext.getUserId()); @@ -367,15 +371,13 @@ class ApplicationEntityReader extends GenericEntityReader { Connection conn, FilterList filterList) throws IOException { Scan scan = new Scan(); TimelineReaderContext context = getContext(); - if (context.getFlowRunId() != null) { - scan.setRowPrefixFilter(ApplicationRowKey. - getRowKeyPrefix(context.getClusterId(), context.getUserId(), - context.getFlowName(), context.getFlowRunId())); - } else { - scan.setRowPrefixFilter(ApplicationRowKey. - getRowKeyPrefix(context.getClusterId(), context.getUserId(), - context.getFlowName())); - } + // Whether or not flowRunID is null doesn't matter, the + // ApplicationRowKeyPrefix will do the right thing. + RowKeyPrefix<ApplicationRowKey> applicationRowKeyPrefix = + new ApplicationRowKeyPrefix(context.getClusterId(), + context.getUserId(), context.getFlowName(), + context.getFlowRunId()); + scan.setRowPrefixFilter(applicationRowKeyPrefix.getRowKeyPrefix()); FilterList newList = new FilterList(); newList.addFilter(new PageFilter(getFilters().getLimit())); if (filterList != null && !filterList.getFilters().isEmpty()) { @@ -409,15 +411,14 @@ class ApplicationEntityReader extends GenericEntityReader { boolean checkIsRelatedTo = !isSingleEntityRead() && filters.getIsRelatedTo() != null && filters.getIsRelatedTo().getFilterList().size() > 0; - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || - checkIsRelatedTo) { - TimelineStorageUtils.readRelationship( - entity, result, ApplicationColumnPrefix.IS_RELATED_TO, true); + if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) { + readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO, + true); if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity, filters.getIsRelatedTo())) { return null; } - if (!TimelineStorageUtils.hasField(fieldsToRetrieve, + if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { entity.getIsRelatedToEntities().clear(); } @@ -430,29 +431,27 @@ class ApplicationEntityReader extends GenericEntityReader { boolean checkRelatesTo = !isSingleEntityRead() && filters.getRelatesTo() != null && filters.getRelatesTo().getFilterList().size() > 0; - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO) || + if (hasField(fieldsToRetrieve, Field.RELATES_TO) || checkRelatesTo) { - TimelineStorageUtils.readRelationship( - entity, result, ApplicationColumnPrefix.RELATES_TO, false); + readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO, + false); if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity, filters.getRelatesTo())) { return null; } - if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) { + if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) { entity.getRelatesToEntities().clear(); } } // fetch info if fieldsToRetrieve contains INFO or ALL. - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) { - TimelineStorageUtils.readKeyValuePairs( - entity, result, ApplicationColumnPrefix.INFO, false); + if (hasField(fieldsToRetrieve, Field.INFO)) { + readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false); } // fetch configs if fieldsToRetrieve contains CONFIGS or ALL. - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.CONFIGS)) { - TimelineStorageUtils.readKeyValuePairs( - entity, result, ApplicationColumnPrefix.CONFIG, true); + if (hasField(fieldsToRetrieve, Field.CONFIGS)) { + readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true); } // fetch events and match event filters if they exist. If event filters do @@ -462,21 +461,19 @@ class ApplicationEntityReader extends GenericEntityReader { boolean checkEvents = !isSingleEntityRead() && filters.getEventFilters() != null && filters.getEventFilters().getFilterList().size() > 0; - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS) || - checkEvents) { - TimelineStorageUtils.readEvents( - entity, result, ApplicationColumnPrefix.EVENT); + if (hasField(fieldsToRetrieve, Field.EVENTS) || checkEvents) { + readEvents(entity, result, ApplicationColumnPrefix.EVENT); if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity, filters.getEventFilters())) { return null; } - if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) { + if (!hasField(fieldsToRetrieve, Field.EVENTS)) { entity.getEvents().clear(); } } // fetch metrics if fieldsToRetrieve contains METRICS or ALL. - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.METRICS)) { + if (hasField(fieldsToRetrieve, Field.METRICS)) { readMetrics(entity, result, ApplicationColumnPrefix.METRIC); } return entity;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java index faecd14..9ba5e38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java @@ -35,9 +35,11 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrie import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; import com.google.common.base.Preconditions; @@ -50,6 +52,12 @@ class FlowActivityEntityReader extends TimelineEntityReader { private static final FlowActivityTable FLOW_ACTIVITY_TABLE = new FlowActivityTable(); + /** + * Used to convert Long key components to and from storage format. + */ + private final KeyConverter<Long> longKeyConverter = new LongKeyConverter(); + + public FlowActivityEntityReader(TimelineReaderContext ctxt, TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { super(ctxt, entityFilters, toRetrieve, true); @@ -105,15 +113,14 @@ class FlowActivityEntityReader extends TimelineEntityReader { if (getFilters().getCreatedTimeBegin() == 0L && getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) { // All records have to be chosen. - scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId)); + scan.setRowPrefixFilter(new FlowActivityRowKeyPrefix(clusterId) + .getRowKeyPrefix()); } else { - scan.setStartRow( - FlowActivityRowKey.getRowKeyPrefix(clusterId, - getFilters().getCreatedTimeEnd())); - scan.setStopRow( - FlowActivityRowKey.getRowKeyPrefix(clusterId, - (getFilters().getCreatedTimeBegin() <= 0 ? 0 : - (getFilters().getCreatedTimeBegin() - 1)))); + scan.setStartRow(new FlowActivityRowKeyPrefix(clusterId, getFilters() + .getCreatedTimeEnd()).getRowKeyPrefix()); + scan.setStopRow(new FlowActivityRowKeyPrefix(clusterId, (getFilters() + .getCreatedTimeBegin() <= 0 ? 0 + : (getFilters().getCreatedTimeBegin() - 1))).getRowKeyPrefix()); } // use the page filter to limit the result to the page size // the scanner may still return more than the limit; therefore we need to @@ -137,8 +144,7 @@ class FlowActivityEntityReader extends TimelineEntityReader { // get the list of run ids along with the version that are associated with // this flow on this day Map<Long, Object> runIdsMap = - FlowActivityColumnPrefix.RUN_ID.readResults(result, - LongKeyConverter.getInstance()); + FlowActivityColumnPrefix.RUN_ID.readResults(result, longKeyConverter); for (Map.Entry<Long, Object> e : runIdsMap.entrySet()) { Long runId = e.getKey(); String version = (String)e.getValue(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java index e1695ef..986a28f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java @@ -28,12 +28,12 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.FamilyFilter; import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.filter.QualifierFilter; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; @@ -43,11 +43,12 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; import org.apache.hadoop.yarn.webapp.BadRequestException; @@ -81,8 +82,8 @@ class FlowRunEntityReader extends TimelineEntityReader { @Override protected void validateParams() { Preconditions.checkNotNull(getContext(), "context shouldn't be null"); - Preconditions.checkNotNull( - getDataToRetrieve(), "data to retrieve shouldn't be null"); + Preconditions.checkNotNull(getDataToRetrieve(), + "data to retrieve shouldn't be null"); Preconditions.checkNotNull(getContext().getClusterId(), "clusterId shouldn't be null"); Preconditions.checkNotNull(getContext().getUserId(), @@ -97,8 +98,8 @@ class FlowRunEntityReader extends TimelineEntityReader { if (!isSingleEntityRead() && fieldsToRetrieve != null) { for (Field field : fieldsToRetrieve) { if (field != Field.ALL && field != Field.METRICS) { - throw new BadRequestException("Invalid field " + field + - " specified while querying flow runs."); + throw new BadRequestException("Invalid field " + field + + " specified while querying flow runs."); } } } @@ -119,23 +120,22 @@ class FlowRunEntityReader extends TimelineEntityReader { Long createdTimeBegin = getFilters().getCreatedTimeBegin(); Long createdTimeEnd = getFilters().getCreatedTimeEnd(); if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) { - listBasedOnFilters.addFilter( - TimelineFilterUtils.createSingleColValueFiltersByRange( - FlowRunColumn.MIN_START_TIME, createdTimeBegin, createdTimeEnd)); + listBasedOnFilters.addFilter(TimelineFilterUtils + .createSingleColValueFiltersByRange(FlowRunColumn.MIN_START_TIME, + createdTimeBegin, createdTimeEnd)); } // Filter based on metric filters. TimelineFilterList metricFilters = getFilters().getMetricFilters(); if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) { - listBasedOnFilters.addFilter( - TimelineFilterUtils.createHBaseFilterList( - FlowRunColumnPrefix.METRIC, metricFilters)); + listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( + FlowRunColumnPrefix.METRIC, metricFilters)); } return listBasedOnFilters; } /** - * Add {@link QualifierFilter} filters to filter list for each column of - * flow run table. + * Add {@link QualifierFilter} filters to filter list for each column of flow + * run table. * * @return filter list to which qualifier filters have been added. */ @@ -153,20 +153,19 @@ class FlowRunEntityReader extends TimelineEntityReader { FilterList list = new FilterList(Operator.MUST_PASS_ONE); // By default fetch everything in INFO column family. FamilyFilter infoColumnFamily = - new FamilyFilter(CompareOp.EQUAL, - new BinaryComparator(FlowRunColumnFamily.INFO.getBytes())); + new FamilyFilter(CompareOp.EQUAL, new BinaryComparator( + FlowRunColumnFamily.INFO.getBytes())); TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); // If multiple entities have to be retrieved, check if metrics have to be // retrieved and if not, add a filter so that metrics can be excluded. // Metrics are always returned if we are reading a single entity. - if (!isSingleEntityRead() && !TimelineStorageUtils.hasField( - dataToRetrieve.getFieldsToRetrieve(), Field.METRICS)) { + if (!isSingleEntityRead() + && !hasField(dataToRetrieve.getFieldsToRetrieve(), Field.METRICS)) { FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE); infoColFamilyList.addFilter(infoColumnFamily); - infoColFamilyList.addFilter( - new QualifierFilter(CompareOp.NOT_EQUAL, - new BinaryPrefixComparator( - FlowRunColumnPrefix.METRIC.getColumnPrefixBytes("")))); + infoColFamilyList.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator(FlowRunColumnPrefix.METRIC + .getColumnPrefixBytes("")))); list.addFilter(infoColFamilyList); } else { // Check if metricsToRetrieve are specified and if they are, create a @@ -176,14 +175,13 @@ class FlowRunEntityReader extends TimelineEntityReader { // (in augmentParams()). TimelineFilterList metricsToRetrieve = dataToRetrieve.getMetricsToRetrieve(); - if (metricsToRetrieve != null && - !metricsToRetrieve.getFilterList().isEmpty()) { + if (metricsToRetrieve != null + && !metricsToRetrieve.getFilterList().isEmpty()) { FilterList infoColFamilyList = new FilterList(); infoColFamilyList.addFilter(infoColumnFamily); FilterList columnsList = updateFixedColumns(); - columnsList.addFilter( - TimelineFilterUtils.createHBaseFilterList( - FlowRunColumnPrefix.METRIC, metricsToRetrieve)); + columnsList.addFilter(TimelineFilterUtils.createHBaseFilterList( + FlowRunColumnPrefix.METRIC, metricsToRetrieve)); infoColFamilyList.addFilter(columnsList); list.addFilter(infoColFamilyList); } @@ -195,9 +193,10 @@ class FlowRunEntityReader extends TimelineEntityReader { protected Result getResult(Configuration hbaseConf, Connection conn, FilterList filterList) throws IOException { TimelineReaderContext context = getContext(); - byte[] rowKey = - FlowRunRowKey.getRowKey(context.getClusterId(), context.getUserId(), + FlowRunRowKey flowRunRowKey = + new FlowRunRowKey(context.getClusterId(), context.getUserId(), context.getFlowName(), context.getFlowRunId()); + byte[] rowKey = flowRunRowKey.getRowKey(); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); if (filterList != null && !filterList.getFilters().isEmpty()) { @@ -207,13 +206,14 @@ class FlowRunEntityReader extends TimelineEntityReader { } @Override - protected ResultScanner getResults(Configuration hbaseConf, - Connection conn, FilterList filterList) throws IOException { + protected ResultScanner getResults(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { Scan scan = new Scan(); TimelineReaderContext context = getContext(); - scan.setRowPrefixFilter( - FlowRunRowKey.getRowKeyPrefix(context.getClusterId(), - context.getUserId(), context.getFlowName())); + RowKeyPrefix<FlowRunRowKey> flowRunRowKeyPrefix = + new FlowRunRowKeyPrefix(context.getClusterId(), context.getUserId(), + context.getFlowName()); + scan.setRowPrefixFilter(flowRunRowKeyPrefix.getRowKeyPrefix()); FilterList newList = new FilterList(); newList.addFilter(new PageFilter(getFilters().getLimit())); if (filterList != null && !filterList.getFilters().isEmpty()) { @@ -238,27 +238,27 @@ class FlowRunEntityReader extends TimelineEntityReader { } // read the start time - Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result); + Long startTime = (Long) FlowRunColumn.MIN_START_TIME.readResult(result); if (startTime != null) { flowRun.setStartTime(startTime.longValue()); } // read the end time if available - Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result); + Long endTime = (Long) FlowRunColumn.MAX_END_TIME.readResult(result); if (endTime != null) { flowRun.setMaxEndTime(endTime.longValue()); } // read the flow version - String version = (String)FlowRunColumn.FLOW_VERSION.readResult(result); + String version = (String) FlowRunColumn.FLOW_VERSION.readResult(result); if (version != null) { flowRun.setVersion(version); } // read metrics if its a single entity query or if METRICS are part of // fieldsToRetrieve. - if (isSingleEntityRead() || TimelineStorageUtils.hasField( - getDataToRetrieve().getFieldsToRetrieve(), Field.METRICS)) { + if (isSingleEntityRead() + || hasField(getDataToRetrieve().getFieldsToRetrieve(), Field.METRICS)) { readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java index 22583b5..4e1ab8a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader; import java.io.IOException; import java.util.EnumSet; +import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -28,11 +29,11 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.FamilyFilter; import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.QualifierFilter; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; @@ -44,11 +45,16 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; import org.apache.hadoop.yarn.webapp.NotFoundException; @@ -66,6 +72,12 @@ class GenericEntityReader extends TimelineEntityReader { */ private final AppToFlowTable appToFlowTable = new AppToFlowTable(); + /** + * Used to convert strings key components to and from storage format. + */ + private final KeyConverter<String> stringKeyConverter = + new StringKeyConverter(); + public GenericEntityReader(TimelineReaderContext ctxt, TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve, boolean sortedKeys) { @@ -95,32 +107,29 @@ class GenericEntityReader extends TimelineEntityReader { long createdTimeBegin = filters.getCreatedTimeBegin(); long createdTimeEnd = filters.getCreatedTimeEnd(); if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) { - listBasedOnFilters.addFilter( - TimelineFilterUtils.createSingleColValueFiltersByRange( - EntityColumn.CREATED_TIME, createdTimeBegin, createdTimeEnd)); + listBasedOnFilters.addFilter(TimelineFilterUtils + .createSingleColValueFiltersByRange(EntityColumn.CREATED_TIME, + createdTimeBegin, createdTimeEnd)); } // Create filter list based on metric filters and add it to // listBasedOnFilters. TimelineFilterList metricFilters = filters.getMetricFilters(); if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) { - listBasedOnFilters.addFilter( - TimelineFilterUtils.createHBaseFilterList( - EntityColumnPrefix.METRIC, metricFilters)); + listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.METRIC, metricFilters)); } // Create filter list based on config filters and add it to // listBasedOnFilters. TimelineFilterList configFilters = filters.getConfigFilters(); if (configFilters != null && !configFilters.getFilterList().isEmpty()) { - listBasedOnFilters.addFilter( - TimelineFilterUtils.createHBaseFilterList( - EntityColumnPrefix.CONFIG, configFilters)); + listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.CONFIG, configFilters)); } // Create filter list based on info filters and add it to listBasedOnFilters TimelineFilterList infoFilters = filters.getInfoFilters(); if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) { - listBasedOnFilters.addFilter( - TimelineFilterUtils.createHBaseFilterList( - EntityColumnPrefix.INFO, infoFilters)); + listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.INFO, infoFilters)); } return listBasedOnFilters; } @@ -130,10 +139,10 @@ class GenericEntityReader extends TimelineEntityReader { * * @return true if we need to fetch some of the columns, false otherwise. */ - private static boolean fetchPartialEventCols(TimelineFilterList eventFilters, + private boolean fetchPartialEventCols(TimelineFilterList eventFilters, EnumSet<Field> fieldsToRetrieve) { return (eventFilters != null && !eventFilters.getFilterList().isEmpty() && - !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)); + !hasField(fieldsToRetrieve, Field.EVENTS)); } /** @@ -141,10 +150,10 @@ class GenericEntityReader extends TimelineEntityReader { * * @return true if we need to fetch some of the columns, false otherwise. */ - private static boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo, + private boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo, EnumSet<Field> fieldsToRetrieve) { return (relatesTo != null && !relatesTo.getFilterList().isEmpty() && - !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)); + !hasField(fieldsToRetrieve, Field.RELATES_TO)); } /** @@ -152,10 +161,10 @@ class GenericEntityReader extends TimelineEntityReader { * * @return true if we need to fetch some of the columns, false otherwise. */ - private static boolean fetchPartialIsRelatedToCols( - TimelineFilterList isRelatedTo, EnumSet<Field> fieldsToRetrieve) { + private boolean fetchPartialIsRelatedToCols(TimelineFilterList isRelatedTo, + EnumSet<Field> fieldsToRetrieve) { return (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty() && - !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)); + !hasField(fieldsToRetrieve, Field.IS_RELATED_TO)); } /** @@ -163,19 +172,20 @@ class GenericEntityReader extends TimelineEntityReader { * relatesto and isrelatedto from info family. * * @return true, if we need to fetch only some of the columns, false if we - * need to fetch all the columns under info column family. + * need to fetch all the columns under info column family. */ protected boolean fetchPartialColsFromInfoFamily() { EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); TimelineEntityFilters filters = getFilters(); - return fetchPartialEventCols(filters.getEventFilters(), fieldsToRetrieve) || - fetchPartialRelatesToCols(filters.getRelatesTo(), fieldsToRetrieve) || - fetchPartialIsRelatedToCols(filters.getIsRelatedTo(), fieldsToRetrieve); + return fetchPartialEventCols(filters.getEventFilters(), fieldsToRetrieve) + || fetchPartialRelatesToCols(filters.getRelatesTo(), fieldsToRetrieve) + || fetchPartialIsRelatedToCols(filters.getIsRelatedTo(), + fieldsToRetrieve); } /** - * Check if we need to create filter list based on fields. We need to create - * a filter list iff all fields need not be retrieved or we have some specific + * Check if we need to create filter list based on fields. We need to create a + * filter list iff all fields need not be retrieved or we have some specific * fields or metrics to retrieve. We also need to create a filter list if we * have relationships(relatesTo/isRelatedTo) and event filters specified for * the query. @@ -188,22 +198,24 @@ class GenericEntityReader extends TimelineEntityReader { // be retrieved, also check if we have some metrics or configs to // retrieve specified for the query because then a filter list will have // to be created. - boolean flag = !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) || - (dataToRetrieve.getConfsToRetrieve() != null && - !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) || - (dataToRetrieve.getMetricsToRetrieve() != null && - !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty()); + boolean flag = + !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) + || (dataToRetrieve.getConfsToRetrieve() != null && !dataToRetrieve + .getConfsToRetrieve().getFilterList().isEmpty()) + || (dataToRetrieve.getMetricsToRetrieve() != null && !dataToRetrieve + .getMetricsToRetrieve().getFilterList().isEmpty()); // Filters need to be checked only if we are reading multiple entities. If // condition above is false, we check if there are relationships(relatesTo/ // isRelatedTo) and event filters specified for the query. if (!flag && !isSingleEntityRead()) { TimelineEntityFilters filters = getFilters(); - flag = (filters.getEventFilters() != null && - !filters.getEventFilters().getFilterList().isEmpty()) || - (filters.getIsRelatedTo() != null && - !filters.getIsRelatedTo().getFilterList().isEmpty()) || - (filters.getRelatesTo() != null && - !filters.getRelatesTo().getFilterList().isEmpty()); + flag = + (filters.getEventFilters() != null && !filters.getEventFilters() + .getFilterList().isEmpty()) + || (filters.getIsRelatedTo() != null && !filters.getIsRelatedTo() + .getFilterList().isEmpty()) + || (filters.getRelatesTo() != null && !filters.getRelatesTo() + .getFilterList().isEmpty()); } return flag; } @@ -216,8 +228,8 @@ class GenericEntityReader extends TimelineEntityReader { */ protected void updateFixedColumns(FilterList list) { for (EntityColumn column : EntityColumn.values()) { - list.addFilter(new QualifierFilter(CompareOp.EQUAL, - new BinaryComparator(column.getColumnQualifierBytes()))); + list.addFilter(new QualifierFilter(CompareOp.EQUAL, new BinaryComparator( + column.getColumnQualifierBytes()))); } } @@ -226,30 +238,29 @@ class GenericEntityReader extends TimelineEntityReader { * qualifiers in the info column family will be returned in result. * * @param isApplication If true, it means operations are to be performed for - * application table, otherwise for entity table. + * application table, otherwise for entity table. * @return filter list. * @throws IOException if any problem occurs while creating filter list. */ - private FilterList createFilterListForColsOfInfoFamily() - throws IOException { + private FilterList createFilterListForColsOfInfoFamily() throws IOException { FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE); // Add filters for each column in entity table. updateFixedColumns(infoFamilyColsFilter); EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); // If INFO field has to be retrieved, add a filter for fetching columns // with INFO column prefix. - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) { - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( + if (hasField(fieldsToRetrieve, Field.INFO)) { + infoFamilyColsFilter + .addFilter(TimelineFilterUtils.createHBaseQualifierFilter( CompareOp.EQUAL, EntityColumnPrefix.INFO)); } TimelineFilterList relatesTo = getFilters().getRelatesTo(); - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) { + if (hasField(fieldsToRetrieve, Field.RELATES_TO)) { // If RELATES_TO field has to be retrieved, add a filter for fetching // columns with RELATES_TO column prefix. - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.EQUAL, EntityColumnPrefix.RELATES_TO)); + infoFamilyColsFilter.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.EQUAL, + EntityColumnPrefix.RELATES_TO)); } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) { // Even if fields to retrieve does not contain RELATES_TO, we still // need to have a filter to fetch some of the column qualifiers if @@ -257,17 +268,16 @@ class GenericEntityReader extends TimelineEntityReader { // matched after fetching rows from HBase. Set<String> relatesToCols = TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo); - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createFiltersFromColumnQualifiers( - EntityColumnPrefix.RELATES_TO, relatesToCols)); + infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( + EntityColumnPrefix.RELATES_TO, relatesToCols)); } TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo(); - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { + if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { // If IS_RELATED_TO field has to be retrieved, add a filter for fetching // columns with IS_RELATED_TO column prefix. - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.EQUAL, EntityColumnPrefix.IS_RELATED_TO)); + infoFamilyColsFilter.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.EQUAL, + EntityColumnPrefix.IS_RELATED_TO)); } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) { // Even if fields to retrieve does not contain IS_RELATED_TO, we still // need to have a filter to fetch some of the column qualifiers if @@ -275,27 +285,26 @@ class GenericEntityReader extends TimelineEntityReader { // matched after fetching rows from HBase. Set<String> isRelatedToCols = TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo); - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createFiltersFromColumnQualifiers( - EntityColumnPrefix.IS_RELATED_TO, isRelatedToCols)); + infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( + EntityColumnPrefix.IS_RELATED_TO, isRelatedToCols)); } TimelineFilterList eventFilters = getFilters().getEventFilters(); - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) { + if (hasField(fieldsToRetrieve, Field.EVENTS)) { // If EVENTS field has to be retrieved, add a filter for fetching columns // with EVENT column prefix. - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( + infoFamilyColsFilter + .addFilter(TimelineFilterUtils.createHBaseQualifierFilter( CompareOp.EQUAL, EntityColumnPrefix.EVENT)); - } else if (eventFilters != null && !eventFilters.getFilterList().isEmpty()){ + } else if (eventFilters != null && + !eventFilters.getFilterList().isEmpty()) { // Even if fields to retrieve does not contain EVENTS, we still need to // have a filter to fetch some of the column qualifiers on the basis of // event filters specified. Event filters will then be matched after // fetching rows from HBase. Set<String> eventCols = TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters); - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createFiltersFromColumnQualifiers( - EntityColumnPrefix.EVENT, eventCols)); + infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( + EntityColumnPrefix.EVENT, eventCols)); } return infoFamilyColsFilter; } @@ -310,28 +319,28 @@ class GenericEntityReader extends TimelineEntityReader { private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) { EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); // Events not required. - if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) { - infoColFamilyList.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.NOT_EQUAL, EntityColumnPrefix.EVENT)); + if (!hasField(fieldsToRetrieve, Field.EVENTS)) { + infoColFamilyList.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, + EntityColumnPrefix.EVENT)); } // info not required. - if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) { - infoColFamilyList.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.NOT_EQUAL, EntityColumnPrefix.INFO)); + if (!hasField(fieldsToRetrieve, Field.INFO)) { + infoColFamilyList.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, + EntityColumnPrefix.INFO)); } // is related to not required. - if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { - infoColFamilyList.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.NOT_EQUAL, EntityColumnPrefix.IS_RELATED_TO)); + if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { + infoColFamilyList.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, + EntityColumnPrefix.IS_RELATED_TO)); } // relates to not required. - if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) { - infoColFamilyList.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.NOT_EQUAL, EntityColumnPrefix.RELATES_TO)); + if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) { + infoColFamilyList.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, + EntityColumnPrefix.RELATES_TO)); } } @@ -348,18 +357,18 @@ class GenericEntityReader extends TimelineEntityReader { // CONFS to fields to retrieve in augmentParams() even if not specified. if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) { // Create a filter list for configs. - listBasedOnFields.addFilter(TimelineFilterUtils. - createFilterForConfsOrMetricsToRetrieve( - dataToRetrieve.getConfsToRetrieve(), - EntityColumnFamily.CONFIGS, EntityColumnPrefix.CONFIG)); + listBasedOnFields.addFilter(TimelineFilterUtils + .createFilterForConfsOrMetricsToRetrieve( + dataToRetrieve.getConfsToRetrieve(), EntityColumnFamily.CONFIGS, + EntityColumnPrefix.CONFIG)); } // Please note that if metricsToRetrieve is specified, we would have added // METRICS to fields to retrieve in augmentParams() even if not specified. if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) { // Create a filter list for metrics. - listBasedOnFields.addFilter(TimelineFilterUtils. - createFilterForConfsOrMetricsToRetrieve( + listBasedOnFields.addFilter(TimelineFilterUtils + .createFilterForConfsOrMetricsToRetrieve( dataToRetrieve.getMetricsToRetrieve(), EntityColumnFamily.METRICS, EntityColumnPrefix.METRIC)); } @@ -375,8 +384,8 @@ class GenericEntityReader extends TimelineEntityReader { FilterList infoColFamilyList = new FilterList(); // By default fetch everything in INFO column family. FamilyFilter infoColumnFamily = - new FamilyFilter(CompareOp.EQUAL, - new BinaryComparator(EntityColumnFamily.INFO.getBytes())); + new FamilyFilter(CompareOp.EQUAL, new BinaryComparator( + EntityColumnFamily.INFO.getBytes())); infoColFamilyList.addFilter(infoColumnFamily); if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) { // We can fetch only some of the columns from info family. @@ -394,27 +403,27 @@ class GenericEntityReader extends TimelineEntityReader { /** * Looks up flow context from AppToFlow table. * - * @param clusterId Cluster Id. - * @param appId App Id. + * @param appToFlowRowKey to identify Cluster and App Ids. * @param hbaseConf HBase configuration. * @param conn HBase Connection. * @return flow context information. * @throws IOException if any problem occurs while fetching flow information. */ - protected FlowContext lookupFlowContext(String clusterId, String appId, + protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey, Configuration hbaseConf, Connection conn) throws IOException { - byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); + byte[] rowKey = appToFlowRowKey.getRowKey(); Get get = new Get(rowKey); Result result = appToFlowTable.getResult(hbaseConf, conn, get); if (result != null && !result.isEmpty()) { - return new FlowContext( - AppToFlowColumn.USER_ID.readResult(result).toString(), - AppToFlowColumn.FLOW_ID.readResult(result).toString(), - ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue()); + return new FlowContext(AppToFlowColumn.USER_ID.readResult(result) + .toString(), AppToFlowColumn.FLOW_ID.readResult(result).toString(), + ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result)) + .longValue()); } else { throw new NotFoundException( - "Unable to find the context flow ID and flow run ID for clusterId=" + - clusterId + ", appId=" + appId); + "Unable to find the context flow ID and flow run ID for clusterId=" + + appToFlowRowKey.getClusterId() + ", appId=" + + appToFlowRowKey.getAppId()); } } @@ -425,17 +434,21 @@ class GenericEntityReader extends TimelineEntityReader { private final String userId; private final String flowName; private final Long flowRunId; + public FlowContext(String user, String flowName, Long flowRunId) { this.userId = user; this.flowName = flowName; this.flowRunId = flowRunId; } + protected String getUserId() { return userId; } + protected String getFlowName() { return flowName; } + protected Long getFlowRunId() { return flowRunId; } @@ -444,8 +457,8 @@ class GenericEntityReader extends TimelineEntityReader { @Override protected void validateParams() { Preconditions.checkNotNull(getContext(), "context shouldn't be null"); - Preconditions.checkNotNull( - getDataToRetrieve(), "data to retrieve shouldn't be null"); + Preconditions.checkNotNull(getDataToRetrieve(), + "data to retrieve shouldn't be null"); Preconditions.checkNotNull(getContext().getClusterId(), "clusterId shouldn't be null"); Preconditions.checkNotNull(getContext().getAppId(), @@ -463,11 +476,13 @@ class GenericEntityReader extends TimelineEntityReader { throws IOException { TimelineReaderContext context = getContext(); // In reality all three should be null or neither should be null - if (context.getFlowName() == null || context.getFlowRunId() == null || - context.getUserId() == null) { + if (context.getFlowName() == null || context.getFlowRunId() == null + || context.getUserId() == null) { // Get flow context information from AppToFlow table. - FlowContext flowContext = lookupFlowContext( - context.getClusterId(), context.getAppId(), hbaseConf, conn); + AppToFlowRowKey appToFlowRowKey = + new AppToFlowRowKey(context.getClusterId(), context.getAppId()); + FlowContext flowContext = + lookupFlowContext(appToFlowRowKey, hbaseConf, conn); context.setFlowName(flowContext.flowName); context.setFlowRunId(flowContext.flowRunId); context.setUserId(flowContext.userId); @@ -485,9 +500,9 @@ class GenericEntityReader extends TimelineEntityReader { FilterList filterList) throws IOException { TimelineReaderContext context = getContext(); byte[] rowKey = - EntityRowKey.getRowKey(context.getClusterId(), context.getUserId(), + new EntityRowKey(context.getClusterId(), context.getUserId(), context.getFlowName(), context.getFlowRunId(), context.getAppId(), - context.getEntityType(), context.getEntityId()); + context.getEntityType(), context.getEntityId()).getRowKey(); Get get = new Get(rowKey); get.setMaxVersions(getDataToRetrieve().getMetricsLimit()); if (filterList != null && !filterList.getFilters().isEmpty()) { @@ -497,15 +512,17 @@ class GenericEntityReader extends TimelineEntityReader { } @Override - protected ResultScanner getResults(Configuration hbaseConf, - Connection conn, FilterList filterList) throws IOException { + protected ResultScanner getResults(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { // Scan through part of the table to find the entities belong to one app // and one type Scan scan = new Scan(); TimelineReaderContext context = getContext(); - scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix( - context.getClusterId(), context.getUserId(), context.getFlowName(), - context.getFlowRunId(), context.getAppId(), context.getEntityType())); + RowKeyPrefix<EntityRowKey> entityRowKeyPrefix = + new EntityRowKeyPrefix(context.getClusterId(), context.getUserId(), + context.getFlowName(), context.getFlowRunId(), context.getAppId(), + context.getEntityType()); + scan.setRowPrefixFilter(entityRowKeyPrefix.getRowKeyPrefix()); scan.setMaxVersions(getDataToRetrieve().getMetricsLimit()); if (filterList != null && !filterList.getFilters().isEmpty()) { scan.setFilter(filterList); @@ -535,18 +552,16 @@ class GenericEntityReader extends TimelineEntityReader { // locally as relevant HBase filters to filter out rows on the basis of // isRelatedTo are not set in HBase scan. boolean checkIsRelatedTo = - !isSingleEntityRead() && filters.getIsRelatedTo() != null && - filters.getIsRelatedTo().getFilterList().size() > 0; - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || - checkIsRelatedTo) { - TimelineStorageUtils.readRelationship( - entity, result, EntityColumnPrefix.IS_RELATED_TO, true); - if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity, - filters.getIsRelatedTo())) { + !isSingleEntityRead() && filters.getIsRelatedTo() != null + && filters.getIsRelatedTo().getFilterList().size() > 0; + if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) { + readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true); + if (checkIsRelatedTo + && !TimelineStorageUtils.matchIsRelatedTo(entity, + filters.getIsRelatedTo())) { return null; } - if (!TimelineStorageUtils.hasField(fieldsToRetrieve, - Field.IS_RELATED_TO)) { + if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { entity.getIsRelatedToEntities().clear(); } } @@ -556,31 +571,29 @@ class GenericEntityReader extends TimelineEntityReader { // locally as relevant HBase filters to filter out rows on the basis of // relatesTo are not set in HBase scan. boolean checkRelatesTo = - !isSingleEntityRead() && filters.getRelatesTo() != null && - filters.getRelatesTo().getFilterList().size() > 0; - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO) || - checkRelatesTo) { - TimelineStorageUtils.readRelationship( - entity, result, EntityColumnPrefix.RELATES_TO, false); - if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity, - filters.getRelatesTo())) { + !isSingleEntityRead() && filters.getRelatesTo() != null + && filters.getRelatesTo().getFilterList().size() > 0; + if (hasField(fieldsToRetrieve, Field.RELATES_TO) + || checkRelatesTo) { + readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false); + if (checkRelatesTo + && !TimelineStorageUtils.matchRelatesTo(entity, + filters.getRelatesTo())) { return null; } - if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) { + if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) { entity.getRelatesToEntities().clear(); } } // fetch info if fieldsToRetrieve contains INFO or ALL. - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) { - TimelineStorageUtils.readKeyValuePairs( - entity, result, EntityColumnPrefix.INFO, false); + if (hasField(fieldsToRetrieve, Field.INFO)) { + readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false); } // fetch configs if fieldsToRetrieve contains CONFIGS or ALL. - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.CONFIGS)) { - TimelineStorageUtils.readKeyValuePairs( - entity, result, EntityColumnPrefix.CONFIG, true); + if (hasField(fieldsToRetrieve, Field.CONFIGS)) { + readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true); } // fetch events and match event filters if they exist. If event filters do @@ -588,24 +601,48 @@ class GenericEntityReader extends TimelineEntityReader { // as relevant HBase filters to filter out rows on the basis of events // are not set in HBase scan. boolean checkEvents = - !isSingleEntityRead() && filters.getEventFilters() != null && - filters.getEventFilters().getFilterList().size() > 0; - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS) || - checkEvents) { - TimelineStorageUtils.readEvents(entity, result, EntityColumnPrefix.EVENT); - if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity, - filters.getEventFilters())) { + !isSingleEntityRead() && filters.getEventFilters() != null + && filters.getEventFilters().getFilterList().size() > 0; + if (hasField(fieldsToRetrieve, Field.EVENTS) || checkEvents) { + readEvents(entity, result, EntityColumnPrefix.EVENT); + if (checkEvents + && !TimelineStorageUtils.matchEventFilters(entity, + filters.getEventFilters())) { return null; } - if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) { + if (!hasField(fieldsToRetrieve, Field.EVENTS)) { entity.getEvents().clear(); } } // fetch metrics if fieldsToRetrieve contains METRICS or ALL. - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.METRICS)) { + if (hasField(fieldsToRetrieve, Field.METRICS)) { readMetrics(entity, result, EntityColumnPrefix.METRIC); } return entity; } + + /** + * Helper method for reading key-value pairs for either info or config. + * + * @param <T> Describes the type of column prefix. + * @param entity entity to fill. + * @param result result from HBase. + * @param prefix column prefix. + * @param isConfig if true, means we are reading configs, otherwise info. + * @throws IOException if any problem is encountered while reading result. + */ + protected <T> void readKeyValuePairs(TimelineEntity entity, Result result, + ColumnPrefix<T> prefix, boolean isConfig) throws IOException { + // info and configuration are of type Map<String, Object or String> + Map<String, Object> columns = + prefix.readResults(result, stringKeyConverter); + if (isConfig) { + for (Map.Entry<String, Object> column : columns.entrySet()) { + entity.addConfig(column.getKey(), column.getValue().toString()); + } + } else { + entity.addInfo(columns); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java index 852834e..7b294a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.reader; import java.io.IOException; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.NavigableMap; import java.util.NavigableSet; @@ -30,15 +33,27 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; /** * The base class for reading and deserializing timeline entities from the @@ -68,6 +83,12 @@ public abstract class TimelineEntityReader { private boolean sortedKeys = false; /** + * Used to convert strings key components to and from storage format. + */ + private final KeyConverter<String> stringKeyConverter = + new StringKeyConverter(); + + /** * Instantiates a reader for multiple-entity reads. * * @param ctxt Reader context which defines the scope in which query has to be @@ -331,7 +352,7 @@ public abstract class TimelineEntityReader { ColumnPrefix<?> columnPrefix) throws IOException { NavigableMap<String, NavigableMap<Long, Number>> metricsResult = columnPrefix.readResultsWithTimestamps( - result, StringKeyConverter.getInstance()); + result, stringKeyConverter); for (Map.Entry<String, NavigableMap<Long, Number>> metricResult: metricsResult.entrySet()) { TimelineMetric metric = new TimelineMetric(); @@ -359,4 +380,117 @@ public abstract class TimelineEntityReader { protected void setTable(BaseTable<?> baseTable) { this.table = baseTable; } + + /** + * Check if we have a certain field amongst fields to retrieve. This method + * checks against {@link Field#ALL} as well because that would mean field + * passed needs to be matched. + * + * @param fieldsToRetrieve fields to be retrieved. + * @param requiredField fields to be checked in fieldsToRetrieve. + * @return true if has the required field, false otherwise. + */ + protected boolean hasField(EnumSet<Field> fieldsToRetrieve, + Field requiredField) { + return fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(requiredField); + } + + /** + * Create a filter list of qualifier filters based on passed set of columns. + * + * @param <T> Describes the type of column prefix. + * @param colPrefix Column Prefix. + * @param columns set of column qualifiers. + * @return filter list. + */ + protected <T> FilterList createFiltersFromColumnQualifiers( + ColumnPrefix<T> colPrefix, Set<String> columns) { + FilterList list = new FilterList(Operator.MUST_PASS_ONE); + for (String column : columns) { + // For columns which have compound column qualifiers (eg. events), we need + // to include the required separator. + byte[] compoundColQual = createColQualifierPrefix(colPrefix, column); + list.addFilter(new QualifierFilter(CompareOp.EQUAL, + new BinaryPrefixComparator(colPrefix + .getColumnPrefixBytes(compoundColQual)))); + } + return list; + } + + protected <T> byte[] createColQualifierPrefix(ColumnPrefix<T> colPrefix, + String column) { + if (colPrefix == ApplicationColumnPrefix.EVENT + || colPrefix == EntityColumnPrefix.EVENT) { + return new EventColumnName(column, null, null).getColumnQualifier(); + } else { + return stringKeyConverter.encode(column); + } + } + + /** + * Helper method for reading relationship. + * + * @param <T> Describes the type of column prefix. + * @param entity entity to fill. + * @param result result from HBase. + * @param prefix column prefix. + * @param isRelatedTo if true, means relationship is to be added to + * isRelatedTo, otherwise its added to relatesTo. + * @throws IOException if any problem is encountered while reading result. + */ + protected <T> void readRelationship(TimelineEntity entity, Result result, + ColumnPrefix<T> prefix, boolean isRelatedTo) throws IOException { + // isRelatedTo and relatesTo are of type Map<String, Set<String>> + Map<String, Object> columns = + prefix.readResults(result, stringKeyConverter); + for (Map.Entry<String, Object> column : columns.entrySet()) { + for (String id : Separator.VALUES.splitEncoded(column.getValue() + .toString())) { + if (isRelatedTo) { + entity.addIsRelatedToEntity(column.getKey(), id); + } else { + entity.addRelatesToEntity(column.getKey(), id); + } + } + } + } + + /** + * Read events from the entity table or the application table. The column name + * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted + * if there is no info associated with the event. + * + * @param <T> Describes the type of column prefix. + * @param entity entity to fill. + * @param result HBase Result. + * @param prefix column prefix. + * @throws IOException if any problem is encountered while reading result. + */ + protected static <T> void readEvents(TimelineEntity entity, Result result, + ColumnPrefix<T> prefix) throws IOException { + Map<String, TimelineEvent> eventsMap = new HashMap<>(); + Map<EventColumnName, Object> eventsResult = + prefix.readResults(result, new EventColumnNameConverter()); + for (Map.Entry<EventColumnName, Object> + eventResult : eventsResult.entrySet()) { + EventColumnName eventColumnName = eventResult.getKey(); + String key = eventColumnName.getId() + + Long.toString(eventColumnName.getTimestamp()); + // Retrieve previously seen event to add to it + TimelineEvent event = eventsMap.get(key); + if (event == null) { + // First time we're seeing this event, add it to the eventsMap + event = new TimelineEvent(); + event.setId(eventColumnName.getId()); + event.setTimestamp(eventColumnName.getTimestamp()); + eventsMap.put(key, event); + } + if (eventColumnName.getInfoKey() != null) { + event.addInfo(eventColumnName.getInfoKey(), eventResult.getValue()); + } + } + Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values()); + entity.addEvents(eventsSet); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f56409b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java index 74e4b5d..58df970 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java @@ -24,220 +24,13 @@ import static org.junit.Assert.assertTrue; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKeyConverter; import org.junit.Test; public class TestKeyConverters { - private final static String QUALIFIER_SEP = Separator.QUALIFIERS.getValue(); - private final static byte[] QUALIFIER_SEP_BYTES = - Bytes.toBytes(QUALIFIER_SEP); - private final static String CLUSTER = "cl" + QUALIFIER_SEP + "uster"; - private final static String USER = QUALIFIER_SEP + "user"; - private final static String FLOW_NAME = - "dummy_" + QUALIFIER_SEP + "flow" + QUALIFIER_SEP; - private final static Long FLOW_RUN_ID; - private final static String APPLICATION_ID; - static { - long runid = Long.MAX_VALUE - 900L; - byte[] longMaxByteArr = Bytes.toBytes(Long.MAX_VALUE); - byte[] byteArr = Bytes.toBytes(runid); - int sepByteLen = QUALIFIER_SEP_BYTES.length; - if (sepByteLen <= byteArr.length) { - for (int i = 0; i < sepByteLen; i++) { - byteArr[i] = (byte)(longMaxByteArr[i] - QUALIFIER_SEP_BYTES[i]); - } - } - FLOW_RUN_ID = Bytes.toLong(byteArr); - long clusterTs = System.currentTimeMillis(); - byteArr = Bytes.toBytes(clusterTs); - if (sepByteLen <= byteArr.length) { - for (int i = 0; i < sepByteLen; i++) { - byteArr[byteArr.length - sepByteLen + i] = - (byte)(longMaxByteArr[byteArr.length - sepByteLen + i] - - QUALIFIER_SEP_BYTES[i]); - } - } - clusterTs = Bytes.toLong(byteArr); - int seqId = 222; - APPLICATION_ID = ApplicationId.newInstance(clusterTs, seqId).toString(); - } - - private static void verifyRowPrefixBytes(byte[] byteRowKeyPrefix) { - int sepLen = QUALIFIER_SEP_BYTES.length; - for (int i = 0; i < sepLen; i++) { - assertTrue("Row key prefix not encoded properly.", - byteRowKeyPrefix[byteRowKeyPrefix.length - sepLen + i] == - QUALIFIER_SEP_BYTES[i]); - } - } - - @Test - public void testFlowActivityRowKeyConverter() { - Long ts = TimelineStorageUtils.getTopOfTheDayTimestamp(1459900830000L); - byte[] byteRowKey = FlowActivityRowKeyConverter.getInstance().encode( - new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME)); - FlowActivityRowKey rowKey = - FlowActivityRowKeyConverter.getInstance().decode(byteRowKey); - assertEquals(CLUSTER, rowKey.getClusterId()); - assertEquals(ts, rowKey.getDayTimestamp()); - assertEquals(USER, rowKey.getUserId()); - assertEquals(FLOW_NAME, rowKey.getFlowName()); - - byte[] byteRowKeyPrefix = FlowActivityRowKeyConverter.getInstance().encode( - new FlowActivityRowKey(CLUSTER, null, null, null)); - byte[][] splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { - Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE }); - assertEquals(2, splits.length); - assertEquals(0, splits[1].length); - assertEquals(CLUSTER, - Separator.QUALIFIERS.decode(Bytes.toString(splits[0]))); - verifyRowPrefixBytes(byteRowKeyPrefix); - - byteRowKeyPrefix = FlowActivityRowKeyConverter.getInstance().encode( - new FlowActivityRowKey(CLUSTER, ts, null, null)); - splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { - Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE }); - assertEquals(3, splits.length); - assertEquals(0, splits[2].length); - assertEquals(CLUSTER, - Separator.QUALIFIERS.decode(Bytes.toString(splits[0]))); - assertEquals(ts, (Long) TimelineStorageUtils.invertLong( - Bytes.toLong(splits[1]))); - verifyRowPrefixBytes(byteRowKeyPrefix); - } - - @Test - public void testFlowRunRowKeyConverter() { - byte[] byteRowKey = FlowRunRowKeyConverter.getInstance().encode( - new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID)); - FlowRunRowKey rowKey = - FlowRunRowKeyConverter.getInstance().decode(byteRowKey); - assertEquals(CLUSTER, rowKey.getClusterId()); - assertEquals(USER, rowKey.getUserId()); - assertEquals(FLOW_NAME, rowKey.getFlowName()); - assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId()); - - byte[] byteRowKeyPrefix = FlowRunRowKeyConverter.getInstance().encode( - new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, null)); - byte[][] splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { - Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, - Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE }); - assertEquals(4, splits.length); - assertEquals(0, splits[3].length); - assertEquals(FLOW_NAME, - Separator.QUALIFIERS.decode(Bytes.toString(splits[2]))); - verifyRowPrefixBytes(byteRowKeyPrefix); - } - - @Test - public void testApplicationRowKeyConverter() { - byte[] byteRowKey = ApplicationRowKeyConverter.getInstance().encode( - new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, - APPLICATION_ID)); - ApplicationRowKey rowKey = - ApplicationRowKeyConverter.getInstance().decode(byteRowKey); - assertEquals(CLUSTER, rowKey.getClusterId()); - assertEquals(USER, rowKey.getUserId()); - assertEquals(FLOW_NAME, rowKey.getFlowName()); - assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId()); - assertEquals(APPLICATION_ID, rowKey.getAppId()); - - byte[] byteRowKeyPrefix = ApplicationRowKeyConverter.getInstance().encode( - new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, null)); - byte[][] splits = - Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { - Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, - Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, - Separator.VARIABLE_SIZE }); - assertEquals(5, splits.length); - assertEquals(0, splits[4].length); - assertEquals(FLOW_NAME, - Separator.QUALIFIERS.decode(Bytes.toString(splits[2]))); - assertEquals(FLOW_RUN_ID, (Long)TimelineStorageUtils.invertLong( - Bytes.toLong(splits[3]))); - verifyRowPrefixBytes(byteRowKeyPrefix); - - byteRowKeyPrefix = ApplicationRowKeyConverter.getInstance().encode( - new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, null, null)); - splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { - Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, - Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE }); - assertEquals(4, splits.length); - assertEquals(0, splits[3].length); - assertEquals(FLOW_NAME, - Separator.QUALIFIERS.decode(Bytes.toString(splits[2]))); - verifyRowPrefixBytes(byteRowKeyPrefix); - } - - @Test - public void testEntityRowKeyConverter() { - String entityId = "!ent!ity!!id!"; - String entityType = "entity!Type"; - byte[] byteRowKey = EntityRowKeyConverter.getInstance().encode( - new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID, - entityType, entityId)); - EntityRowKey rowKey = - EntityRowKeyConverter.getInstance().decode(byteRowKey); - assertEquals(CLUSTER, rowKey.getClusterId()); - assertEquals(USER, rowKey.getUserId()); - assertEquals(FLOW_NAME, rowKey.getFlowName()); - assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId()); - assertEquals(APPLICATION_ID, rowKey.getAppId()); - assertEquals(entityType, rowKey.getEntityType()); - assertEquals(entityId, rowKey.getEntityId()); - - byte[] byteRowKeyPrefix = EntityRowKeyConverter.getInstance().encode( - new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID, - entityType, null)); - byte[][] splits = - Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { - Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, - Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, - AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE, - Separator.VARIABLE_SIZE }); - assertEquals(7, splits.length); - assertEquals(0, splits[6].length); - assertEquals(APPLICATION_ID, - AppIdKeyConverter.getInstance().decode(splits[4])); - assertEquals(entityType, Separator.QUALIFIERS.decode( - Bytes.toString(splits[5]))); - verifyRowPrefixBytes(byteRowKeyPrefix); - - byteRowKeyPrefix = EntityRowKeyConverter.getInstance().encode( - new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID, - null, null)); - splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { - Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, - Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, - AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE }); - assertEquals(6, splits.length); - assertEquals(0, splits[5].length); - assertEquals(APPLICATION_ID, - AppIdKeyConverter.getInstance().decode(splits[4])); - verifyRowPrefixBytes(byteRowKeyPrefix); - } - - @Test - public void testAppToFlowRowKeyConverter() { - byte[] byteRowKey = AppToFlowRowKeyConverter.getInstance().encode( - new AppToFlowRowKey(CLUSTER, APPLICATION_ID)); - AppToFlowRowKey rowKey = - AppToFlowRowKeyConverter.getInstance().decode(byteRowKey); - assertEquals(CLUSTER, rowKey.getClusterId()); - assertEquals(APPLICATION_ID, rowKey.getAppId()); - } @Test public void testAppIdKeyConverter() { + AppIdKeyConverter appIdKeyConverter = new AppIdKeyConverter(); long currentTs = System.currentTimeMillis(); ApplicationId appId1 = ApplicationId.newInstance(currentTs, 1); ApplicationId appId2 = ApplicationId.newInstance(currentTs, 2); @@ -245,18 +38,19 @@ public class TestKeyConverters { String appIdStr1 = appId1.toString(); String appIdStr2 = appId2.toString(); String appIdStr3 = appId3.toString(); - byte[] appIdBytes1 = AppIdKeyConverter.getInstance().encode(appIdStr1); - byte[] appIdBytes2 = AppIdKeyConverter.getInstance().encode(appIdStr2); - byte[] appIdBytes3 = AppIdKeyConverter.getInstance().encode(appIdStr3); + byte[] appIdBytes1 = appIdKeyConverter.encode(appIdStr1); + byte[] appIdBytes2 = appIdKeyConverter.encode(appIdStr2); + byte[] appIdBytes3 = appIdKeyConverter.encode(appIdStr3); // App ids' should be encoded in a manner wherein descending order // is maintained. - assertTrue("Ordering of app ids' is incorrect", - Bytes.compareTo(appIdBytes1, appIdBytes2) > 0 && - Bytes.compareTo(appIdBytes1, appIdBytes3) > 0 && - Bytes.compareTo(appIdBytes2, appIdBytes3) > 0); - String decodedAppId1 = AppIdKeyConverter.getInstance().decode(appIdBytes1); - String decodedAppId2 = AppIdKeyConverter.getInstance().decode(appIdBytes2); - String decodedAppId3 = AppIdKeyConverter.getInstance().decode(appIdBytes3); + assertTrue( + "Ordering of app ids' is incorrect", + Bytes.compareTo(appIdBytes1, appIdBytes2) > 0 + && Bytes.compareTo(appIdBytes1, appIdBytes3) > 0 + && Bytes.compareTo(appIdBytes2, appIdBytes3) > 0); + String decodedAppId1 = appIdKeyConverter.decode(appIdBytes1); + String decodedAppId2 = appIdKeyConverter.decode(appIdBytes2); + String decodedAppId3 = appIdKeyConverter.decode(appIdBytes3); assertTrue("Decoded app id is not same as the app id encoded", appIdStr1.equals(decodedAppId1)); assertTrue("Decoded app id is not same as the app id encoded", @@ -273,21 +67,64 @@ public class TestKeyConverters { Bytes.createMaxByteArray(Bytes.SIZEOF_LONG - valSepBytes.length); byte[] ts = Bytes.add(valSepBytes, maxByteArr); Long eventTs = Bytes.toLong(ts); - byte[] byteEventColName = EventColumnNameConverter.getInstance().encode( - new EventColumnName(eventId, eventTs, null)); + byte[] byteEventColName = + new EventColumnName(eventId, eventTs, null).getColumnQualifier(); + KeyConverter<EventColumnName> eventColumnNameConverter = + new EventColumnNameConverter(); EventColumnName eventColName = - EventColumnNameConverter.getInstance().decode(byteEventColName); + eventColumnNameConverter.decode(byteEventColName); assertEquals(eventId, eventColName.getId()); assertEquals(eventTs, eventColName.getTimestamp()); assertNull(eventColName.getInfoKey()); String infoKey = "f=oo_event_in=fo=_key"; - byteEventColName = EventColumnNameConverter.getInstance().encode( - new EventColumnName(eventId, eventTs, infoKey)); - eventColName = - EventColumnNameConverter.getInstance().decode(byteEventColName); + byteEventColName = + new EventColumnName(eventId, eventTs, infoKey).getColumnQualifier(); + eventColName = eventColumnNameConverter.decode(byteEventColName); assertEquals(eventId, eventColName.getId()); assertEquals(eventTs, eventColName.getTimestamp()); assertEquals(infoKey, eventColName.getInfoKey()); } + + @Test + public void testLongKeyConverter() { + LongKeyConverter longKeyConverter = new LongKeyConverter(); + confirmLongKeyConverter(longKeyConverter, Long.MIN_VALUE); + confirmLongKeyConverter(longKeyConverter, -1234567890L); + confirmLongKeyConverter(longKeyConverter, -128L); + confirmLongKeyConverter(longKeyConverter, -127L); + confirmLongKeyConverter(longKeyConverter, -1L); + confirmLongKeyConverter(longKeyConverter, 0L); + confirmLongKeyConverter(longKeyConverter, 1L); + confirmLongKeyConverter(longKeyConverter, 127L); + confirmLongKeyConverter(longKeyConverter, 128L); + confirmLongKeyConverter(longKeyConverter, 1234567890L); + confirmLongKeyConverter(longKeyConverter, Long.MAX_VALUE); + } + + private void confirmLongKeyConverter(LongKeyConverter longKeyConverter, + Long testValue) { + Long decoded = longKeyConverter.decode(longKeyConverter.encode(testValue)); + assertEquals(testValue, decoded); + } + + @Test + public void testStringKeyConverter() { + StringKeyConverter stringKeyConverter = new StringKeyConverter(); + String phrase = "QuackAttack now!"; + + for (int i = 0; i < phrase.length(); i++) { + String sub = phrase.substring(i, phrase.length()); + confirmStrignKeyConverter(stringKeyConverter, sub); + confirmStrignKeyConverter(stringKeyConverter, sub + sub); + } + } + + private void confirmStrignKeyConverter(StringKeyConverter stringKeyConverter, + String testValue) { + String decoded = + stringKeyConverter.decode(stringKeyConverter.encode(testValue)); + assertEquals(testValue, decoded); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org