Fixed out-of-memory issue during query execution caused by invalid segments not being deleted from the B-tree
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/57ac4a5c Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/57ac4a5c Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/57ac4a5c Branch: refs/heads/branch-0.1 Commit: 57ac4a5c61cd775d1d0c902a4fff2563aa03dc43 Parents: 14029e2 Author: kumarvishal <kumarvishal.1...@gmail.com> Authored: Thu Sep 15 14:11:00 2016 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Sep 22 10:00:26 2016 +0530 ---------------------------------------------------------------------- .../core/carbon/datastore/BlockIndexStore.java | 60 +++++++++- .../carbon/datastore/SegmentTaskIndexStore.java | 6 +- .../apache/carbondata/core/util/CarbonUtil.java | 21 ++++ .../executor/impl/AbstractQueryExecutor.java | 13 +- .../carbondata/scan/model/QueryModel.java | 16 +++ .../carbondata/hadoop/CarbonInputFormat.java | 120 +++++++++++-------- .../carbondata/spark/rdd/CarbonScanRDD.scala | 20 +++- .../CompactionSystemLockFeatureTest.scala | 4 +- .../DataCompactionCardinalityBoundryTest.scala | 2 +- .../datacompaction/DataCompactionLockTest.scala | 2 +- .../DataCompactionNoDictionaryTest.scala | 2 +- .../datacompaction/DataCompactionTest.scala | 4 +- .../MajorCompactionIgnoreInMinorTest.scala | 4 +- .../MajorCompactionStopsAfterCompaction.scala | 4 +- .../lcm/status/SegmentStatusManager.java | 80 ++++++++----- 15 files changed, 247 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/57ac4a5c/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java index 07815c0..4a36373 100644 --- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java @@ -20,6 +20,7 @@ package org.apache.carbondata.core.carbon.datastore; import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -62,6 +63,12 @@ public class BlockIndexStore { private Map<AbsoluteTableIdentifier, Map<TableBlockInfo, AbstractIndex>> tableBlocksMap; /** + * map to maintain segment id to block info map, this map will be used to + * while removing the block from memory when segment is compacted or deleted + */ + private Map<AbsoluteTableIdentifier, Map<String, List<TableBlockInfo>>> segmentIdToBlockListMap; + + /** * map of block info to lock object map, while loading the btree this will be filled * and removed after loading the tree for that particular block info, this will be useful * while loading the tree concurrently so only block level lock will be applied another @@ -83,6 +90,7 @@ public class BlockIndexStore { tableLockMap = new ConcurrentHashMap<AbsoluteTableIdentifier, Object>( CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); blockInfoLock = new ConcurrentHashMap<TableBlockInfo, Object>(); + segmentIdToBlockListMap = new ConcurrentHashMap<>(); } /** @@ -129,6 +137,7 @@ public class BlockIndexStore { tableBlockMapTemp = new ConcurrentHashMap<TableBlockInfo, AbstractIndex>(); tableBlocksMap.put(absoluteTableIdentifier, tableBlockMapTemp); } + fillSegmentIdToTableInfoMap(tableBlocksInfos, absoluteTableIdentifier); } AbstractIndex tableBlock = null; List<Future<AbstractIndex>> blocksList = new ArrayList<Future<AbstractIndex>>(); @@ -189,6 +198,32 @@ public class BlockIndexStore { } /** + * Below method will be used to fill segment id to its block 
mapping map. + * it will group all the table block info based on segment id and it will fill + * + * @param tableBlockInfos table block infos + * @param absoluteTableIdentifier absolute table identifier + */ + private void fillSegmentIdToTableInfoMap(List<TableBlockInfo> tableBlockInfos, + AbsoluteTableIdentifier absoluteTableIdentifier) { + Map<String, List<TableBlockInfo>> map = segmentIdToBlockListMap.get(absoluteTableIdentifier); + if (null == map) { + map = new ConcurrentHashMap<String, List<TableBlockInfo>>(); + segmentIdToBlockListMap.put(absoluteTableIdentifier, map); + } + for (TableBlockInfo info : tableBlockInfos) { + List<TableBlockInfo> tempTableBlockInfos = map.get(info.getSegmentId()); + if (null == tempTableBlockInfos) { + tempTableBlockInfos = new ArrayList<>(); + map.put(info.getSegmentId(), tempTableBlockInfos); + } + if (!tempTableBlockInfos.contains(info)) { + tempTableBlockInfos.add(info); + } + } + } + + /** * Below method will be used to fill the loaded blocks to the array * which will be used for query execution * @@ -246,10 +281,10 @@ public class BlockIndexStore { * deletion of some of the blocks in case of retention or may be some other * scenario * - * @param removeTableBlocksInfos blocks to be removed + * @param segmentsToBeRemoved list of segments to be removed * @param absoluteTableIdentifier absolute table identifier */ - public void removeTableBlocks(List<TableBlockInfo> removeTableBlocksInfos, + public void removeTableBlocks(List<String> segmentsToBeRemoved, AbsoluteTableIdentifier absoluteTableIdentifier) { // get the lock object if lock object is not present then it is not // loaded at all @@ -260,11 +295,26 @@ public class BlockIndexStore { } Map<TableBlockInfo, AbstractIndex> map = tableBlocksMap.get(absoluteTableIdentifier); // if there is no loaded blocks then return - if (null == map) { + if (null == map || map.isEmpty()) { return; } - for (TableBlockInfo blockInfos : removeTableBlocksInfos) { - map.remove(blockInfos); + 
Map<String, List<TableBlockInfo>> segmentIdToBlockInfoMap = + segmentIdToBlockListMap.get(absoluteTableIdentifier); + if (null == segmentIdToBlockInfoMap || segmentIdToBlockInfoMap.isEmpty()) { + return; + } + synchronized (lockObject) { + for (String segmentId : segmentsToBeRemoved) { + List<TableBlockInfo> tableBlockInfoList = segmentIdToBlockInfoMap.remove(segmentId); + if (null == tableBlockInfoList) { + continue; + } + Iterator<TableBlockInfo> tableBlockInfoIterator = tableBlockInfoList.iterator(); + while (tableBlockInfoIterator.hasNext()) { + TableBlockInfo info = tableBlockInfoIterator.next(); + map.remove(info); + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/57ac4a5c/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java index 50d462a..e2218a8 100644 --- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java @@ -141,8 +141,8 @@ public class SegmentTaskIndexStore { synchronized (segmentLoderLockObject) { taskIdToSegmentIndexMap = tableSegmentMapTemp.get(segmentId); if (null == taskIdToSegmentIndexMap) { - // creating a map of take if to table segment - taskIdToSegmentIndexMap = new HashMap<String, AbstractIndex>(); + // creating a map of task id to table segment + taskIdToSegmentIndexMap = new ConcurrentHashMap<String, AbstractIndex>(); Iterator<Entry<String, List<TableBlockInfo>>> iterator = taskIdToTableBlockInfoMap.entrySet().iterator(); while (iterator.hasNext()) { @@ -256,7 +256,7 @@ public class SegmentTaskIndexStore { private Map<String, List<TableBlockInfo>> mappedAndGetTaskIdToTableBlockInfo( 
Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos) { Map<String, List<TableBlockInfo>> taskIdToTableBlockInfoMap = - new HashMap<String, List<TableBlockInfo>>(); + new ConcurrentHashMap<String, List<TableBlockInfo>>(); Iterator<Entry<String, List<TableBlockInfo>>> iterator = segmentToTableBlocksInfos.entrySet().iterator(); while (iterator.hasNext()) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/57ac4a5c/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 82c515c..5168208 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -1431,5 +1431,26 @@ public final class CarbonUtil { return builder.toString(); } + /** + * Below method will be used to get the list of segment in + * comma separated string format + * + * @param segmentList + * @return comma separated segment string + */ + public static String getSegmentString(List<String> segmentList) { + if (segmentList.isEmpty()) { + return ""; + } + StringBuilder segmentStringbuilder = new StringBuilder(); + for (int i = 0; i < segmentList.size() - 1; i++) { + String segmentNo = segmentList.get(i); + segmentStringbuilder.append(segmentNo); + segmentStringbuilder.append(","); + } + segmentStringbuilder.append(segmentList.get(segmentList.size() - 1)); + return segmentStringbuilder.toString(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/57ac4a5c/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java 
b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java index dab8a23..c31824f 100644 --- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java @@ -18,12 +18,7 @@ */ package org.apache.carbondata.scan.executor.impl; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; @@ -101,8 +96,12 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { // query execution Collections.sort(queryModel.getTableBlockInfos()); // get the table blocks + BlockIndexStore blockLoaderInstance = BlockIndexStore.getInstance(); + // remove the invalid table blocks, block which is deleted or compacted + blockLoaderInstance.removeTableBlocks(queryModel.getInvalidSegmentIds(), + queryModel.getAbsoluteTableIdentifier()); try { - queryProperties.dataBlocks = BlockIndexStore.getInstance() + queryProperties.dataBlocks = blockLoaderInstance .loadAndGetBlocks(queryModel.getTableBlockInfos(), queryModel.getAbsoluteTableIdentifier()); } catch (IndexBuilderException e) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/57ac4a5c/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java index 81eb728..1c819a2 100644 --- a/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java +++ b/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java @@ -132,6 +132,13 @@ public class QueryModel implements Serializable { private 
QueryStatisticsRecorder statisticsRecorder; + /** + * Invalid table blocks, which need to be removed from + * memory, invalid blocks can be segment which are deleted + * or compacted + */ + private List<String> invalidSegmentIds; + public QueryModel() { tableBlockInfos = new ArrayList<TableBlockInfo>(); queryDimension = new ArrayList<QueryDimension>(); @@ -139,6 +146,7 @@ public class QueryModel implements Serializable { sortDimension = new ArrayList<QueryDimension>(); sortOrder = new byte[0]; paritionColumns = new ArrayList<String>(); + invalidSegmentIds = new ArrayList<>(); } public static QueryModel createModel(AbsoluteTableIdentifier absoluteTableIdentifier, @@ -504,4 +512,12 @@ public class QueryModel implements Serializable { public void setStatisticsRecorder(QueryStatisticsRecorder statisticsRecorder) { this.statisticsRecorder = statisticsRecorder; } + + public List<String> getInvalidSegmentIds() { + return invalidSegmentIds; + } + + public void setInvalidSegmentIds(List<String> invalidSegmentIds) { + this.invalidSegmentIds = invalidSegmentIds; + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/57ac4a5c/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java index da697ad..8fa1bb6 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java @@ -54,6 +54,7 @@ import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsRecorder import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonUtil; import 
org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl; import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; @@ -93,6 +94,9 @@ import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.util.StringUtils; + + + /** * Carbon Input format class representing one carbon table */ @@ -236,21 +240,29 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { /** * Set List of segments to access */ - private void setSegmentsToAccess(Configuration configuration, List<String> segmentNosList) { - - //serialize to comma separated string - StringBuilder stringSegmentsBuilder = new StringBuilder(); - for (int i = 0; i < segmentNosList.size(); i++) { - String segmentNo = segmentNosList.get(i); - stringSegmentsBuilder.append(segmentNo); - if (i < segmentNosList.size() - 1) { - stringSegmentsBuilder.append(","); - } - } - configuration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, stringSegmentsBuilder.toString()); + public static void setSegmentsToAccess(Configuration configuration, List<String> validSegments) { + configuration + .set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, CarbonUtil.getSegmentString(validSegments)); } /** + * Below method will be used to set the segments details if + * segments are not added in the configuration + * + * @param job + * @param absoluteTableIdentifier + * @throws IOException + */ + private void addSegmentsIfEmpty(JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier) + throws IOException { + if (getSegmentsFromConfiguration(job).length == 0) { + // Get the valid segments from the carbon store. 
+ SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments = + new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments(); + setSegmentsToAccess(job.getConfiguration(), validAndInvalidSegments.getValidSegments()); + } + } + /** * {@inheritDoc} * Configurations FileInputFormat.INPUT_DIR * are used to get table path to read. @@ -263,25 +275,17 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { try { CarbonTable carbonTable = getCarbonTable(job.getConfiguration()); Object filterPredicates = getFilterPredicates(job.getConfiguration()); - if (getValidSegments(job).length == 0) { - // Get the valid segments from the carbon store. - SegmentStatusManager.ValidSegmentsInfo validSegments = - new SegmentStatusManager(getAbsoluteTableIdentifier(job.getConfiguration())) - .getValidSegments(); - if (validSegments.listOfValidSegments.isEmpty()) { - return new ArrayList<InputSplit>(); - } - setSegmentsToAccess(job.getConfiguration(), validSegments.listOfValidSegments); - } - + AbsoluteTableIdentifier absoluteTableIdentifier = + getAbsoluteTableIdentifier(job.getConfiguration()); + addSegmentsIfEmpty(job, absoluteTableIdentifier); if (filterPredicates == null) { return getSplitsNonFilter(job); } else { if (filterPredicates instanceof Expression) { //process and resolve the expression. CarbonInputFormatUtil.processFilterExpression((Expression) filterPredicates, carbonTable); - return getSplits(job, CarbonInputFormatUtil.resolveFilter((Expression) filterPredicates, - getAbsoluteTableIdentifier(job.getConfiguration()))); + return getSplits(job, CarbonInputFormatUtil + .resolveFilter((Expression) filterPredicates, absoluteTableIdentifier)); } else { //It means user sets already resolved expression. 
return getSplits(job, (FilterResolverIntf) filterPredicates); @@ -339,7 +343,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { getAbsoluteTableIdentifier(job.getConfiguration()); //for each segment fetch blocks matching filter in Driver BTree - for (String segmentNo : getValidSegments(job)) { + for (String segmentNo : getSegmentsFromConfiguration(job)) { List<DataRefNode> dataRefNodes = getDataBlocksOfSegment(job, filterExpressionProcessor, absoluteTableIdentifier, filterResolver, segmentNo); @@ -365,11 +369,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { long rowCount = 0; AbsoluteTableIdentifier absoluteTableIdentifier = getAbsoluteTableIdentifier(job.getConfiguration()); - SegmentStatusManager.ValidSegmentsInfo validSegments = - new SegmentStatusManager(getAbsoluteTableIdentifier(job.getConfiguration())) - .getValidSegments(); - setSegmentsToAccess(job.getConfiguration(), validSegments.listOfValidSegments); // no of core to load the blocks in driver + addSegmentsIfEmpty(job, absoluteTableIdentifier); int numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE; try { numberOfCores = Integer.parseInt(CarbonProperties.getInstance() @@ -382,7 +383,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { List<Future<Map<String, AbstractIndex>>> loadedBlocks = new ArrayList<Future<Map<String, AbstractIndex>>>(); //for each segment fetch blocks matching filter in Driver BTree - for (String segmentNo : getValidSegments(job)) { + for (String segmentNo : getSegmentsFromConfiguration(job)) { // submitting the task loadedBlocks .add(threadPool.submit(new BlocksLoaderThread(job, absoluteTableIdentifier, segmentNo))); @@ -491,6 +492,39 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { return resultFilterredBlocks; } + /** + * Below method will be used to get the table block info + * + * @param job job context + * @param absoluteTableIdentifier absolute 
table identifier + * @param segmentId number of segment id + * @return list of table block + * @throws IOException + */ + private List<TableBlockInfo> getTableBlockInfo(JobContext job, + AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId) throws IOException { + // List<FileStatus> fileStatusList = new LinkedList<FileStatus>(); + List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>(); + // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList); + + // get file location of all files of given segment + JobContext newJob = + new JobContextImpl(new Configuration(job.getConfiguration()), job.getJobID()); + newJob.getConfiguration().set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, segmentId + ""); + + // identify table blocks + for (InputSplit inputSplit : getSplitsInternal(newJob)) { + CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit; + BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0, + carbonInputSplit.getNumberOfBlocklets()); + tableBlockInfoList.add( + new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(), + segmentId, carbonInputSplit.getLocations(), carbonInputSplit.getLength(), + blockletInfos)); + } + return tableBlockInfoList; + } + private Map<String, AbstractIndex> getSegmentAbstractIndexs(JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId) throws IOException, IndexBuilderException { @@ -500,25 +534,10 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { // if segment tree is not loaded, load the segment tree if (segmentIndexMap == null) { // List<FileStatus> fileStatusList = new LinkedList<FileStatus>(); - List<TableBlockInfo> tableBlockInfoList = new LinkedList<TableBlockInfo>(); + List<TableBlockInfo> tableBlockInfoList = + getTableBlockInfo(job, absoluteTableIdentifier, segmentId); // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList); - // get file 
location of all files of given segment - JobContext newJob = - new JobContextImpl(new Configuration(job.getConfiguration()), job.getJobID()); - newJob.getConfiguration().set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, segmentId + ""); - - // identify table blocks - for (InputSplit inputSplit : getSplitsInternal(newJob)) { - CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit; - BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0, - carbonInputSplit.getNumberOfBlocklets()); - tableBlockInfoList.add( - new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(), - segmentId, carbonInputSplit.getLocations(), carbonInputSplit.getLength(), - blockletInfos)); - } - Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>(); segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList); @@ -621,7 +640,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { @Override protected List<FileStatus> listStatus(JobContext job) throws IOException { List<FileStatus> result = new ArrayList<FileStatus>(); - String[] segmentsToConsider = getValidSegments(job); + String[] segmentsToConsider = getSegmentsFromConfiguration(job); if (segmentsToConsider.length == 0) { throw new IOException("No segments found"); } @@ -703,7 +722,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { /** * @return updateExtension */ - private String[] getValidSegments(JobContext job) throws IOException { + private String[] getSegmentsFromConfiguration(JobContext job) + throws IOException { String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, ""); // if no segments if (segmentString.trim().isEmpty()) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/57ac4a5c/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 497d9f8..5ee3e83 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._ import scala.reflect.ClassTag import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapreduce.InputSplit import org.apache.hadoop.mapreduce.Job import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD @@ -32,9 +33,11 @@ import org.apache.carbondata.common.CarbonIterator import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.cache.dictionary.Dictionary import org.apache.carbondata.core.carbon.datastore.block.{BlockletInfos, TableBlockInfo} -import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder} +import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore +import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants} import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit} +import org.apache.carbondata.lcm.status.SegmentStatusManager import org.apache.carbondata.scan.executor.QueryExecutorFactory import org.apache.carbondata.scan.expression.Expression import org.apache.carbondata.scan.model.QueryModel @@ -82,15 +85,24 @@ class CarbonScanRDD[V: ClassTag]( val result = new util.ArrayList[Partition](defaultParallelism) val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + val validAndInvalidSegments = new SegmentStatusManager(queryModel.getAbsoluteTableIdentifier) + .getValidAndInvalidSegments // set filter 
resolver tree try { // before applying filter check whether segments are available in the table. - val splits = carbonInputFormat.getSplits(job) - if (!splits.isEmpty) { + if (!validAndInvalidSegments.getValidSegments.isEmpty) { val filterResolver = carbonInputFormat .getResolvedFilter(job.getConfiguration, filterExpression) CarbonInputFormat.setFilterPredicates(job.getConfiguration, filterResolver) queryModel.setFilterExpressionResolverTree(filterResolver) + CarbonInputFormat + .setSegmentsToAccess(job.getConfiguration, + validAndInvalidSegments.getValidSegments + ) + SegmentTaskIndexStore.getInstance() + .removeTableBlocks(validAndInvalidSegments.getInvalidSegments, + queryModel.getAbsoluteTableIdentifier + ) } } catch { @@ -102,7 +114,7 @@ class CarbonScanRDD[V: ClassTag]( val splits = carbonInputFormat.getSplits(job) if (!splits.isEmpty) { val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]) - + queryModel.setInvalidSegmentIds(validAndInvalidSegments.getInvalidSegments) val blockListTemp = carbonInputSplits.map(inputSplit => new TableBlockInfo(inputSplit.getPath.toString, inputSplit.getStart, inputSplit.getSegmentId, http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/57ac4a5c/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala index ae29650..d9e1349 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala @@ -118,7 
+118,7 @@ class CompactionSystemLockFeatureTest extends QueryTest with BeforeAndAfterAll { ) ) // merged segment should not be there - val segments = segmentStatusManager.getValidSegments.listOfValidSegments.asScala.toList + val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList assert(segments.contains("0.1")) assert(!segments.contains("0")) assert(!segments.contains("1")) @@ -130,7 +130,7 @@ class CompactionSystemLockFeatureTest extends QueryTest with BeforeAndAfterAll { ) ) // merged segment should not be there - val segments2 = segmentStatusManager2.getValidSegments.listOfValidSegments.asScala.toList + val segments2 = segmentStatusManager2.getValidAndInvalidSegments.getValidSegments.asScala.toList assert(segments2.contains("0.1")) assert(!segments2.contains("0")) assert(!segments2.contains("1")) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/57ac4a5c/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala index 7bbee54..4ec00ac 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala @@ -92,7 +92,7 @@ class DataCompactionCardinalityBoundryTest extends QueryTest with BeforeAndAfter new CarbonTableIdentifier("default", "cardinalityTest", "1") ) ) - val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList + val segments = 
segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList if (!segments.contains("0.1")) { // wait for 2 seconds for compaction to complete. http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/57ac4a5c/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala index b01e66b..4eb873a 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala @@ -104,7 +104,7 @@ class DataCompactionLockTest extends QueryTest with BeforeAndAfterAll { val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager( absoluteTableIdentifier ) - val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList + val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList if (!segments.contains("0.1")) { // wait for 2 seconds for compaction to complete. 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/57ac4a5c/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala index 09fb427..17fc1e5 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala @@ -44,7 +44,7 @@ class DataCompactionNoDictionaryTest extends QueryTest with BeforeAndAfterAll { new CarbonTableIdentifier(databaseName, tableName.toLowerCase , tableId) ) ) - val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList + val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList segments } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/57ac4a5c/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala index 7737745..39dba52 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala @@ -90,7 +90,7 @@ class DataCompactionTest extends 
QueryTest with BeforeAndAfterAll { new CarbonTableIdentifier("default", "normalcompaction", "1") ) ) - val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList + val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList if (!segments.contains("0.1")) { // wait for 2 seconds for compaction to complete. @@ -138,7 +138,7 @@ class DataCompactionTest extends QueryTest with BeforeAndAfterAll { ) ) // merged segment should not be there - val segments = segmentStatusManager.getValidSegments.listOfValidSegments.asScala.toList + val segments = segmentStatusManager.getValidAndInvalidSegments().getValidSegments.asScala.toList assert(!segments.contains("0")) assert(!segments.contains("1")) assert(!segments.contains("2")) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/57ac4a5c/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala index a062153..9fe178f 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala @@ -103,7 +103,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll new CarbonTableIdentifier("default", "ignoremajor", noOfRetries + "") ) ) - val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList + val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList segments.foreach(seg 
=> System.out.println( "valid segment is =" + seg) ) @@ -135,7 +135,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll ) ) // merged segment should not be there - val segments = segmentStatusManager.getValidSegments.listOfValidSegments.asScala.toList + val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList assert(segments.contains("0.1")) assert(segments.contains("2.1")) assert(!segments.contains("2")) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/57ac4a5c/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala index 2b0afe6..3e51002 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala @@ -93,7 +93,7 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA new CarbonTableIdentifier("default", "stopmajor", noOfRetries + "") ) ) - val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList + val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList segments.foreach(seg => System.out.println( "valid segment is =" + seg) ) @@ -125,7 +125,7 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA ) ) // merged segment should not be there - val segments = segmentStatusManager.getValidSegments.listOfValidSegments.asScala.toList + val 
segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList assert(segments.contains("0.1")) assert(!segments.contains("0.2")) assert(!segments.contains("0")) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/57ac4a5c/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java b/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java index 2b83b11..25e81f8 100644 --- a/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java +++ b/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java @@ -49,7 +49,6 @@ import org.apache.carbondata.lcm.locks.ICarbonLock; import org.apache.carbondata.lcm.locks.LockUsage; import com.google.gson.Gson; - /** * Manages Load/Segment status */ @@ -64,17 +63,6 @@ public class SegmentStatusManager { this.absoluteTableIdentifier = absoluteTableIdentifier; } - public static class ValidSegmentsInfo { - public final List<String> listOfValidSegments; - public final List<String> listOfValidUpdatedSegments; - - public ValidSegmentsInfo(List<String> listOfValidSegments, - List<String> listOfValidUpdatedSegments) { - this.listOfValidSegments = listOfValidSegments; - this.listOfValidUpdatedSegments = listOfValidUpdatedSegments; - } - } - /** * This will return the lock object used to lock the table status file before updation. 
* @@ -89,9 +77,9 @@ public class SegmentStatusManager { * This method will return last modified time of tablestatus file */ public long getTableStatusLastModifiedTime() throws IOException { - String tableStatusPath = CarbonStorePath.getCarbonTablePath( - absoluteTableIdentifier.getStorePath(), absoluteTableIdentifier.getCarbonTableIdentifier()) - .getTableStatusFilePath(); + String tableStatusPath = CarbonStorePath + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()).getTableStatusFilePath(); if (!FileFactory.isFileExist(tableStatusPath, FileFactory.getFileType(tableStatusPath))) { return 0L; } else { @@ -102,14 +90,16 @@ public class SegmentStatusManager { /** * get valid segment for given table + * * @return * @throws IOException */ - public ValidSegmentsInfo getValidSegments() throws IOException { + public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException { // @TODO: move reading LoadStatus file to separate class List<String> listOfValidSegments = new ArrayList<String>(10); List<String> listOfValidUpdatedSegments = new ArrayList<String>(10); + List<String> listOfInvalidSegments = new ArrayList<String>(10); CarbonTablePath carbonTablePath = CarbonStorePath .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), absoluteTableIdentifier.getCarbonTableIdentifier()); @@ -157,8 +147,15 @@ public class SegmentStatusManager { listOfValidUpdatedSegments.add(loadMetadataDetails.getLoadName()); } listOfValidSegments.add(loadMetadataDetails.getLoadName()); - + } else if ((CarbonCommonConstants.STORE_LOADSTATUS_FAILURE + .equalsIgnoreCase(loadMetadataDetails.getLoadStatus()) + || CarbonCommonConstants.SEGMENT_COMPACTED + .equalsIgnoreCase(loadMetadataDetails.getLoadStatus()) + || CarbonCommonConstants.MARKED_FOR_DELETE + .equalsIgnoreCase(loadMetadataDetails.getLoadStatus()))) { + listOfInvalidSegments.add(loadMetadataDetails.getLoadName()); } + } } } catch (IOException e) { @@ 
-176,7 +173,8 @@ public class SegmentStatusManager { } } - return new ValidSegmentsInfo(listOfValidSegments, listOfValidUpdatedSegments); + return new ValidAndInvalidSegmentsInfo(listOfValidSegments, listOfValidUpdatedSegments, + listOfInvalidSegments); } /** @@ -218,6 +216,7 @@ public class SegmentStatusManager { /** * returns current time + * * @return */ private String readCurrentTime() { @@ -244,6 +243,7 @@ public class SegmentStatusManager { /** * updates deletion status + * * @param loadIds * @param tableFolderPath * @return @@ -271,13 +271,10 @@ public class SegmentStatusManager { listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath); if (listOfLoadFolderDetailsArray != null && listOfLoadFolderDetailsArray.length != 0) { updateDeletionStatus(loadIds, listOfLoadFolderDetailsArray, invalidLoadIds); - if(invalidLoadIds.isEmpty()) - { + if (invalidLoadIds.isEmpty()) { // All or None , if anything fails then dont write writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray); - } - else - { + } else { return invalidLoadIds; } @@ -330,13 +327,11 @@ public class SegmentStatusManager { // read existing metadata details in load metadata. 
listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath); if (listOfLoadFolderDetailsArray != null && listOfLoadFolderDetailsArray.length != 0) { - updateDeletionStatus(loadDate, listOfLoadFolderDetailsArray, - invalidLoadTimestamps, loadStartTime); - if(invalidLoadTimestamps.isEmpty()) { + updateDeletionStatus(loadDate, listOfLoadFolderDetailsArray, invalidLoadTimestamps, + loadStartTime); + if (invalidLoadTimestamps.isEmpty()) { writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray); - } - else - { + } else { return invalidLoadTimestamps; } @@ -395,6 +390,7 @@ public class SegmentStatusManager { /** * updates deletion status details for each load and returns invalidLoadIds + * * @param loadIds * @param listOfLoadFolderDetailsArray * @param invalidLoadIds @@ -503,17 +499,39 @@ public class SegmentStatusManager { /** * unlocks given file + * * @param carbonLock */ private void fileUnlock(ICarbonLock carbonLock) { if (carbonLock.unlock()) { LOG.info("Metadata lock has been successfully released"); } else { - LOG - .error("Not able to release the metadata lock"); + LOG.error("Not able to release the metadata lock"); } } + public static class ValidAndInvalidSegmentsInfo { + private final List<String> listOfValidSegments; + private final List<String> listOfValidUpdatedSegments; + private final List<String> listOfInvalidSegments; + + private ValidAndInvalidSegmentsInfo(List<String> listOfValidSegments, + List<String> listOfValidUpdatedSegments, List<String> listOfInvalidUpdatedSegments) { + this.listOfValidSegments = listOfValidSegments; + this.listOfValidUpdatedSegments = listOfValidUpdatedSegments; + this.listOfInvalidSegments = listOfInvalidUpdatedSegments; + } + + public List<String> getInvalidSegments() { + return listOfInvalidSegments; + } + public List<String> getValidSegments() { + return listOfValidSegments; + } + public List<String> getUpadtedSegments() { + return listOfValidUpdatedSegments; + } + } }