Repository: drill
Updated Branches:
  refs/heads/master 7bfcb40a0 -> 0a2518d7c


DRILL-4363: Row count based pruning for parquet table used in Limit n query.

Modify two existing unit test cases:
1) TestPartitionFilter.testMainQueryFalseCondition(): rowCount pruning is applied
after the false condition is transformed into LIMIT 0.
2) TestLimitWithExchanges.testPushLimitPastUnionExchange(): modify the test case
to use a JSON source, so that it does not interact with DrillPushLimitToScanRule.
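
For context, the heart of the new prune is a prefix scan over per-row-group
row counts: keep taking row groups, in order, until the requested limit is
covered, then read only the files those row groups belong to. Below is a
minimal, self-contained sketch of that selection logic; the RowGroup record
and the filesForLimit() helper are hypothetical stand-ins for the diff's
ParquetGroupScan.RowGroupInfo and applyLimit() shown further down.

  import java.util.LinkedHashSet;
  import java.util.List;
  import java.util.Set;

  public class LimitPruneSketch {
    // Hypothetical stand-in for RowGroupInfo: a file path plus its row count.
    record RowGroup(String path, long rowCount) {}

    // Accumulate row groups in order until the limit is covered, then return
    // the distinct set of files those row groups live in.
    static Set<String> filesForLimit(List<RowGroup> rowGroups, long maxRecords) {
      maxRecords = Math.max(maxRecords, 1); // read at least one row group
      Set<String> files = new LinkedHashSet<>();
      long count = 0;
      for (RowGroup rg : rowGroups) {
        if (count >= maxRecords) {
          break;
        }
        count += rg.rowCount();
        files.add(rg.path());
      }
      return files;
    }

    public static void main(String[] args) {
      List<RowGroup> rgs = List.of(
          new RowGroup("a.parquet", 5),
          new RowGroup("b.parquet", 5),
          new RowGroup("c.parquet", 5));
      System.out.println(filesForLimit(rgs, 7)); // prints [a.parquet, b.parquet]
    }
  }

If the selected file set turns out to be no smaller than the original one, the
real applyLimit() below returns null and the plan is left unchanged.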


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0a2518d7
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0a2518d7
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0a2518d7

Branch: refs/heads/master
Commit: 0a2518d7cf01a92a27a82e29edac5424bedf31d5
Parents: 7bfcb40
Author: Jinfeng Ni <j...@apache.org>
Authored: Tue Feb 2 15:31:47 2016 -0800
Committer: Jinfeng Ni <j...@apache.org>
Committed: Thu Feb 11 15:01:15 2016 -0800

----------------------------------------------------------------------
 .../exec/physical/base/AbstractGroupScan.java   |  21 ++++
 .../drill/exec/physical/base/GroupScan.java     |  12 +++
 .../logical/DrillPushLimitToScanRule.java       | 108 +++++++++++++++++++
 .../exec/planner/logical/DrillRuleSets.java     |   4 +-
 .../exec/store/parquet/ParquetGroupScan.java    |  55 +++++++++-
 .../org/apache/drill/TestPartitionFilter.java   |   3 +-
 .../impl/limit/TestLimitWithExchanges.java      |  43 ++++++--
 .../exec/store/TestAffinityCalculator.java      |   4 +-
 8 files changed, 236 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 1277ec4..b6b1a1e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -128,4 +128,25 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupScan {
   public List<SchemaPath> getPartitionColumns() {
     return Lists.newArrayList();
   }
+
+  /**
+   * Default is not to support limit pushdown.
+   * @return false, since limit pushdown is not supported by default.
+   */
+  @Override
+  @JsonIgnore
+  public boolean supportsLimitPushdown() {
+    return false;
+  }
+
+  /**
+   * By default, return null to indicate that row-count based prune is not supported.
+   * Each GroupScan subclass should override this method if it supports row-count based prune.
+   */
+  @Override
+  @JsonIgnore
+  public GroupScan applyLimit(long maxRecords) {
+    return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index 946c7e8..041f10a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -98,4 +98,16 @@ public interface GroupScan extends Scan, HasAffinity{
   @JsonIgnore
   public List<SchemaPath> getPartitionColumns();
 
+  /**
+   * Whether or not this GroupScan supports limit pushdown.
+   */
+  public boolean supportsLimitPushdown();
+
+  /**
+   * Apply rowcount based prune for "LIMIT n" query.
+   * @param maxRecords : the number of rows requested from the group scan.
+   * @return a new instance of group scan if the prune is successful;
+   *         null when row-count based prune is not supported, or when the prune is not successful.
+   */
+  public GroupScan applyLimit(long maxRecords);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
new file mode 100644
index 0000000..9f762f0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.logical;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.logical.partition.PruneScanRule;
+import org.apache.drill.exec.store.parquet.ParquetGroupScan;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+public abstract class DrillPushLimitToScanRule extends RelOptRule {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillPushLimitToScanRule.class);
+
+  private DrillPushLimitToScanRule(RelOptRuleOperand operand, String description) {
+    super(operand, description);
+  }
+
+  public static DrillPushLimitToScanRule LIMIT_ON_SCAN = new DrillPushLimitToScanRule(
+      RelOptHelper.some(DrillLimitRel.class, RelOptHelper.any(DrillScanRel.class)), "DrillPushLimitToScanRule_LimitOnScan") {
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      DrillScanRel scanRel = call.rel(1);
+      return scanRel.getGroupScan().supportsLimitPushdown(); // For now only applies to Parquet.
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      DrillLimitRel limitRel = call.rel(0);
+      DrillScanRel scanRel = call.rel(1);
+      doOnMatch(call, limitRel, scanRel, null);
+    }
+  };
+
+  public static DrillPushLimitToScanRule LIMIT_ON_PROJECT = new DrillPushLimitToScanRule(
+      RelOptHelper.some(DrillLimitRel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))), "DrillPushLimitToScanRule_LimitOnProject") {
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      DrillScanRel scanRel = call.rel(2);
+      return scanRel.getGroupScan().supportsLimitPushdown(); // For now only applies to Parquet.
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      DrillLimitRel limitRel = call.rel(0);
+      DrillProjectRel projectRel = call.rel(1);
+      DrillScanRel scanRel = call.rel(2);
+      doOnMatch(call, limitRel, scanRel, projectRel);
+    }
+  };
+
+
+  protected void doOnMatch(RelOptRuleCall call, DrillLimitRel limitRel, DrillScanRel scanRel, DrillProjectRel projectRel) {
+    try {
+      final int rowCountRequested = (int) limitRel.getRows();
+
+      final GroupScan newGroupScan = scanRel.getGroupScan().applyLimit(rowCountRequested);
+
+      if (newGroupScan == null) {
+        return;
+      }
+
+      DrillScanRel newScanRel = new DrillScanRel(scanRel.getCluster(),
+          scanRel.getTraitSet(),
+          scanRel.getTable(),
+          newGroupScan,
+          scanRel.getRowType(),
+          scanRel.getColumns(),
+          scanRel.partitionFilterPushdown());
+
+      final RelNode newLimit;
+      if (projectRel != null) {
+        final RelNode newProject = projectRel.copy(projectRel.getTraitSet(), ImmutableList.of((RelNode) newScanRel));
+        newLimit = limitRel.copy(limitRel.getTraitSet(), ImmutableList.of((RelNode) newProject));
+      } else {
+        newLimit = limitRel.copy(limitRel.getTraitSet(), ImmutableList.of((RelNode) newScanRel));
+      }
+      }
+
+      call.transformTo(newLimit);
+      logger.debug("Converted to a new DrillScanRel: {}", newScanRel.getGroupScan());
+    } catch (Exception e) {
+      logger.warn("Exception while applying the limit-based prune.", e);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index d9609d2..230cee2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -207,7 +207,9 @@ public class DrillRuleSets {
             PruneScanRule.getDirFilterOnProject(optimizerRulesContext),
            PruneScanRule.getDirFilterOnScan(optimizerRulesContext),
            ParquetPruneScanRule.getFilterOnProjectParquet(optimizerRulesContext),
-            ParquetPruneScanRule.getFilterOnScanParquet(optimizerRulesContext)
+            ParquetPruneScanRule.getFilterOnScanParquet(optimizerRulesContext),
+            DrillPushLimitToScanRule.LIMIT_ON_SCAN,
+            DrillPushLimitToScanRule.LIMIT_ON_PROJECT
         )
         .build();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index dfe9084..30e0846 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -30,6 +30,7 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import org.apache.calcite.util.Pair;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
@@ -485,12 +486,14 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     private EndpointByteMap byteMap;
     private int rowGroupIndex;
     private String root;
+    private long rowCount;  // rowCount = -1 means include all rows.
 
     @JsonCreator
    public RowGroupInfo(@JsonProperty("path") String path, @JsonProperty("start") long start,
-        @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex) {
+        @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex, long rowCount) {
       super(path, start, length);
       this.rowGroupIndex = rowGroupIndex;
+      this.rowCount = rowCount;
     }
 
     public RowGroupReadEntry getRowGroupReadEntry() {
@@ -519,6 +522,11 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     public void setEndpointByteMap(EndpointByteMap byteMap) {
       this.byteMap = byteMap;
     }
+
+    public long getRowCount() {
+      return rowCount;
+    }
+
   }
 
   private void init() throws IOException {
@@ -582,7 +590,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       int rgIndex = 0;
       for (RowGroupMetadata rg : file.getRowGroups()) {
         RowGroupInfo rowGroupInfo =
-            new RowGroupInfo(file.getPath(), rg.getStart(), rg.getLength(), rgIndex);
+            new RowGroupInfo(file.getPath(), rg.getStart(), rg.getLength(), rgIndex, rg.getRowCount());
         EndpointByteMap endpointByteMap = new EndpointByteMapImpl();
         for (String host : rg.getHostAffinity().keySet()) {
           if (hostEndpointMap.containsKey(host)) {
@@ -791,6 +799,49 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   }
 
   @Override
+  public boolean supportsLimitPushdown() {
+    return true;
+  }
+
+  @Override
+  public GroupScan applyLimit(long maxRecords) {
+    Preconditions.checkArgument(rowGroupInfos.size() >= 0);
+
+    maxRecords = Math.max(maxRecords, 1); // Make sure it requests at least 1 row, hence at least 1 rowGroup.
+    // Further optimization: minimize the # of files chosen, or the affinity of the files chosen.
+    long count = 0;
+    int index = 0;
+    for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
+      if (count < maxRecords) {
+        count += rowGroupInfo.getRowCount();
+        index++;
+      } else {
+        break;
+      }
+    }
+
+    Set<String> fileNames = Sets.newHashSet(); // HashSet keeps file names unique.
+    for (RowGroupInfo rowGroupInfo : rowGroupInfos.subList(0, index)) {
+      fileNames.add(rowGroupInfo.getPath());
+    }
+
+    if (fileNames.size() == fileSet.size()) {
+      // There is no reduction of rowGroups. Return the original groupScan.
+      logger.debug("applyLimit() does not apply!");
+      return null;
+    }
+
+    try {
+      FileSelection newSelection = new FileSelection(null, Lists.newArrayList(fileNames), getSelectionRoot());
+      logger.debug("applyLimit() reduces the number of parquet files from {} to {}", fileSet.size(), fileNames.size());
+      return this.clone(newSelection);
+    } catch (IOException e) {
+      logger.warn("Could not apply row-count based prune.", e);
+      return null;
+    }
+  }
+
+  @Override
   @JsonIgnore
   public boolean canPushdownProjects(List<SchemaPath> columns) {
     return true;

http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
index e5d6603..a2d101e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
@@ -257,7 +257,8 @@ public class TestPartitionFilter extends PlanTestBase {
   public void testMainQueryFalseCondition() throws Exception {
    String root = FileUtils.getResourceAsFile("/multilevel/parquet").toURI().toString();
    String query = String.format("select * from (select dir0, o_custkey from dfs_test.`%s` where dir0='1994') t where 1 = 0", root);
-    testExcludeFilter(query, 4, "Filter", 0);
+    // The "1 = 0" condition becomes LIMIT 0, which requires reading only one parquet file, instead of 4 for year '1994'.
+    testExcludeFilter(query, 1, "Filter", 0);
   }
 
   @Test // see DRILL-2712

http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java
index 789a536..18f181b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java
@@ -27,6 +27,8 @@ import org.junit.Test;
 import static org.junit.Assert.assertEquals;
 
 public class TestLimitWithExchanges extends BaseTestQuery {
+  final String WORKING_PATH = TestTools.getWorkingPath();
+  final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
 
   @Test
   public void testLimitWithExchanges() throws Exception{
@@ -36,23 +38,20 @@ public class TestLimitWithExchanges extends BaseTestQuery {
   @Test
   public void testPushLimitPastUnionExchange() throws Exception {
    // Push limit past UnionExchange.
-    final String WORKING_PATH = TestTools.getWorkingPath();
-    final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
-
     try {
       test("alter session set `planner.slice_target` = 1");
       final String[] excludedPlan = {};
 
       // case 1. single table query.
-      final String sql = String.format("select * from dfs_test.`%s/tpchmulti/region` limit 1 offset 2", TEST_RES_PATH);
+      final String sql = String.format("select * from dfs_test.`%s/multilevel/json` limit 1 offset 2", TEST_RES_PATH);
       final String[] expectedPlan ={"(?s)Limit\\(offset=\\[2\\], fetch=\\[1\\].*UnionExchange.*Limit\\(fetch=\\[3\\]\\).*Scan"};
       testLimitHelper(sql, expectedPlan, excludedPlan, 1);
 
-      final String sql2 = String.format("select * from dfs_test.`%s/tpchmulti/region` limit 1 offset 0", TEST_RES_PATH);
+      final String sql2 = String.format("select * from dfs_test.`%s/multilevel/json` limit 1 offset 0", TEST_RES_PATH);
       final String[] expectedPlan2 = {"(?s)Limit\\(offset=\\[0\\], fetch=\\[1\\].*UnionExchange.*Limit\\(fetch=\\[1\\]\\).*Scan"};
       testLimitHelper(sql2, expectedPlan2, excludedPlan, 1);
 
-      final String sql3 = String.format("select * from dfs_test.`%s/tpchmulti/region` limit 1", TEST_RES_PATH);
+      final String sql3 = String.format("select * from dfs_test.`%s/multilevel/json` limit 1", TEST_RES_PATH);
       final String[] expectedPlan3 = {"(?s)Limit\\(fetch=\\[1\\].*UnionExchange.*Limit\\(fetch=\\[1\\]\\).*Scan"};
       testLimitHelper(sql3, expectedPlan3, excludedPlan, 1);
 
@@ -79,9 +78,6 @@ public class TestLimitWithExchanges extends BaseTestQuery {
   @Test
   public void testNegPushLimitPastUnionExchange() throws Exception {
    // Negative case: should not push limit past UnionExchange.
-    final String WORKING_PATH = TestTools.getWorkingPath();
-    final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
-
     try {
       test("alter session set `planner.slice_target` = 1");
       final String[] expectedPlan ={};
@@ -100,6 +96,35 @@ public class TestLimitWithExchanges extends BaseTestQuery {
     }
   }
 
+  @Test
+  public void testLimitImpactExchange() throws Exception {
+    try {
+      test("alter session set `planner.slice_target` = 5" );
+
+      // nation has 3 files, 25 rows in total.
+      // Given slice_target = 5: if the # of rows to fetch is < 5, do NOT insert an Exchange, and the query should run in a single fragment;
+      //                         if the # of rows to fetch is >= 5, do insert an Exchange, and the query should run in multiple fragments.
+      final String sql = String.format("select * from dfs_test.`%s/tpchmulti/nation` limit 2", TEST_RES_PATH);  // Test Limit_On_Scan rule.
+      final String sql2 = String.format("select n_nationkey + 1000 from dfs_test.`%s/tpchmulti/nation` limit 2", TEST_RES_PATH); // Test Limit_On_Project rule.
+      final String [] expectedPlan = {};
+      final String [] excludedPlan = {"UnionExchange"};
+
+      testLimitHelper(sql, expectedPlan, excludedPlan, 2);
+      testLimitHelper(sql2, expectedPlan, excludedPlan, 2);
+
+      final String sql3 = String.format("select * from dfs_test.`%s/tpchmulti/nation` limit 10", TEST_RES_PATH); // Test Limit_On_Scan rule.
+      final String sql4 = String.format("select n_nationkey + 1000 from dfs_test.`%s/tpchmulti/nation` limit 10", TEST_RES_PATH); // Test Limit_On_Project rule.
+
+      final String [] expectedPlan2 = {"UnionExchange"};
+      final String [] excludedPlan2 = {};
+
+      testLimitHelper(sql3, expectedPlan2, excludedPlan2, 10);
+      testLimitHelper(sql4, expectedPlan2, excludedPlan2, 10);
+    } finally {
+      test("alter session set `planner.slice_target` = " + 
ExecConstants.SLICE_TARGET_OPTION.getDefault().getValue());
+    }
+  }
+
   private void testLimitHelper(final String sql, final String[] expectedPlan, final String[] excludedPattern, int expectedRecordCount) throws Exception {
     // Validate the plan
     PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPattern);

http://git-wip-us.apache.org/repos/asf/drill/blob/0a2518d7/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestAffinityCalculator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestAffinityCalculator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestAffinityCalculator.java
index 0999218..dadb850 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestAffinityCalculator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestAffinityCalculator.java
@@ -72,7 +72,9 @@ public class TestAffinityCalculator extends ExecTest {
     rowGroups.clear();
 
     for (int i = 0; i < numberOfRowGroups; i++) {
-      rowGroups.add(new ParquetGroupScan.RowGroupInfo(path, (long)i*rowGroupSize, (long)rowGroupSize, i));
+      // The buildRowGroups method seems not to be used at all; pass -1 as rowCount.
+      // Maybe remove this method completely?
+      rowGroups.add(new ParquetGroupScan.RowGroupInfo(path, (long)i*rowGroupSize, (long)rowGroupSize, i, -1));
     }
   }
 
