[GitHub] weijietong commented on issue #1334: DRILL-6385: Support JPPD feature
weijietong commented on issue #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#issuecomment-401686350 @amansinha100 The scan node's memory copy logic has been removed. Thanks for pointing me to `SelectionVectorPrelVisitor.addSelectionRemoverWhereNecessary`. I appreciate the effort you put in. Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199395144 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ## @@ -696,6 +780,18 @@ public void executeBuildPhase() throws SchemaChangeException { if ( cycleNum > 0 ) { read_right_HV_vector = (IntVector) buildBatch.getContainer().getLast(); } +//create runtime filter +if (cycleNum == 0 && enableRuntimeFilter) { + //create runtime filter and send out async + int condFieldIndex = 0; + for (BloomFilter bloomFilter : bloomFilters) { +for (int ind = 0; ind < currentRecordCount; ind++) { + long hashCode = hash64.hash64Code(ind, 0, condFieldIndex); + bloomFilter.insert(hashCode); Review comment: I will address this enhancement in a separate JIRA. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] asfgit closed pull request #1342: DRILL-6537:Limit the batch size for buffering operators based on how …
asfgit closed pull request #1342: DRILL-6537:Limit the batch size for buffering operators based on how … URL: https://github.com/apache/drill/pull/1342 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index bc16272ffb..49f149b37a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -85,6 +85,10 @@ private ExecConstants() { // need to produce very large batches that take up lot of memory. public static final LongValidator OUTPUT_BATCH_SIZE_VALIDATOR = new RangeLongValidator(OUTPUT_BATCH_SIZE, 128, 512 * 1024 * 1024); + // Based on available memory, adjust output batch size for buffered operators by this factor. 
+ public static final String OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR = "drill.exec.memory.operator.output_batch_size_avail_mem_factor"; + public static final DoubleValidator OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR = new RangeDoubleValidator(OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR, 0.01, 1.0); + // External Sort Boot configuration public static final String EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.batch.size"; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 428a47ebf3..047c597051 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -886,9 +886,13 @@ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, partitions = new HashPartition[0]; // get the output batch size from config. 
-int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); -batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right); -logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize); +final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); +final double avail_mem_factor = (double) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR); +int outputBatchSize = Math.min(configuredBatchSize, Integer.highestOneBit((int)(allocator.getLimit() * avail_mem_factor))); +logger.debug("BATCH_STATS, configured output batch size: {}, allocated memory {}, avail mem factor {}, output batch size: {}", + configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize); + +batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right); } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index e6368f5aa5..a9c4742816 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -233,6 +233,7 @@ new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)), new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)), new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR,new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)), + new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)), new 
OptionDefinition(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)), }; @@ -294,7 +295,7 @@ public SystemOptionManager(final DrillConfig bootConfig) { * Initializes this option manager. * * @return this option manager - * @throws IOException + * @throws Exception */ public SystemOptionManager init() throws Exception { options = provider.getOrCreateStore(config); diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 23d59d3f80..2e8c2e783a 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/dr
[GitHub] asfgit closed pull request #309: DRILL-4020: The not-equal operator returns incorrect results when used on the HBase row key
asfgit closed pull request #309: DRILL-4020: The not-equal operator returns incorrect results when used on the HBase row key URL: https://github.com/apache/drill/pull/309 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java index 8d2e8ffd7c..6e1efe512c 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java @@ -61,6 +61,7 @@ public HBaseScanSpec parseTree() { * remove it since its effect is also achieved through startRow and stopRow. 
*/ if (parsedSpec.filter instanceof RowFilter && + ((RowFilter)parsedSpec.filter).getOperator() != CompareOp.NOT_EQUAL && ((RowFilter)parsedSpec.filter).getComparator() instanceof BinaryComparator) { parsedSpec.filter = null; } diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java index 0e14cb183e..c17b00ee62 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java @@ -44,6 +44,24 @@ public void testFilterPushDownRowKeyEqual() throws Exception { PlanTestBase.testPlanMatchingPatterns(sqlHBase, expectedPlan, excludedPlan); } + @Test + public void testFilterPushDownRowKeyNotEqual() throws Exception { +setColumnWidths(new int[] {8, 38, 38}); +final String sql = "SELECT\n" ++ " *\n" ++ "FROM\n" ++ " hbase.`[TABLE_NAME]` tableName\n" ++ "WHERE\n" ++ " row_key <> 'b4'"; + +runHBaseSQLVerifyCount(sql, 7); + +final String[] expectedPlan = {".*startRow=, stopRow=, filter=RowFilter \\(NOT_EQUAL, b4\\).*"}; +final String[] excludedPlan ={}; +final String sqlHBase = canonizeHBaseSQL(sql); +PlanTestBase.testPlanMatchingPatterns(sqlHBase, expectedPlan, excludedPlan); + } + @Test public void testFilterPushDownRowKeyEqualWithItem() throws Exception { setColumnWidths(new int[] {20, 30}); This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] asfgit closed pull request #1352: DRILL-6548: IllegalStateException: Unexpected EMIT outcome received i…
asfgit closed pull request #1352: DRILL-6548: IllegalStateException: Unexpected EMIT outcome received i… URL: https://github.com/apache/drill/pull/1352 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index a8c6804fd8..4fc0d1596a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -174,6 +174,7 @@ public void buildSchema() throws SchemaChangeException { return; case NONE: state = BatchState.DONE; +return; case EMIT: throw new IllegalStateException("Unexpected EMIT outcome received in buildSchema phase"); default: diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java index 9358ff7457..04d06aacd5 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java @@ -638,4 +638,20 @@ public void testRegularTopNWithEmptyDataSet() { assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA); assertTrue(topNBatch.next() == RecordBatch.IterOutcome.NONE); } + + @Test + public void testRegularTopNWithEmptyDataSetAndNoneOutcome() { +inputContainer.add(emptyInputRowSet.container()); +inputOutcomes.add(RecordBatch.IterOutcome.NONE); + +final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, + 
inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema()); + +final TopN topNConfig = new TopN(null, + Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING, +RelFieldCollation.NullDirection.FIRST)), false, 4); +final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch); + +assertTrue(topNBatch.next() == RecordBatch.IterOutcome.NONE); + } } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] asfgit closed pull request #1350: DRILL-4580: Support for exporting storage plugin configurations
asfgit closed pull request #1350: DRILL-4580: Support for exporting storage plugin configurations URL: https://github.com/apache/drill/pull/1350 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java index ca108606ca..b6f839ba8f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java @@ -35,6 +35,7 @@ import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import javax.ws.rs.core.SecurityContext; import javax.xml.bind.annotation.XmlRootElement; @@ -134,6 +135,15 @@ public JsonResult enablePlugin(@PathParam("name") String name, @PathParam("val") } } + @GET + @Path("/storage/{name}/export") + @Produces(MediaType.APPLICATION_JSON) + public Response exportPlugin(@PathParam("name") String name) { +Response.ResponseBuilder response = Response.ok(getStoragePluginJSON(name)); +response.header("Content-Disposition", String.format("attachment;filename=\"%s.json\"", name)); +return response.build(); + } + @DELETE @Path("/storage/{name}.json") @Produces(MediaType.APPLICATION_JSON) diff --git a/exec/java-exec/src/main/resources/rest/storage/list.ftl b/exec/java-exec/src/main/resources/rest/storage/list.ftl index ca20063835..7dfcf2591a 100644 --- a/exec/java-exec/src/main/resources/rest/storage/list.ftl +++ b/exec/java-exec/src/main/resources/rest/storage/list.ftl @@ -38,6 +38,7 @@ Update Disable +Export diff --git a/exec/java-exec/src/main/resources/rest/storage/update.ftl 
b/exec/java-exec/src/main/resources/rest/storage/update.ftl index a15cc98652..a30b65ee7e 100644 --- a/exec/java-exec/src/main/resources/rest/storage/update.ftl +++ b/exec/java-exec/src/main/resources/rest/storage/update.ftl @@ -48,6 +48,7 @@ <#else> Enable + Export Delete This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] asfgit closed pull request #1349: DRILL-6554: Minor code improvements in parquet statistics handling
asfgit closed pull request #1349: DRILL-6554: Minor code improvements in parquet statistics handling URL: https://github.com/apache/drill/pull/1349 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java index 547dc06704..42e6e0b6a4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java @@ -113,9 +113,9 @@ public boolean canDrop(RangeExprEvaluator evaluator) { * IS TRUE predicate. */ private static LogicalExpression createIsTruePredicate(LogicalExpression expr) { -return new ParquetIsPredicate(expr, +return new ParquetIsPredicate(expr, (exprStat, evaluator) -> //if max value is not true or if there are all nulls -> canDrop -(exprStat, evaluator) -> !((BooleanStatistics)exprStat).getMax() || isAllNulls(exprStat, evaluator.getRowCount()) +isAllNulls(exprStat, evaluator.getRowCount()) || exprStat.hasNonNullValue() && !((BooleanStatistics) exprStat).getMax() ); } @@ -123,9 +123,9 @@ private static LogicalExpression createIsTruePredicate(LogicalExpression expr) { * IS FALSE predicate. 
*/ private static LogicalExpression createIsFalsePredicate(LogicalExpression expr) { -return new ParquetIsPredicate(expr, +return new ParquetIsPredicate(expr, (exprStat, evaluator) -> //if min value is not false or if there are all nulls -> canDrop -(exprStat, evaluator) -> ((BooleanStatistics)exprStat).getMin() || isAllNulls(exprStat, evaluator.getRowCount()) +isAllNulls(exprStat, evaluator.getRowCount()) || exprStat.hasNonNullValue() && ((BooleanStatistics) exprStat).getMin() ); } @@ -133,9 +133,9 @@ private static LogicalExpression createIsFalsePredicate(LogicalExpression expr) * IS NOT TRUE predicate. */ private static LogicalExpression createIsNotTruePredicate(LogicalExpression expr) { -return new ParquetIsPredicate(expr, +return new ParquetIsPredicate(expr, (exprStat, evaluator) -> //if min value is not false or if there are no nulls -> canDrop -(exprStat, evaluator) -> ((BooleanStatistics)exprStat).getMin() && hasNoNulls(exprStat) +hasNoNulls(exprStat) && exprStat.hasNonNullValue() && ((BooleanStatistics) exprStat).getMin() ); } @@ -143,9 +143,9 @@ private static LogicalExpression createIsNotTruePredicate(LogicalExpression expr * IS NOT FALSE predicate. 
*/ private static LogicalExpression createIsNotFalsePredicate(LogicalExpression expr) { -return new ParquetIsPredicate(expr, +return new ParquetIsPredicate(expr, (exprStat, evaluator) -> //if max value is not true or if there are no nulls -> canDrop -(exprStat, evaluator) -> !((BooleanStatistics)exprStat).getMax() && hasNoNulls(exprStat) +hasNoNulls(exprStat) && exprStat.hasNonNullValue() && !((BooleanStatistics) exprStat).getMax() ); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java index f804a7b06f..de4df1f5b9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.expr.stat; +import org.apache.parquet.Preconditions; import org.apache.parquet.column.statistics.Statistics; /** @@ -28,7 +29,7 @@ private ParquetPredicatesHelper() { /** * @param stat statistics object - * @return true if the input stat object has valid statistics; false otherwise + * @return true if the input stat object has valid statistics; false otherwise */ static boolean isNullOrEmpty(Statistics stat) { return stat == null || stat.isEmpty(); @@ -39,22 +40,21 @@ static boolean isNullOrEmpty(Statistics stat) { * * @param stat parquet column statistics * @param rowCount number of rows in the parquet file - * @return True if all rows are null in the parquet file - * False if at least one row is not null. + * @return true if all rows are null in the parquet file and false otherwise */ static boolean isAllNulls(Statistics stat, long rowCount) { -return stat.isNumNullsSet() && stat.getNumNulls() == rowCount; +Preconditions.checkArgument(rowCount >= 0, String.format("negative rowCount %d is not valid", rowCount)); +return sta
[GitHub] asfgit closed pull request #1347: DRILL-6545: Projection Push down into Lateral Join operator.
asfgit closed pull request #1347: DRILL-6545: Projection Push down into Lateral Join operator. URL: https://github.com/apache/drill/pull/1347 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java index a12fed1267..55ede96282 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import org.apache.calcite.rel.core.JoinRelType; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.base.AbstractJoinPop; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; @@ -34,6 +35,9 @@ public class LateralJoinPOP extends AbstractJoinPop { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinPOP.class); + @JsonProperty("excludedColumns") + private List excludedColumns; + @JsonProperty("unnestForLateralJoin") private UnnestPOP unnestForLateralJoin; @@ -41,19 +45,21 @@ public LateralJoinPOP( @JsonProperty("left") PhysicalOperator left, @JsonProperty("right") PhysicalOperator right, - @JsonProperty("joinType") JoinRelType joinType) { + @JsonProperty("joinType") JoinRelType joinType, + @JsonProperty("excludedColumns") List excludedColumns) { super(left, right, joinType, null, null); Preconditions.checkArgument(joinType != JoinRelType.FULL, "Full outer join is currently not supported with Lateral Join"); 
Preconditions.checkArgument(joinType != JoinRelType.RIGHT, "Right join is currently not supported with Lateral Join"); +this.excludedColumns = excludedColumns; } @Override public PhysicalOperator getNewWithChildren(List children) { Preconditions.checkArgument(children.size() == 2, "Lateral join should have two physical operators"); -LateralJoinPOP newPOP = new LateralJoinPOP(children.get(0), children.get(1), joinType); +LateralJoinPOP newPOP = new LateralJoinPOP(children.get(0), children.get(1), joinType, this.excludedColumns); newPOP.unnestForLateralJoin = this.unnestForLateralJoin; return newPOP; } @@ -63,6 +69,11 @@ public UnnestPOP getUnnestForLateralJoin() { return this.unnestForLateralJoin; } + @JsonProperty("excludedColumns") + public List getExcludedColumns() { +return this.excludedColumns; + } + public void setUnnestForLateralJoin(UnnestPOP unnest) { this.unnestForLateralJoin = unnest; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java index c8bb2a4f56..519d5036e7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java @@ -37,6 +37,8 @@ import org.apache.drill.exec.planner.logical.DrillJoinRule; import org.apache.drill.exec.planner.logical.DrillLimitRule; import org.apache.drill.exec.planner.logical.DrillMergeProjectRule; +import org.apache.drill.exec.planner.logical.DrillProjectLateralJoinTransposeRule; +import org.apache.drill.exec.planner.logical.DrillProjectPushIntoLateralJoinRule; import org.apache.drill.exec.planner.logical.DrillProjectRule; import org.apache.drill.exec.planner.logical.DrillPushFilterPastProjectRule; import org.apache.drill.exec.planner.logical.DrillPushLimitToScanRule; @@ -287,7 +289,8 @@ static RuleSet getDrillUserConfigurableLogicalRules(OptimizerRulesContext optimi // Due to infinite loop in 
planning (DRILL-3257/CALCITE-1271), temporarily use this rule in Hep planner // RuleInstance.FILTER_SET_OP_TRANSPOSE_RULE, DrillFilterAggregateTransposeRule.INSTANCE, - + DrillProjectLateralJoinTransposeRule.INSTANCE, + DrillProjectPushIntoLateralJoinRule.INSTANCE, RuleInstance.FILTER_MERGE_RULE, RuleInstance.FILTER_CORRELATE_RULE, RuleInstance.AGGREGATE_REMOVE_RULE, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java index a7bbbca927..28e5246b0e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java +++ b/exec/java-exec/src/main/java/org/apache
[GitHub] ppadma commented on issue #1324: DRILL-6310: limit batch size for hash aggregate
ppadma commented on issue #1324: DRILL-6310: limit batch size for hash aggregate URL: https://github.com/apache/drill/pull/1324#issuecomment-401675042 @vvysotskyi Sorry about that. Fixed the problem and updated the PR. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199376103 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java ## @@ -226,6 +244,96 @@ public IterOutcome next() { } } + private void applyRuntimeFilter() throws SchemaChangeException { +RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter(); +if (runtimeFilterWritable == null) { + return; +} +if (recordCount <= 0) { + return; +} +List bloomFilters = runtimeFilterWritable.unwrap(); +if (hash64 == null) { + ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(this, context); + try { +//generate hash helper +this.toFilterFields = runtimeFilterWritable.getRuntimeFilterBDef().getProbeFieldsList(); +List hashFieldExps = new ArrayList<>(); +List typedFieldIds = new ArrayList<>(); +for (String toFilterField : toFilterFields) { + SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(toFilterField), ExpressionPosition.UNKNOWN); + TypedFieldId typedFieldId = container.getValueVectorId(schemaPath); + this.field2id.put(toFilterField, typedFieldId.getFieldIds()[0]); + typedFieldIds.add(typedFieldId); + ValueVectorReadExpression toHashFieldExp = new ValueVectorReadExpression(typedFieldId); + hashFieldExps.add(toHashFieldExp); +} +hash64 = hashHelper.getHash64(hashFieldExps.toArray(new LogicalExpression[hashFieldExps.size()]), typedFieldIds.toArray(new TypedFieldId[typedFieldIds.size()])); + } catch (Exception e) { +throw UserException.internalError(e).build(logger); + } +} +selectionVector2.allocateNew(recordCount); +BitSet bitSet = new BitSet(recordCount); +for (int i = 0; i < toFilterFields.size(); i++) { + BloomFilter bloomFilter = bloomFilters.get(i); + String fieldName = toFilterFields.get(i); + computeBitSet(field2id.get(fieldName), bloomFilter, bitSet); +} +int svIndex = 0; +int tmpFilterRows = 0; +for (int i = 0; i < recordCount; i++) { + 
boolean contain = bitSet.get(i); + if (contain) { +selectionVector2.setIndex(svIndex, i); +svIndex++; + } else { +tmpFilterRows++; + } +} +selectionVector2.setRecordCount(svIndex); +if (tmpFilterRows > 0 && tmpFilterRows == recordCount) { + recordCount = 0; + selectionVector2.clear(); + logger.debug("filter {} rows by the RuntimeFilter", tmpFilterRows); + return; +} +if (tmpFilterRows > 0 && tmpFilterRows != recordCount ) { + totalFilterRows = totalFilterRows + tmpFilterRows; + recordCount = svIndex; + BatchSchema batchSchema = this.schema; + VectorContainer backUpContainer = new VectorContainer(this.oContext.getAllocator(), batchSchema); + int fieldCount = batchSchema.getFieldCount(); + for (int i = 0; i < fieldCount; i++) { +ValueVector from = this.getContainer().getValueVector(i).getValueVector(); +ValueVector to = backUpContainer.getValueVector(i).getValueVector(); +to.setInitialCapacity(svIndex); +for (int r = 0; r < svIndex; r++) { + to.copyEntry(r, from, selectionVector2.getIndex(r)); Review comment: Hi @weijietong , the physical plan operators are supposed to implement 2 APIs: `getSupportedEncodings()` API (see [1]) which indicates whether or not it accepts an SelectionVector2 (other options are SV4 and NONE) and the corresponding `getEncoding()` API. If an operator does not accept an SV2 or SV4, the planner will insert a SelectionVectorRemover just below that node which essentially does the copying of qualified rows. Note that the SVRemover is implemented by RemovingRecordBatch [2] which uses a `StraightCopier` whenever the child has NONE, so in that case it just does a simple transfer instead of copy. In the case of Filter, it can accept both NONE and SV2 because it is possible in some cases to have a filter on top of another filter (with some intermediate non-blocking operator). Let me know if this makes sense. 
The main reason I am proposing this is the Filter-Scan is a very common pattern and we would want to minimize the copy overhead as much as possible. Unfortunately, I am tied up with some other work otherwise I could make the changes on top of your branch to experiment. [1] https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java#L51 [2] https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java#L57 This is an automated message from the Apache Git Ser
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199374038 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java ## @@ -32,35 +33,53 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.drill.exec.physical.base.AbstractJoinPop; +import org.apache.drill.exec.work.filter.RuntimeFilterDef; + @JsonTypeName("hash-join") +@JsonIgnoreProperties(ignoreUnknown = true) Review comment: The primary reason is backward compatibility: physical plans without the `runtimeFilterDef` property (e.g., plans from code that does not support RuntimeFilter) still work. That is what `ignoreUnknown=true` is for. `@JsonIgnore`, by contrast, would keep the property from being written to the final JSON, which is not what we want. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Resolved] (DRILL-6061) Doc Request: Global Query List showing queries from all Drill foreman nodes
[ https://issues.apache.org/jira/browse/DRILL-6061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bridget Bevens resolved DRILL-6061. --- Resolution: Fixed I've added the following doc: https://drill.apache.org/docs/global-query-list/ Hopefully this resolves the issue. If not, please update this JIRA with feedback. Thanks, Bridget > Doc Request: Global Query List showing queries from all Drill foreman nodes > --- > > Key: DRILL-6061 > URL: https://issues.apache.org/jira/browse/DRILL-6061 > Project: Apache Drill > Issue Type: Task > Components: Documentation, Metadata, Web Server >Affects Versions: 1.11.0 > Environment: MapR 5.2 >Reporter: Hari Sekhon >Assignee: Bridget Bevens >Priority: Major > Labels: doc-impacting > Fix For: 1.14.0 > > > Documentation Request to improve doc around Global Query List to show all > queries executed across all Drill nodes in a cluster for better management > and auditing. > It wasn't obvious to be able to see all queries across all nodes in a Drill > cluster. The Web UI on any given Drill node only shows the queries > coordinated by that local node if acting as the foreman for the query, so if > using ZooKeeper or a Load Balancer to distribute queries via different Drill > nodes (eg. > [https://github.com/HariSekhon/nagios-plugins/tree/master/haproxy|https://github.com/HariSekhon/nagios-plugins/tree/master/haproxy]) > then the query list will be spread across lots of different nodes with no > global timeline of queries. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199369589 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java ## @@ -226,6 +244,96 @@ public IterOutcome next() { } } + private void applyRuntimeFilter() throws SchemaChangeException { +RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter(); +if (runtimeFilterWritable == null) { + return; +} +if (recordCount <= 0) { + return; +} +List bloomFilters = runtimeFilterWritable.unwrap(); +if (hash64 == null) { + ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(this, context); + try { +//generate hash helper +this.toFilterFields = runtimeFilterWritable.getRuntimeFilterBDef().getProbeFieldsList(); +List hashFieldExps = new ArrayList<>(); +List typedFieldIds = new ArrayList<>(); +for (String toFilterField : toFilterFields) { + SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(toFilterField), ExpressionPosition.UNKNOWN); + TypedFieldId typedFieldId = container.getValueVectorId(schemaPath); + this.field2id.put(toFilterField, typedFieldId.getFieldIds()[0]); + typedFieldIds.add(typedFieldId); + ValueVectorReadExpression toHashFieldExp = new ValueVectorReadExpression(typedFieldId); + hashFieldExps.add(toHashFieldExp); +} +hash64 = hashHelper.getHash64(hashFieldExps.toArray(new LogicalExpression[hashFieldExps.size()]), typedFieldIds.toArray(new TypedFieldId[typedFieldIds.size()])); + } catch (Exception e) { +throw UserException.internalError(e).build(logger); + } +} +selectionVector2.allocateNew(recordCount); +BitSet bitSet = new BitSet(recordCount); +for (int i = 0; i < toFilterFields.size(); i++) { + BloomFilter bloomFilter = bloomFilters.get(i); + String fieldName = toFilterFields.get(i); + computeBitSet(field2id.get(fieldName), bloomFilter, bitSet); +} +int svIndex = 0; +int tmpFilterRows = 0; +for (int i = 0; i < recordCount; i++) { + boolean 
contain = bitSet.get(i); + if (contain) { +selectionVector2.setIndex(svIndex, i); +svIndex++; + } else { +tmpFilterRows++; + } +} +selectionVector2.setRecordCount(svIndex); +if (tmpFilterRows > 0 && tmpFilterRows == recordCount) { + recordCount = 0; + selectionVector2.clear(); + logger.debug("filter {} rows by the RuntimeFilter", tmpFilterRows); + return; +} +if (tmpFilterRows > 0 && tmpFilterRows != recordCount ) { + totalFilterRows = totalFilterRows + tmpFilterRows; + recordCount = svIndex; + BatchSchema batchSchema = this.schema; + VectorContainer backUpContainer = new VectorContainer(this.oContext.getAllocator(), batchSchema); + int fieldCount = batchSchema.getFieldCount(); + for (int i = 0; i < fieldCount; i++) { +ValueVector from = this.getContainer().getValueVector(i).getValueVector(); +ValueVector to = backUpContainer.getValueVector(i).getValueVector(); +to.setInitialCapacity(svIndex); +for (int r = 0; r < svIndex; r++) { + to.copyEntry(r, from, selectionVector2.getIndex(r)); Review comment: @aman thanks for the other valuable reviews; I will update them soon. On this point, I initially intended to output the SV2, but I gave up on it for the following reasons: * Not all operators support the SelectionModel. If a user's rule pushed down the filter conditions, the output SV2 may not be processed by operators that do not support it (i.e. there is no guarantee that an operator supporting the SelectionModel sits above the Scan node). * The BatchSchema's SelectionModel would also become a runtime variable. This would also affect the upper filter node's code-gen logic, which would have to dynamically generate fresh filter code for the Scan's SV2. I agree that the memory copy cost will be lower if the filter node above can filter more rows over the Scan's SV2. So what is your opinion about this? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] aravi5 commented on issue #1272: DRILL-5977: Filter Pushdown in Drill-Kafka plugin
aravi5 commented on issue #1272: DRILL-5977: Filter Pushdown in Drill-Kafka plugin URL: https://github.com/apache/drill/pull/1272#issuecomment-401639836 @akumarb2010 - Thank you for reviewing the design and implementation, and providing valuable inputs. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] vdiravka commented on a change in pull request #1345: DRILL-6494: Drill Plugins Handler
vdiravka commented on a change in pull request #1345: DRILL-6494: Drill Plugins Handler URL: https://github.com/apache/drill/pull/1345#discussion_r199358015 ## File path: contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json ## @@ -2,8 +2,8 @@ "storage":{ kafka : { type:"kafka", - enabled: false, - kafkaConsumerProps: {"bootstrap.servers":"localhost:9092", "group.id" : "drill-consumer"} + kafkaConsumerProps: {"bootstrap.servers":"localhost:9092", "group.id" : "drill-consumer"}, + enabled: false Review comment: It looks like the Hive plugin is the only plugin with such an order of properties. For all other plugins, the enabled status appears at the end of the config after deserialization. So I reverted the enabled status position for the Hive plugin and kept the change for the others. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] vdiravka commented on a change in pull request #1345: DRILL-6494: Drill Plugins Handler
vdiravka commented on a change in pull request #1345: DRILL-6494: Drill Plugins Handler URL: https://github.com/apache/drill/pull/1345#discussion_r199359888 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Charsets; +import com.google.common.io.Resources; +import com.jasonclawson.jackson.dataformat.hocon.HoconFactory; +import org.apache.drill.common.config.CommonConstants; +import org.apache.drill.common.config.LogicalPlanPersistence; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.common.scanner.ClassPathScanner; +import org.apache.drill.exec.planner.logical.StoragePlugins; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.sys.PersistentStore; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; +import java.io.IOException; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * Drill plugins handler, which allows to update storage plugins configs from the + * {@link CommonConstants#STORAGE_PLUGINS_OVERRIDE_CONF} conf file + * + * TODO: DRILL-6564: It can be improved with configs versioning and service of creating + * {@link CommonConstants#STORAGE_PLUGINS_OVERRIDE_CONF} + */ +public class StoragePluginsHandlerService implements StoragePluginsHandler { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginsHandlerService.class); + + private final LogicalPlanPersistence hoconLogicalPlanPersistence; + + public StoragePluginsHandlerService(DrillbitContext context) { +hoconLogicalPlanPersistence = new LogicalPlanPersistence(context.getConfig(), context.getClasspathScan(), +new ObjectMapper(new HoconFactory())); + } + + @Override + public void loadPlugins(@NotNull PersistentStore persistentStore, + @Nullable StoragePlugins bootstrapPlugins) { +// if bootstrapPlugins is not null -- fresh Drill set up +StoragePlugins pluginsToBeWrittenToPersistentStore; + 
+StoragePlugins newPlugins = getNewStoragePlugins(); + +if (newPlugins != null) { + pluginsToBeWrittenToPersistentStore = new StoragePlugins(new HashMap<>()); + Optional.ofNullable(bootstrapPlugins) + .ifPresent(pluginsToBeWrittenToPersistentStore::putAll); + + for (Map.Entry newPlugin : newPlugins) { +String pluginName = newPlugin.getKey(); +StoragePluginConfig oldPluginConfig = Optional.ofNullable(bootstrapPlugins) +.map(plugins -> plugins.getConfig(pluginName)) +.orElse(persistentStore.get(pluginName)); +StoragePluginConfig updatedStatusPluginConfig = updatePluginStatus(oldPluginConfig, newPlugin.getValue()); +pluginsToBeWrittenToPersistentStore.put(pluginName, updatedStatusPluginConfig); + } +} else { + pluginsToBeWrittenToPersistentStore = bootstrapPlugins; +} + +// load pluginsToBeWrittenToPersistentStore to Persistent Store +Optional.ofNullable(pluginsToBeWrittenToPersistentStore) +.ifPresent(plugins -> plugins.forEach(plugin -> persistentStore.put(plugin.getKey(), plugin.getValue(; + } + + /** + * Helper method to identify the enabled status for new storage plugins config. If this status is absent in the updater + * file, the status is kept from the configs, which are going to be updated + * + * @param oldPluginConfig current storage plugin config from Persistent Store or bootstrap config file + * @param newPluginConfig new storage plugin config + * @return new storage plugin config with updated enabled status + */ + private StoragePluginConfig updatePluginStatus(@Nullable StoragePluginConfig oldPluginConfig, + @NotNull StoragePluginConfig newPluginConfig) { +if (!newPluginConfig.isE
[GitHub] vdiravka commented on a change in pull request #1345: DRILL-6494: Drill Plugins Handler
vdiravka commented on a change in pull request #1345: DRILL-6494: Drill Plugins Handler URL: https://github.com/apache/drill/pull/1345#discussion_r199359807 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandler.java ## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store; + +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.planner.logical.StoragePlugins; +import org.apache.drill.exec.store.sys.PersistentStore; + + +/** + * Storage plugins handler is an additional service for updating storage plugins configs from the file + */ +public interface StoragePluginsHandler { + + /** + * Update incoming storage plugins configs from persistence store if present, otherwise bootstrap plugins configs. 
+ * One of the params should be null, second shouldn't + * + * @param persistentStore the last storage plugins configs from persistence store + * @param bootstrapPlugins bootstrap storage plugins, which are used in case of first Drill start up + * @return all storage plugins, which should be loaded into persistence store + */ + void loadPlugins(PersistentStore persistentStore, StoragePlugins bootstrapPlugins); Review comment: Let's say the current approach is method 1 and the approach of transferring all the configs from the PS to the handler is method 2. For both of them the first stage is common: 1. Getting the iterator and checking for any element - `pluginSystemTable.getAll().hasNext()`, which is used to detect whether any plugins are present in the PStore or whether this is the first set-up. 2. Then for method 2 I need to extract every plugin config and put it into the `StoragePlugins` - N calls to the PStore. For method 1 I just need to pass a reference to the PStore. 3. For method 2 I need to update plugins and determine only the updated plugin configs to put them into the PStore - <= N calls. Similarly, in method 1 I need to get new plugins, if they exist, and put them into the PStore - <= N calls. _ The benefit is in the second stage. I agree with you regarding registry responsibilities. But the current approach is also good: the `StoragePluginsHandler` instance is currently a part of `StoragePluginRegistryImpl` anyway, and loading configs is easier to understand from a code point of view than updating and returning different results for different modes (you can compare with the changes from the first commit). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] vdiravka commented on a change in pull request #1345: DRILL-6494: Drill Plugins Handler
vdiravka commented on a change in pull request #1345: DRILL-6494: Drill Plugins Handler URL: https://github.com/apache/drill/pull/1345#discussion_r199357677 ## File path: contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java ## @@ -52,18 +50,18 @@ public class HiveSchemaFactory implements SchemaFactory { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveSchemaFactory.class); - // MetaStoreClient created using process user credentials - private final DrillHiveMetaStoreClient processUserMetastoreClient; - // MetasStoreClient created using SchemaConfig credentials - private final LoadingCache metaStoreClientLoadingCache; + // MetaStoreClient created using process user credentials. Null if client can't be instantiated + private DrillHiveMetaStoreClient processUserMetastoreClient; Review comment: It may not be initialized in the constructor. This was done to avoid a `null` value during the Hive storage plugin update window. But I don't like this approach anymore. I have described the issue here: [DRILL-6412](https://issues.apache.org/jira/browse/DRILL-6412?focusedCommentId=16528944&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16528944). Until it is solved as mentioned in DRILL-6412, the following workaround could be used: the `"hive.metastore.schema.verification": "false"` property in the Hive `bootstrap-storage.json`. It allows the Hive client to be instantiated properly, as in the earlier 1.2 version of the Drill Hive client. Also, this property should be documented for configuring the Hive Embedded Metastore: https://drill.apache.org/docs/hive-storage-plugin/#hive-embedded-metastore-configuration This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] vdiravka commented on a change in pull request #1345: DRILL-6494: Drill Plugins Handler
vdiravka commented on a change in pull request #1345: DRILL-6494: Drill Plugins Handler URL: https://github.com/apache/drill/pull/1345#discussion_r199358139 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/NamedStoragePluginConfig.java ## @@ -17,22 +17,51 @@ */ package org.apache.drill.exec.store; +import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.drill.common.logical.StoragePluginConfig; import com.fasterxml.jackson.annotation.JsonTypeName; -@JsonTypeName("named") +@JsonTypeName(NamedStoragePluginConfig.NAME) public class NamedStoragePluginConfig extends StoragePluginConfig { - public String name; + + public static final String NAME = "named"; + + private final String name; + + public NamedStoragePluginConfig(@JsonProperty("name") String name) { Review comment: Missed that, thanks. Done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] vvysotskyi commented on issue #1324: DRILL-6310: limit batch size for hash aggregate
vvysotskyi commented on issue #1324: DRILL-6310: limit batch size for hash aggregate URL: https://github.com/apache/drill/pull/1324#issuecomment-401618980 Unit tests in the Travis build for this PR failed. @ppadma, could you please take a look at this? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ppadma commented on issue #1342: DRILL-6537:Limit the batch size for buffering operators based on how …
ppadma commented on issue #1342: DRILL-6537:Limit the batch size for buffering operators based on how … URL: https://github.com/apache/drill/pull/1342#issuecomment-401618524 @vvysotskyi Done. I accidentally closed the PR. Reopened it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ppadma opened a new pull request #1342: DRILL-6537:Limit the batch size for buffering operators based on how …
ppadma opened a new pull request #1342: DRILL-6537:Limit the batch size for buffering operators based on how … URL: https://github.com/apache/drill/pull/1342 …much memory they get @Ben-Zvi please review. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199338763 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java ## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + + +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.AutoCloseables; +import org.apache.drill.exec.proto.BitData; + +import java.util.ArrayList; +import java.util.List; + +public class RuntimeFilterWritable implements AutoCloseables.Closeable { Review comment: Pls add a javadoc for this class. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199334516 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java ## @@ -226,6 +244,96 @@ public IterOutcome next() { } } + private void applyRuntimeFilter() throws SchemaChangeException { +RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter(); +if (runtimeFilterWritable == null) { + return; +} +if (recordCount <= 0) { + return; +} +List bloomFilters = runtimeFilterWritable.unwrap(); +if (hash64 == null) { + ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(this, context); + try { +//generate hash helper +this.toFilterFields = runtimeFilterWritable.getRuntimeFilterBDef().getProbeFieldsList(); +List hashFieldExps = new ArrayList<>(); +List typedFieldIds = new ArrayList<>(); +for (String toFilterField : toFilterFields) { + SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(toFilterField), ExpressionPosition.UNKNOWN); + TypedFieldId typedFieldId = container.getValueVectorId(schemaPath); + this.field2id.put(toFilterField, typedFieldId.getFieldIds()[0]); + typedFieldIds.add(typedFieldId); + ValueVectorReadExpression toHashFieldExp = new ValueVectorReadExpression(typedFieldId); + hashFieldExps.add(toHashFieldExp); +} +hash64 = hashHelper.getHash64(hashFieldExps.toArray(new LogicalExpression[hashFieldExps.size()]), typedFieldIds.toArray(new TypedFieldId[typedFieldIds.size()])); + } catch (Exception e) { +throw UserException.internalError(e).build(logger); + } +} +selectionVector2.allocateNew(recordCount); +BitSet bitSet = new BitSet(recordCount); +for (int i = 0; i < toFilterFields.size(); i++) { + BloomFilter bloomFilter = bloomFilters.get(i); + String fieldName = toFilterFields.get(i); + computeBitSet(field2id.get(fieldName), bloomFilter, bitSet); +} +int svIndex = 0; +int tmpFilterRows = 0; +for (int i = 0; i < recordCount; i++) { + 
boolean contain = bitSet.get(i); + if (contain) { +selectionVector2.setIndex(svIndex, i); +svIndex++; + } else { +tmpFilterRows++; + } +} +selectionVector2.setRecordCount(svIndex); +if (tmpFilterRows > 0 && tmpFilterRows == recordCount) { + recordCount = 0; + selectionVector2.clear(); + logger.debug("filter {} rows by the RuntimeFilter", tmpFilterRows); + return; +} +if (tmpFilterRows > 0 && tmpFilterRows != recordCount ) { + totalFilterRows = totalFilterRows + tmpFilterRows; + recordCount = svIndex; + BatchSchema batchSchema = this.schema; + VectorContainer backUpContainer = new VectorContainer(this.oContext.getAllocator(), batchSchema); + int fieldCount = batchSchema.getFieldCount(); + for (int i = 0; i < fieldCount; i++) { +ValueVector from = this.getContainer().getValueVector(i).getValueVector(); +ValueVector to = backUpContainer.getValueVector(i).getValueVector(); +to.setInitialCapacity(svIndex); +for (int r = 0; r < svIndex; r++) { + to.copyEntry(r, from, selectionVector2.getIndex(r)); +} + } + this.container.exchange(backUpContainer); + backUpContainer.clear(); + selectionVector2.clear(); + logger.debug("filter {} rows by the RuntimeFilter", tmpFilterRows); + return; +} + } + + + private void computeBitSet(int fieldId, BloomFilter bloomFilter, BitSet bitSet) throws SchemaChangeException { +for (int rowIndex = 0; rowIndex < recordCount; rowIndex++) { + long hash = hash64.hash64Code(rowIndex, 0, fieldId); + boolean contain = bloomFilter.find(hash); + if (contain) { +bitSet.set(rowIndex, true); +bitSet.set(rowIndex); Review comment: Why call set() second time ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199336580 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ## @@ -491,6 +508,50 @@ private void setupHashTable() throws SchemaChangeException { // Create the chained hash table baseHashTable = new ChainedHashTable(htConfig, context, allocator, buildBatch, probeBatch, null); +if (enableRuntimeFilter) { + setupHash64(htConfig); +} + } + + private void setupHash64(HashTableConfig htConfig) throws SchemaChangeException { +LogicalExpression[] keyExprsBuild = new LogicalExpression[htConfig.getKeyExprsBuild().size()]; +ErrorCollector collector = new ErrorCollectorImpl(); +int i = 0; +for (NamedExpression ne : htConfig.getKeyExprsBuild()) { + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), buildBatch, collector, context.getFunctionRegistry()); + if (collector.hasErrors()) { +throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } + if (expr == null) { +continue; + } + keyExprsBuild[i] = expr; + i++; +} +i = 0; +boolean meetNotExistField = false; +TypedFieldId[] buildSideTypeFieldIds = new TypedFieldId[keyExprsBuild.length]; +for (NamedExpression ne : htConfig.getKeyExprsBuild()) { + SchemaPath schemaPath = (SchemaPath) ne.getExpr(); + TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath); + if (typedFieldId == null) { +meetNotExistField = true; +continue; + } + buildSideTypeFieldIds[i] = typedFieldId; + i++; +} +if (meetNotExistField) { + logger.info("as some build side key fileds not found, runtime filter was disabled"); Review comment: mis-spelled 'fileds' => 'fields'. Pls use upper-case to begin the message: 'As some build ...' This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199338879 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java ## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + + +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.AutoCloseables; +import org.apache.drill.exec.proto.BitData; + +import java.util.ArrayList; +import java.util.List; + +public class RuntimeFilterWritable implements AutoCloseables.Closeable { + + private BitData.RuntimeFilterBDef runtimeFilterBDef; + + private DrillBuf[] data; + + public RuntimeFilterWritable() { + + } + + + public BitData.RuntimeFilterBDef getRuntimeFilterBDef() { +return runtimeFilterBDef; + } + + public void setRuntimeFilterBDef(BitData.RuntimeFilterBDef runtimeFilterBDef) { +this.runtimeFilterBDef = runtimeFilterBDef; + } + + public DrillBuf[] getData() { +return data; + } + + public void setData(DrillBuf... 
data) { +this.data = data; + } + + + public List unwrap() { +List sizeInBytes = runtimeFilterBDef.getBloomFilterSizeInBytesList(); +List bloomFilters = new ArrayList<>(sizeInBytes.size()); +for (int i = 0; i < sizeInBytes.size(); i++) { + DrillBuf byteBuf = data[i]; + int offset = 0; + int size = sizeInBytes.get(i); + DrillBuf bloomFilterContent = byteBuf.slice(offset, size); + BloomFilter bloomFilter = new BloomFilter(bloomFilterContent); + bloomFilters.add(bloomFilter); +} +return bloomFilters; + } + + public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { +List thisFilters = this.unwrap(); +List otherFilters = runtimeFilterWritable.unwrap(); +for (int i = 0; i < thisFilters.size(); i++) { + BloomFilter thisOne = thisFilters.get(0); Review comment: Shouldn't this be get(i), not get(0) ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199336665 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java ## @@ -0,0 +1,735 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.work.filter; + +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.commons.collections.CollectionUtils; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.data.JoinCondition; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.AccountingDataTunnel; +import org.apache.drill.exec.ops.Consumer; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.ops.SendingAccountor; +import org.apache.drill.exec.ops.StatusHandler; +import org.apache.drill.exec.physical.PhysicalPlan; + +import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; +import org.apache.drill.exec.physical.base.Exchange; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.Store; +import org.apache.drill.exec.physical.config.BroadcastExchange; +import org.apache.drill.exec.physical.config.HashAggregate; +import org.apache.drill.exec.physical.config.HashJoinPOP; +import org.apache.drill.exec.physical.config.StreamingAggregate; +import org.apache.drill.exec.planner.fragment.Fragment; +import org.apache.drill.exec.planner.fragment.Wrapper; +import org.apache.drill.exec.proto.BitData; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.rpc.data.DataTunnel; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.parquet.ParquetRGFilterEvaluator; +import org.apache.drill.exec.util.Pointer; +import org.apache.drill.exec.work.QueryWorkUnit; +import org.slf4j.Logger; 
+import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class traverses the physical operator tree to find the HashJoin operator + * which is JPDD (join predicate push down) possible. The prerequisite to do JPDD Review comment: I think you mean 'JPPD', not 'JPDD'. Also change it to '...for which JPPD is possible'. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199334349 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java ## @@ -32,35 +33,53 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.drill.exec.physical.base.AbstractJoinPop; +import org.apache.drill.exec.work.filter.RuntimeFilterDef; + @JsonTypeName("hash-join") +@JsonIgnoreProperties(ignoreUnknown = true) Review comment: Instead of ignoring the property at the global class level, can you ignore the specific one using `@JsonIgnore`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199336779 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java ## @@ -0,0 +1,735 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.work.filter; + +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.commons.collections.CollectionUtils; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.data.JoinCondition; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.AccountingDataTunnel; +import org.apache.drill.exec.ops.Consumer; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.ops.SendingAccountor; +import org.apache.drill.exec.ops.StatusHandler; +import org.apache.drill.exec.physical.PhysicalPlan; + +import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; +import org.apache.drill.exec.physical.base.Exchange; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.Store; +import org.apache.drill.exec.physical.config.BroadcastExchange; +import org.apache.drill.exec.physical.config.HashAggregate; +import org.apache.drill.exec.physical.config.HashJoinPOP; +import org.apache.drill.exec.physical.config.StreamingAggregate; +import org.apache.drill.exec.planner.fragment.Fragment; +import org.apache.drill.exec.planner.fragment.Wrapper; +import org.apache.drill.exec.proto.BitData; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.rpc.data.DataTunnel; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.parquet.ParquetRGFilterEvaluator; +import org.apache.drill.exec.util.Pointer; +import org.apache.drill.exec.work.QueryWorkUnit; +import org.slf4j.Logger; 
+import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class traverses the physical operator tree to find the HashJoin operator + * which is JPDD (join predicate push down) possible. The prerequisite to do JPDD + * is: + * 1. The join condition is equality + * 2. The physical join node is a HashJoin one + * 3. The probe side children of the HashJoin node should not contain a blocked operator like HashAgg + */ +public class RuntimeFilterManager { + + private Wrapper rootWrapper; + + private Map> joinMjId2probdeScanEps = new HashMap<>(); + + private Map joinMjId2scanSize = new ConcurrentHashMap<>(); + + + private Map joinMjId2ScanMjId = new HashMap<>(); + + private RuntimeFilterWritable aggregatedRuntimeFilter; + + private DrillbitContext drillbitContext; + + private QueryContext queryContext; + + private SendingAccountor sendingAccountor = new SendingAccountor(); + + private String lineSeparator; + + + + private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterManager.class); + + /** + * Here we leverage the root Wrapper to do the traverse which indirectly Review comment: 'traverse' => 'traversal'. Also, the comment should be re-phrased to something like: 'This class maintains context for the run-time join pushdown's filter management. It does a traversal of the physical operators by leveraging the root wrapper which indirectly holds the global PhysicalOperator tree and contains the minor fragment endpoints. ' This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199334575 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java ## @@ -144,7 +144,7 @@ public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds) throws C // Uncomment out this line to debug the generated code. // This code is called from generated code, so to step into this code, // persist the code generated in HashAggBatch also. -// top.saveCodeForDebugging(true); +//top.saveCodeForDebugging(true); Review comment: Please keep the original spacing (it would be good if you maintained that code convention for comments). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199338885 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java ## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + + +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.AutoCloseables; +import org.apache.drill.exec.proto.BitData; + +import java.util.ArrayList; +import java.util.List; + +public class RuntimeFilterWritable implements AutoCloseables.Closeable { + + private BitData.RuntimeFilterBDef runtimeFilterBDef; + + private DrillBuf[] data; + + public RuntimeFilterWritable() { + + } + + + public BitData.RuntimeFilterBDef getRuntimeFilterBDef() { +return runtimeFilterBDef; + } + + public void setRuntimeFilterBDef(BitData.RuntimeFilterBDef runtimeFilterBDef) { +this.runtimeFilterBDef = runtimeFilterBDef; + } + + public DrillBuf[] getData() { +return data; + } + + public void setData(DrillBuf... 
data) { +this.data = data; + } + + + public List unwrap() { +List sizeInBytes = runtimeFilterBDef.getBloomFilterSizeInBytesList(); +List bloomFilters = new ArrayList<>(sizeInBytes.size()); +for (int i = 0; i < sizeInBytes.size(); i++) { + DrillBuf byteBuf = data[i]; + int offset = 0; + int size = sizeInBytes.get(i); + DrillBuf bloomFilterContent = byteBuf.slice(offset, size); + BloomFilter bloomFilter = new BloomFilter(bloomFilterContent); + bloomFilters.add(bloomFilter); +} +return bloomFilters; + } + + public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { +List thisFilters = this.unwrap(); +List otherFilters = runtimeFilterWritable.unwrap(); +for (int i = 0; i < thisFilters.size(); i++) { + BloomFilter thisOne = thisFilters.get(0); + BloomFilter otherOne = otherFilters.get(0); Review comment: Same as above.. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199337353 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ## @@ -491,6 +508,50 @@ private void setupHashTable() throws SchemaChangeException { // Create the chained hash table baseHashTable = new ChainedHashTable(htConfig, context, allocator, buildBatch, probeBatch, null); +if (enableRuntimeFilter) { + setupHash64(htConfig); +} + } + + private void setupHash64(HashTableConfig htConfig) throws SchemaChangeException { +LogicalExpression[] keyExprsBuild = new LogicalExpression[htConfig.getKeyExprsBuild().size()]; +ErrorCollector collector = new ErrorCollectorImpl(); +int i = 0; +for (NamedExpression ne : htConfig.getKeyExprsBuild()) { + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), buildBatch, collector, context.getFunctionRegistry()); + if (collector.hasErrors()) { +throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } + if (expr == null) { +continue; + } + keyExprsBuild[i] = expr; + i++; +} +i = 0; +boolean meetNotExistField = false; +TypedFieldId[] buildSideTypeFieldIds = new TypedFieldId[keyExprsBuild.length]; +for (NamedExpression ne : htConfig.getKeyExprsBuild()) { + SchemaPath schemaPath = (SchemaPath) ne.getExpr(); + TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath); + if (typedFieldId == null) { +meetNotExistField = true; +continue; Review comment: why not break from the loop if you encountered this condition since you are going to disable runtime filter and return if even 1 field was missing. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199329850 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/ValueVectorHashHelper.java ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.expr.fn.impl; + +import com.sun.codemodel.JBlock; +import com.sun.codemodel.JExpr; +import com.sun.codemodel.JFieldRef; +import org.apache.drill.common.expression.ExpressionPosition; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.ValueExpressions; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.compile.TemplateClassDefinition; +import org.apache.drill.exec.compile.sig.GeneratorMapping; +import org.apache.drill.exec.compile.sig.MappingSet; +import org.apache.drill.exec.compile.sig.RuntimeOverridden; +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.CodeGenerator; +import org.apache.drill.exec.expr.ExpressionTreeMaterializer; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.planner.physical.HashPrelUtil; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorAccessible; + +import javax.inject.Named; +import java.io.IOException; + +public class ValueVectorHashHelper { + + private RecordBatch recordBatch; + + private FragmentContext context; + + private TemplateClassDefinition TEMPLATE_DEFINITION = new TemplateClassDefinition(Hash64.class, Hash64Template.class); + + private static final GeneratorMapping DO_SETUP_CONSTANT = GeneratorMapping.create("doSetup" /* setup method */, "doSetup" /* eval method */, null /* reset */, null /* cleanup */); + + private static final GeneratorMapping GET_HASH_BUILD_INNERE = GeneratorMapping.create("doSetup" /* setup method */, "hash64Code" /* eval method */, null /* reset */, null /* cleanup */); Review comment: 'INNER' instead of INNERE This is an automated message from the Apache 
Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199336591 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ## @@ -491,6 +508,50 @@ private void setupHashTable() throws SchemaChangeException { // Create the chained hash table baseHashTable = new ChainedHashTable(htConfig, context, allocator, buildBatch, probeBatch, null); +if (enableRuntimeFilter) { + setupHash64(htConfig); +} + } + + private void setupHash64(HashTableConfig htConfig) throws SchemaChangeException { +LogicalExpression[] keyExprsBuild = new LogicalExpression[htConfig.getKeyExprsBuild().size()]; +ErrorCollector collector = new ErrorCollectorImpl(); +int i = 0; +for (NamedExpression ne : htConfig.getKeyExprsBuild()) { + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), buildBatch, collector, context.getFunctionRegistry()); + if (collector.hasErrors()) { +throw new SchemaChangeException("Failure while materializing expression. 
" + collector.toErrorString()); + } + if (expr == null) { +continue; + } + keyExprsBuild[i] = expr; + i++; +} +i = 0; +boolean meetNotExistField = false; +TypedFieldId[] buildSideTypeFieldIds = new TypedFieldId[keyExprsBuild.length]; +for (NamedExpression ne : htConfig.getKeyExprsBuild()) { + SchemaPath schemaPath = (SchemaPath) ne.getExpr(); + TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath); + if (typedFieldId == null) { +meetNotExistField = true; +continue; + } + buildSideTypeFieldIds[i] = typedFieldId; + i++; +} +if (meetNotExistField) { + logger.info("as some build side key fileds not found, runtime filter was disabled"); + enableRuntimeFilter = false; + return; +} +ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(buildBatch, context); +try { + hash64 = hashHelper.getHash64(keyExprsBuild, buildSideTypeFieldIds); +} catch (Exception e) { + throw new SchemaChangeException("fail to construct a field's hash64 dynamic codes", e); Review comment: 'fail' => 'failed' This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199354240 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java ## @@ -226,6 +244,96 @@ public IterOutcome next() { } } + private void applyRuntimeFilter() throws SchemaChangeException { +RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter(); +if (runtimeFilterWritable == null) { + return; +} +if (recordCount <= 0) { + return; +} +List bloomFilters = runtimeFilterWritable.unwrap(); +if (hash64 == null) { + ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(this, context); + try { +//generate hash helper +this.toFilterFields = runtimeFilterWritable.getRuntimeFilterBDef().getProbeFieldsList(); +List hashFieldExps = new ArrayList<>(); +List typedFieldIds = new ArrayList<>(); +for (String toFilterField : toFilterFields) { + SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(toFilterField), ExpressionPosition.UNKNOWN); + TypedFieldId typedFieldId = container.getValueVectorId(schemaPath); + this.field2id.put(toFilterField, typedFieldId.getFieldIds()[0]); + typedFieldIds.add(typedFieldId); + ValueVectorReadExpression toHashFieldExp = new ValueVectorReadExpression(typedFieldId); + hashFieldExps.add(toHashFieldExp); +} +hash64 = hashHelper.getHash64(hashFieldExps.toArray(new LogicalExpression[hashFieldExps.size()]), typedFieldIds.toArray(new TypedFieldId[typedFieldIds.size()])); + } catch (Exception e) { +throw UserException.internalError(e).build(logger); + } +} +selectionVector2.allocateNew(recordCount); +BitSet bitSet = new BitSet(recordCount); +for (int i = 0; i < toFilterFields.size(); i++) { + BloomFilter bloomFilter = bloomFilters.get(i); + String fieldName = toFilterFields.get(i); + computeBitSet(field2id.get(fieldName), bloomFilter, bitSet); +} +int svIndex = 0; +int tmpFilterRows = 0; +for (int i = 0; i < recordCount; i++) { + 
boolean contain = bitSet.get(i); + if (contain) { +selectionVector2.setIndex(svIndex, i); +svIndex++; + } else { +tmpFilterRows++; + } +} +selectionVector2.setRecordCount(svIndex); +if (tmpFilterRows > 0 && tmpFilterRows == recordCount) { + recordCount = 0; + selectionVector2.clear(); + logger.debug("filter {} rows by the RuntimeFilter", tmpFilterRows); + return; +} +if (tmpFilterRows > 0 && tmpFilterRows != recordCount ) { + totalFilterRows = totalFilterRows + tmpFilterRows; + recordCount = svIndex; + BatchSchema batchSchema = this.schema; + VectorContainer backUpContainer = new VectorContainer(this.oContext.getAllocator(), batchSchema); + int fieldCount = batchSchema.getFieldCount(); + for (int i = 0; i < fieldCount; i++) { +ValueVector from = this.getContainer().getValueVector(i).getValueVector(); +ValueVector to = backUpContainer.getValueVector(i).getValueVector(); +to.setInitialCapacity(svIndex); +for (int r = 0; r < svIndex; r++) { + to.copyEntry(r, from, selectionVector2.getIndex(r)); Review comment: To summarize the way you have implemented this part: suppose the original ScanBatch contained 100 rows and 10 of them qualified the bloom filter, you create an SV2 of size 10, set the original qualifying row's index in the SV2, then for each ValueVector you copy the qualifying row's data into the backupContainer, followed by exchanging it with the current output container. A couple of thoughts about this: - Why not produce the SV2 in the output batch and let the downstream operator handle it ? Quite often there may be a Filter operator above the Scan which would be applying other filters (i.e not the run-time filters) and it can combine its own SV2 with the SV2 produced by the Scan. That way you avoid the extra copying and let Filter handle it (note that Filter does code-gen so it is more efficient way to handle bulk filtering). 
- There are clearly trade-offs with the extra copy approach: it depends on selectivity, i.e., how many rows actually get eliminated by the run-time filter. I suppose you have mentioned this as a TODO depending on the NDV statistics? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199336633 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashPrelUtil.java ## @@ -125,6 +126,47 @@ public RexNode createCall(String funcName, List inputFields) { return func; } + /** + * Create hash expression based on the given input fields. + * + * @param inputExprs Expression list based on which the hash expression is constructed. + * @param helper Implementation of {@link HashExpressionCreatorHelper} + * which is used to create function expressions. + * @param hashAsDouble Whether to use the hash as double function or regular hash64 function. + * @param Input and output expression type. + * Currently it could be either {@link RexNode} or {@link LogicalExpression} + * @return + */ + public static T createHash64Expression( +List inputExprs, +T seed, +HashExpressionCreatorHelper helper, +boolean hashAsDouble) { + +assert inputExprs.size() > 0; Review comment: Use Preconditions.checkArgument() since asserts are not enabled by default. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199336934 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java ## @@ -0,0 +1,735 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.work.filter; + +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.commons.collections.CollectionUtils; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.data.JoinCondition; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.AccountingDataTunnel; +import org.apache.drill.exec.ops.Consumer; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.ops.SendingAccountor; +import org.apache.drill.exec.ops.StatusHandler; +import org.apache.drill.exec.physical.PhysicalPlan; + +import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; +import org.apache.drill.exec.physical.base.Exchange; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.Store; +import org.apache.drill.exec.physical.config.BroadcastExchange; +import org.apache.drill.exec.physical.config.HashAggregate; +import org.apache.drill.exec.physical.config.HashJoinPOP; +import org.apache.drill.exec.physical.config.StreamingAggregate; +import org.apache.drill.exec.planner.fragment.Fragment; +import org.apache.drill.exec.planner.fragment.Wrapper; +import org.apache.drill.exec.proto.BitData; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.rpc.data.DataTunnel; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.parquet.ParquetRGFilterEvaluator; +import org.apache.drill.exec.util.Pointer; +import org.apache.drill.exec.work.QueryWorkUnit; +import org.slf4j.Logger; 
+import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class traverses the physical operator tree to find the HashJoin operator + * which is JPDD (join predicate push down) possible. The prerequisite to do JPDD + * is: + * 1. The join condition is equality + * 2. The physical join node is a HashJoin one + * 3. The probe side children of the HashJoin node should not contain a blocked operator like HashAgg + */ +public class RuntimeFilterManager { + + private Wrapper rootWrapper; + + private Map> joinMjId2probdeScanEps = new HashMap<>(); + + private Map joinMjId2scanSize = new ConcurrentHashMap<>(); + + + private Map joinMjId2ScanMjId = new HashMap<>(); + + private RuntimeFilterWritable aggregatedRuntimeFilter; + + private DrillbitContext drillbitContext; + + private QueryContext queryContext; + + private SendingAccountor sendingAccountor = new SendingAccountor(); + + private String lineSeparator; + + + + private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterManager.class); + + /** + * Here we leverage the root Wrapper to do the traverse which indirectly + * holds the whole global PhysicalOperator tree but also contains the endpoints + * of all the MinorFragments. + * + * @param workUnit + * @param queryContext + */ + public RuntimeFilterManager(QueryWorkUnit workUnit, QueryContext queryContext, DrillbitContext drillbitContext) { +this.rootWrapper = workUnit.getRootWrapper(); +this.queryContext = queryContext; +this.drillbitContext = drillbitContext; +lineSeparator = java.security.AccessController.doPrivileged(new sun.security.action.GetPropertyAction("line.separator")); + } + + /** + * Apply runtime filter
[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199354604 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ## @@ -696,6 +780,18 @@ public void executeBuildPhase() throws SchemaChangeException { if ( cycleNum > 0 ) { read_right_HV_vector = (IntVector) buildBatch.getContainer().getLast(); } +//create runtime filter +if (cycleNum == 0 && enableRuntimeFilter) { + //create runtime filter and send out async + int condFieldIndex = 0; + for (BloomFilter bloomFilter : bloomFilters) { +for (int ind = 0; ind < currentRecordCount; ind++) { + long hashCode = hash64.hash64Code(ind, 0, condFieldIndex); + bloomFilter.insert(hashCode); Review comment: Currently, the hash join relies on memory calculations (both build and probe sides) to do accounting for spilling purposes. The bloom filter memory use should also be included in that calculation, although it would be fine to create an enhancement JIRA and address it separately. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199336769 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java ## @@ -0,0 +1,735 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.work.filter; + +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.commons.collections.CollectionUtils; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.data.JoinCondition; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.AccountingDataTunnel; +import org.apache.drill.exec.ops.Consumer; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.ops.SendingAccountor; +import org.apache.drill.exec.ops.StatusHandler; +import org.apache.drill.exec.physical.PhysicalPlan; + +import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; +import org.apache.drill.exec.physical.base.Exchange; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.Store; +import org.apache.drill.exec.physical.config.BroadcastExchange; +import org.apache.drill.exec.physical.config.HashAggregate; +import org.apache.drill.exec.physical.config.HashJoinPOP; +import org.apache.drill.exec.physical.config.StreamingAggregate; +import org.apache.drill.exec.planner.fragment.Fragment; +import org.apache.drill.exec.planner.fragment.Wrapper; +import org.apache.drill.exec.proto.BitData; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.rpc.data.DataTunnel; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.parquet.ParquetRGFilterEvaluator; +import org.apache.drill.exec.util.Pointer; +import org.apache.drill.exec.work.QueryWorkUnit; +import org.slf4j.Logger; 
+import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class traverses the physical operator tree to find the HashJoin operator + * which is JPDD (join predicate push down) possible. The prerequisite to do JPDD + * is: + * 1. The join condition is equality + * 2. The physical join node is a HashJoin one + * 3. The probe side children of the HashJoin node should not contain a blocked operator like HashAgg + */ +public class RuntimeFilterManager { + + private Wrapper rootWrapper; + + private Map> joinMjId2probdeScanEps = new HashMap<>(); Review comment: Pls add a brief comment about what this data structure is intended for. Also for the Maps below. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199334413 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java ## @@ -86,8 +105,17 @@ public void setMaxAllocation(long maxAllocation) { */ @Override public boolean isBufferedOperator(QueryContext queryContext) { -// In case forced to use a single partition - do not consider this a buffered op (when memory is divided) -return queryContext == null || - 1 < (int)queryContext.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR) ; + // In case forced to use a single partition - do not consider this a buffered op (when memory is divided) + return queryContext==null|| Review comment: Please keep the original formatting (indentation, spacing, etc.). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199336697 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java ## @@ -0,0 +1,735 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.work.filter; + +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.commons.collections.CollectionUtils; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.data.JoinCondition; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.AccountingDataTunnel; +import org.apache.drill.exec.ops.Consumer; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.ops.SendingAccountor; +import org.apache.drill.exec.ops.StatusHandler; +import org.apache.drill.exec.physical.PhysicalPlan; + +import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; +import org.apache.drill.exec.physical.base.Exchange; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.Store; +import org.apache.drill.exec.physical.config.BroadcastExchange; +import org.apache.drill.exec.physical.config.HashAggregate; +import org.apache.drill.exec.physical.config.HashJoinPOP; +import org.apache.drill.exec.physical.config.StreamingAggregate; +import org.apache.drill.exec.planner.fragment.Fragment; +import org.apache.drill.exec.planner.fragment.Wrapper; +import org.apache.drill.exec.proto.BitData; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.rpc.data.DataTunnel; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.parquet.ParquetRGFilterEvaluator; +import org.apache.drill.exec.util.Pointer; +import org.apache.drill.exec.work.QueryWorkUnit; +import org.slf4j.Logger; 
+import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class traverses the physical operator tree to find the HashJoin operator + * which is JPDD (join predicate push down) possible. The prerequisite to do JPDD + * is: + * 1. The join condition is equality + * 2. The physical join node is a HashJoin one + * 3. The probe side children of the HashJoin node should not contain a blocked operator like HashAgg Review comment: 'blocked operator' => 'blocking operator' This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature URL: https://github.com/apache/drill/pull/1334#discussion_r199337337 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ## @@ -491,6 +508,50 @@ private void setupHashTable() throws SchemaChangeException { // Create the chained hash table baseHashTable = new ChainedHashTable(htConfig, context, allocator, buildBatch, probeBatch, null); +if (enableRuntimeFilter) { + setupHash64(htConfig); +} + } + + private void setupHash64(HashTableConfig htConfig) throws SchemaChangeException { +LogicalExpression[] keyExprsBuild = new LogicalExpression[htConfig.getKeyExprsBuild().size()]; +ErrorCollector collector = new ErrorCollectorImpl(); +int i = 0; +for (NamedExpression ne : htConfig.getKeyExprsBuild()) { + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), buildBatch, collector, context.getFunctionRegistry()); + if (collector.hasErrors()) { +throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } + if (expr == null) { +continue; + } + keyExprsBuild[i] = expr; + i++; +} +i = 0; +boolean meetNotExistField = false; Review comment: A better name would be 'missingField'. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ppadma closed pull request #1342: DRILL-6537:Limit the batch size for buffering operators based on how …
ppadma closed pull request #1342: DRILL-6537:Limit the batch size for buffering operators based on how … URL: https://github.com/apache/drill/pull/1342 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami opened a new pull request #1356: DRILL-6561: Lateral excluding the columns from output container provided by projection push into rules
sohami opened a new pull request #1356: DRILL-6561: Lateral excluding the columns from output container provided by projection push into rules URL: https://github.com/apache/drill/pull/1356 This PR depends on DRILL-6545 and includes 2 commits from it so that it compiles. Once DRILL-6545 is merged, this PR has to be rebased on master and then checked in. @parthchandra - Please review it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] vvysotskyi commented on issue #1342: DRILL-6537:Limit the batch size for buffering operators based on how …
vvysotskyi commented on issue #1342: DRILL-6537:Limit the batch size for buffering operators based on how … URL: https://github.com/apache/drill/pull/1342#issuecomment-401614243 @ppadma, could you please resolve merge conflicts? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] arina-ielchiieva commented on a change in pull request #1331: DRILL-6519: Add String Distance and Phonetic Functions
arina-ielchiieva commented on a change in pull request #1331: DRILL-6519: Add String Distance and Phonetic Functions URL: https://github.com/apache/drill/pull/1331#discussion_r199343698 ## File path: exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestPhoneticFunctions.java ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.fn.impl; + +import org.apache.drill.categories.SqlFunctionTest; +import org.apache.drill.categories.UnlikelyTest; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterFixtureBuilder; +import org.apache.drill.test.ClusterTest; +import org.apache.drill.test.QueryResultSet; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; + +@Category({UnlikelyTest.class, SqlFunctionTest.class}) +public class TestPhoneticFunctions extends ClusterTest { + + private QueryResultSet result; + + @BeforeClass + public static void setup() throws Exception { +ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher); +startCluster(builder); + } + + @Test + public void testSoundex() throws Exception { +String result = queryBuilder() +.sql("select soundex('jaime') as soundex from (values(1))") +.singletonString(); +assertEquals("J500", result); + } + + @Test + public void testCaverphone1() throws Exception { +String result = queryBuilder() +.sql("SELECT caverphone1('jaime') as caverphone FROM (VALUES(1))") +.singletonString(); +assertEquals("YM", result); + } + + @Test + public void testCaverphone2() throws Exception { +String result = queryBuilder() +.sql("SELECT caverphone2('steve') as caverphone FROM (VALUES(1))") +.singletonString(); +assertEquals("STF111", result); + } + + @Test + public void testCologne() throws Exception { +String result = queryBuilder() +.sql("SELECT cologne_phonetic('steve') AS CP FROM (VALUES(1))") +.singletonString(); +assertEquals("823", result); + } + + @Test + public void testMatchRatingEncoder() throws Exception { +String result = queryBuilder() +.sql("SELECT match_rating_encoder('Boston') AS MR FROM (VALUES(1))") +.singletonString(); +assertEquals("BSTN", result); + } + + @Test + public void testNYSIIS() throws Exception { +String result = queryBuilder() +.sql("SELECT 
nysiis('Boston') AS ny FROM (VALUES(1))") +.singletonString(); +assertEquals("BASTAN", result); + } + + @Test + public void testRefinedSoundex() throws Exception { +String result = queryBuilder() +.sql("SELECT refined_soundex('Boston') AS rs FROM (VALUES(1))") +.singletonString(); +assertEquals("B103608", result); + } + + @Test + public void testSoundsLike() throws Exception { + int result = queryBuilder() Review comment: Sure, please squash. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services