[CARBONDATA-982] Fixed Bug For NotIn Clause In Presto. This closes #1062.
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0d469761 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0d469761 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0d469761 Branch: refs/heads/datamap Commit: 0d46976105239dd9ab94b83c3f0ca287b34fee8d Parents: 27d520c Author: Geetika gupta <geetika.gu...@knoldus.in> Authored: Fri Jun 16 13:07:52 2017 +0530 Committer: chenliang613 <chenliang...@apache.org> Committed: Mon Jul 3 17:43:56 2017 +0800 ---------------------------------------------------------------------- .../carbondata/presto/CarbondataRecordSet.java | 5 +- .../presto/CarbondataRecordSetProvider.java | 42 ++++----- .../presto/CarbondataSplitManager.java | 96 ++++++++++++-------- 3 files changed, 84 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d469761/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java index a28342e..d75cbfb 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java @@ -21,6 +21,7 @@ import com.facebook.presto.spi.*; import com.facebook.presto.spi.predicate.TupleDomain; import com.facebook.presto.spi.type.Type; import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.datastore.block.BlockletInfos; import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable; @@ -83,9 +84,9 @@ public class CarbondataRecordSet implements RecordSet { tableBlockInfoList.add(new TableBlockInfo(split.getLocalInputSplit().getPath().toString(), split.getLocalInputSplit().getStart(), split.getLocalInputSplit().getSegmentId(), split.getLocalInputSplit().getLocations().toArray(new String[0]), - split.getLocalInputSplit().getLength(), + split.getLocalInputSplit().getLength(),new BlockletInfos(), //blockletInfos, - ColumnarFormatVersion.valueOf(split.getLocalInputSplit().getVersion()), null)); + ColumnarFormatVersion.valueOf(split.getLocalInputSplit().getVersion()),null)); queryModel.setTableBlockInfos(tableBlockInfoList); queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d469761/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java index a9652cc..71649f3 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java @@ -142,13 +142,13 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider { } List<Object> singleValues = new ArrayList<>(); - List<Expression> rangeFilter = new ArrayList<>(); + List<Expression> disjuncts = new ArrayList<>(); for (Range range : domain.getValues().getRanges().getOrderedRanges()) { checkState(!range.isAll()); // Already checked if (range.isSingleValue()) { singleValues.add(range.getLow().getValue()); } else { - List<String> rangeConjuncts = new ArrayList<>(); + List<Expression> rangeConjuncts = new 
ArrayList<>(); if (!range.getLow().isLowerUnbounded()) { Object value = ConvertDataByType(range.getLow().getValue(), type); switch (range.getLow().getBound()) { @@ -157,15 +157,15 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider { //todo not now } else { GreaterThanExpression greater = new GreaterThanExpression(colExpression, - new LiteralExpression(value, coltype)); - rangeFilter.add(greater); + new LiteralExpression(value, coltype)); + rangeConjuncts.add(greater); } break; case EXACTLY: GreaterThanEqualToExpression greater = - new GreaterThanEqualToExpression(colExpression, - new LiteralExpression(value, coltype)); - rangeFilter.add(greater); + new GreaterThanEqualToExpression(colExpression, + new LiteralExpression(value, coltype)); + rangeConjuncts.add(greater); break; case BELOW: throw new IllegalArgumentException("Low marker should never use BELOW bound"); @@ -180,21 +180,21 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider { throw new IllegalArgumentException("High marker should never use ABOVE bound"); case EXACTLY: LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression, - new LiteralExpression(value, coltype)); - rangeFilter.add(less); + new LiteralExpression(value, coltype)); + rangeConjuncts.add(less); break; case BELOW: LessThanExpression less2 = - new LessThanExpression(colExpression, new LiteralExpression(value, coltype)); - rangeFilter.add(less2); + new LessThanExpression(colExpression, new LiteralExpression(value, coltype)); + rangeConjuncts.add(less2); break; default: throw new AssertionError("Unhandled bound: " + range.getHigh().getBound()); } } + disjuncts.addAll(rangeConjuncts); } } - if (singleValues.size() == 1) { Expression ex = null; if (coltype.equals(DataType.STRING)) { @@ -215,25 +215,25 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider { candidates = new ListExpression(exs); if (candidates != null) filters.add(new 
InExpression(colExpression, candidates)); - } else if (rangeFilter.size() > 0) { - if (rangeFilter.size() > 1) { - Expression finalFilters = new OrExpression(rangeFilter.get(0), rangeFilter.get(1)); - if (rangeFilter.size() > 2) { - for (int i = 2; i < rangeFilter.size(); i++) { - filters.add(new AndExpression(finalFilters, rangeFilter.get(i))); + } else if (disjuncts.size() > 0) { + if (disjuncts.size() > 1) { + Expression finalFilters = new OrExpression(disjuncts.get(0), disjuncts.get(1)); + if (disjuncts.size() > 2) { + for (int i = 2; i < disjuncts.size(); i++) { + filters.add(new AndExpression(finalFilters, disjuncts.get(i))); } } - } else if (rangeFilter.size() == 1) filters.add(rangeFilter.get(0)); + } else if (disjuncts.size() == 1) filters.add(disjuncts.get(0)); } } Expression finalFilters; List<Expression> tmp = filters.build(); if (tmp.size() > 1) { - finalFilters = new AndExpression(tmp.get(0), tmp.get(1)); + finalFilters = new OrExpression(tmp.get(0), tmp.get(1)); if (tmp.size() > 2) { for (int i = 2; i < tmp.size(); i++) { - finalFilters = new AndExpression(finalFilters, tmp.get(i)); + finalFilters = new OrExpression(finalFilters, tmp.get(i)); } } } else if (tmp.size() == 1) finalFilters = tmp.get(0); http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d469761/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java index e39ee58..0ce0600 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java @@ -17,38 +17,59 @@ package org.apache.carbondata.presto; +import javax.inject.Inject; +import java.util.ArrayList; 
+import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.scan.expression.ColumnExpression; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.expression.LiteralExpression; +import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression; +import org.apache.carbondata.core.scan.expression.conditional.GreaterThanEqualToExpression; +import org.apache.carbondata.core.scan.expression.conditional.GreaterThanExpression; +import org.apache.carbondata.core.scan.expression.conditional.InExpression; +import org.apache.carbondata.core.scan.expression.conditional.LessThanEqualToExpression; +import org.apache.carbondata.core.scan.expression.conditional.LessThanExpression; +import org.apache.carbondata.core.scan.expression.conditional.ListExpression; +import org.apache.carbondata.core.scan.expression.logical.AndExpression; +import org.apache.carbondata.core.scan.expression.logical.OrExpression; import org.apache.carbondata.presto.impl.CarbonLocalInputSplit; import org.apache.carbondata.presto.impl.CarbonTableCacheModel; import org.apache.carbondata.presto.impl.CarbonTableReader; -import com.facebook.presto.spi.*; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorSplitSource; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.FixedSplitSource; +import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.connector.ConnectorSplitManager; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; import 
com.facebook.presto.spi.predicate.Domain; import com.facebook.presto.spi.predicate.Range; import com.facebook.presto.spi.predicate.TupleDomain; -import com.facebook.presto.spi.type.*; +import com.facebook.presto.spi.type.BigintType; +import com.facebook.presto.spi.type.BooleanType; +import com.facebook.presto.spi.type.DateType; +import com.facebook.presto.spi.type.DecimalType; +import com.facebook.presto.spi.type.DoubleType; +import com.facebook.presto.spi.type.IntegerType; +import com.facebook.presto.spi.type.SmallintType; +import com.facebook.presto.spi.type.TimestampType; +import com.facebook.presto.spi.type.Type; +import com.facebook.presto.spi.type.VarcharType; import com.google.common.collect.ImmutableList; import io.airlift.slice.Slice; -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; -import org.apache.carbondata.core.scan.expression.ColumnExpression; -import org.apache.carbondata.core.scan.expression.Expression; -import org.apache.carbondata.core.scan.expression.LiteralExpression; -import org.apache.carbondata.core.scan.expression.conditional.*; -import org.apache.carbondata.core.scan.expression.logical.AndExpression; -import org.apache.carbondata.core.scan.expression.logical.OrExpression; - -import javax.inject.Inject; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; -import static org.apache.carbondata.presto.Types.checkType; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; +import static org.apache.carbondata.presto.Types.checkType; /** * Build Carbontable splits @@ -152,13 +173,14 @@ public class CarbondataSplitManager implements ConnectorSplitManager { } List<Object> singleValues = new 
ArrayList<>(); - List<Expression> rangeFilter = new ArrayList<>(); + + List<Expression> disjuncts = new ArrayList<>(); + for (Range range : domain.getValues().getRanges().getOrderedRanges()) { - checkState(!range.isAll()); // Already checked if (range.isSingleValue()) { singleValues.add(range.getLow().getValue()); } else { - List<String> rangeConjuncts = new ArrayList<>(); + List<Expression> rangeConjuncts = new ArrayList<>(); if (!range.getLow().isLowerUnbounded()) { Object value = ConvertDataByType(range.getLow().getValue(), type); switch (range.getLow().getBound()) { @@ -168,14 +190,14 @@ public class CarbondataSplitManager implements ConnectorSplitManager { } else { GreaterThanExpression greater = new GreaterThanExpression(colExpression, new LiteralExpression(value, coltype)); - rangeFilter.add(greater); + rangeConjuncts.add(greater); } break; case EXACTLY: GreaterThanEqualToExpression greater = new GreaterThanEqualToExpression(colExpression, new LiteralExpression(value, coltype)); - rangeFilter.add(greater); + rangeConjuncts.add(greater); break; case BELOW: throw new IllegalArgumentException("Low marker should never use BELOW bound"); @@ -191,17 +213,18 @@ public class CarbondataSplitManager implements ConnectorSplitManager { case EXACTLY: LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression, new LiteralExpression(value, coltype)); - rangeFilter.add(less); + rangeConjuncts.add(less); break; case BELOW: LessThanExpression less2 = new LessThanExpression(colExpression, new LiteralExpression(value, coltype)); - rangeFilter.add(less2); + rangeConjuncts.add(less2); break; default: throw new AssertionError("Unhandled bound: " + range.getHigh().getBound()); } } + disjuncts.addAll(rangeConjuncts); } } @@ -221,26 +244,26 @@ public class CarbondataSplitManager implements ConnectorSplitManager { candidates = new ListExpression(exs); if (candidates != null) filters.add(new InExpression(colExpression, candidates)); - } else if (rangeFilter.size() > 
0) { - if (rangeFilter.size() > 1) { - Expression finalFilters = new OrExpression(rangeFilter.get(0), rangeFilter.get(1)); - if (rangeFilter.size() > 2) { - for (int i = 2; i < rangeFilter.size(); i++) { - filters.add(new AndExpression(finalFilters, rangeFilter.get(i))); + } else if (disjuncts.size() > 0) { + if (disjuncts.size() > 1) { + Expression finalFilters = new OrExpression(disjuncts.get(0), disjuncts.get(1)); + if (disjuncts.size() > 2) { + for (int i = 2; i < disjuncts.size(); i++) { + filters.add(new AndExpression(finalFilters, disjuncts.get(i))); } } - } else if (rangeFilter.size() == 1)//only have one value - filters.add(rangeFilter.get(0)); + } else if (disjuncts.size() == 1)//only have one value + filters.add(disjuncts.get(0)); } } Expression finalFilters; List<Expression> tmp = filters.build(); if (tmp.size() > 1) { - finalFilters = new AndExpression(tmp.get(0), tmp.get(1)); + finalFilters = new OrExpression(tmp.get(0), tmp.get(1)); if (tmp.size() > 2) { for (int i = 2; i < tmp.size(); i++) { - finalFilters = new AndExpression(finalFilters, tmp.get(i)); + finalFilters = new OrExpression(finalFilters, tmp.get(i)); } } } else if (tmp.size() == 1) finalFilters = tmp.get(0); @@ -252,6 +275,7 @@ public class CarbondataSplitManager implements ConnectorSplitManager { /** * Convert presto spi Type into Carbondata Type + * * @param colType * @return */