This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 18efd84 [CARBONDATA-3879] Filtering Segments Optimization 18efd84 is described below commit 18efd84184773e8734eb8b83b1e065d5a14126c6 Author: haomarch <marchp...@126.com> AuthorDate: Mon Jun 29 21:53:24 2020 +0800 [CARBONDATA-3879] Filtering Segments Optimization Why is this PR needed? During the filter segments flow, there are a lot of List.contains calls, which have heavy time overhead when there are tens of thousands of segments. For example, if there are 50000 segments, List.contains will be triggered for each segment, and the list also has about 50000 elements, so the time complexity will be O(50000 * 50000). What changes were proposed in this PR? Change List.contains to Map.containsKey. Does this PR introduce any user interface change? No Is any new testcase added? No This closes #3816 --- .../apache/carbondata/core/index/TableIndex.java | 3 +- .../hadoop/api/CarbonTableInputFormat.java | 61 +++++++++------------- 2 files changed, 28 insertions(+), 36 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java b/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java index a76b533..7aa5645 100644 --- a/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java +++ b/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java @@ -206,7 +206,8 @@ public final class TableIndex extends OperationEventListener { Set<Path> partitionLocations, List<ExtendedBlocklet> blocklets, Map<Segment, List<Index>> indexes) throws IOException { for (Segment segment : segments) { - if (indexes.get(segment).isEmpty() || indexes.get(segment) == null) { + if (segment == null || + indexes.get(segment) == null || indexes.get(segment).isEmpty()) { continue; } boolean isExternalSegment = segment.getSegmentPath() != null; diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index bca03f8..2d06222 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -26,6 +26,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import org.apache.carbondata.common.exceptions.DeprecatedFeatureException; import org.apache.carbondata.common.logging.LogServiceFactory; @@ -64,6 +65,7 @@ import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.hadoop.CarbonInputSplit; +import com.google.common.collect.Sets; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -232,47 +234,36 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { private List<Segment> getFilteredSegment(JobContext job, List<Segment> validSegments, boolean validationRequired, ReadCommittedScope readCommittedScope) { Segment[] segmentsToAccess = getSegmentsToAccess(job, readCommittedScope); - List<Segment> segmentToAccessSet = - new ArrayList<>(new HashSet<>(Arrays.asList(segmentsToAccess))); - List<Segment> filteredSegmentToAccess = new ArrayList<>(); if (segmentsToAccess.length == 0 || segmentsToAccess[0].getSegmentNo().equalsIgnoreCase("*")) { - filteredSegmentToAccess.addAll(validSegments); - } else { - for (Segment validSegment : validSegments) { - int index = segmentToAccessSet.indexOf(validSegment); - if (index > -1) { - // In case of in progress reading segment, segment file name is set to the property itself - if (segmentToAccessSet.get(index).getSegmentFileName() != null - && validSegment.getSegmentFileName() == null) { - filteredSegmentToAccess.add(segmentToAccessSet.get(index)); - } else { 
- filteredSegmentToAccess.add(validSegment); - } - } - } - if (filteredSegmentToAccess.size() != segmentToAccessSet.size() && !validationRequired) { - for (Segment segment : segmentToAccessSet) { - if (!filteredSegmentToAccess.contains(segment)) { - filteredSegmentToAccess.add(segment); - } + return validSegments; + } + Map<String, Segment> segmentToAccessMap = Arrays.stream(segmentsToAccess) + .collect(Collectors.toMap(Segment::getSegmentNo, segment -> segment, (e1, e2) -> e1)); + Map<String, Segment> filteredSegmentToAccess = new HashMap<>(segmentToAccessMap.size()); + for (Segment validSegment : validSegments) { + String segmentNoOfValidSegment = validSegment.getSegmentNo(); + if (segmentToAccessMap.containsKey(segmentNoOfValidSegment)) { + Segment segmentToAccess = segmentToAccessMap.get(segmentNoOfValidSegment); + if (segmentToAccess.getSegmentFileName() != null && + validSegment.getSegmentFileName() == null) { + validSegment = segmentToAccess; } + filteredSegmentToAccess.put(segmentNoOfValidSegment, validSegment); } - // TODO: add validation for set segments access based on valid segments in table status - if (filteredSegmentToAccess.size() != segmentToAccessSet.size() && !validationRequired) { - for (Segment segment : segmentToAccessSet) { - if (!filteredSegmentToAccess.contains(segment)) { - filteredSegmentToAccess.add(segment); - } + } + if (!validationRequired && filteredSegmentToAccess.size() != segmentToAccessMap.size()) { + for (Segment segment : segmentToAccessMap.values()) { + if (!filteredSegmentToAccess.containsKey(segment.getSegmentNo())) { + filteredSegmentToAccess.put(segment.getSegmentNo(), segment); } } - if (!filteredSegmentToAccess.containsAll(segmentToAccessSet)) { - List<Segment> filteredSegmentToAccessTemp = new ArrayList<>(filteredSegmentToAccess); - filteredSegmentToAccessTemp.removeAll(segmentToAccessSet); - LOG.info( - "Segments ignored are : " + Arrays.toString(filteredSegmentToAccessTemp.toArray())); - } } - return 
filteredSegmentToAccess; + if (LOG.isDebugEnabled()) { + LOG.debug("Segments ignored are : " + + Arrays.toString(Sets.difference(new HashSet<>(filteredSegmentToAccess.values()), + new HashSet<>(segmentToAccessMap.values())).toArray())); + } + return new ArrayList<>(filteredSegmentToAccess.values()); } public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,