This is an automated email from the ASF dual-hosted git repository.

amansinha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit f3eef38fd633be28f961139b371f524f5172914f
Author: Hanumath Rao Maduri <hmad...@maprtech.com>
AuthorDate: Sat Sep 30 13:26:40 2017 -0700

    DRILL-6882: Handle the cases where RowKeyJoin's left pipeline being called multiple times.

    close apache/drill#1562
---
 .../exec/physical/config/IteratorValidator.java    | 14 +++++++--
 .../exec/physical/impl/join/RowKeyJoinBatch.java   |  5 ++++
 .../validate/IteratorValidatorBatchIterator.java   | 20 +++++++++----
 .../impl/validate/IteratorValidatorCreator.java    |  2 +-
 .../impl/validate/IteratorValidatorInjector.java   | 33 +++++++++++++++++++---
 5 files changed, 62 insertions(+), 12 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
index 9fbef97..4f73b00 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java
@@ -23,10 +23,20 @@ import org.apache.drill.exec.physical.base.PhysicalVisitor;
 
 public class IteratorValidator extends AbstractSingle{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidator.class);
+  /* isRepeatable flag will be set to true if this validator is created by a Repeatable pipeline.
+   * In a repeatable pipeline some state transitions are valid i.e downstream operator
+   * can call the upstream operator even after receiving NONE.
+   */
+  public final boolean isRepeatable;
 
-  public IteratorValidator(PhysicalOperator child) {
+  public IteratorValidator(PhysicalOperator child, boolean repeatable) {
     super(child);
     setCost(child.getCost());
+    this.isRepeatable = repeatable;
+  }
+
+  public IteratorValidator(PhysicalOperator child) {
+    this(child, false);
   }
 
   @Override
@@ -36,7 +46,7 @@ public class IteratorValidator extends AbstractSingle{
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new IteratorValidator(child);
+    return new IteratorValidator(child, isRepeatable);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
index 941f321..2910da5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
@@ -132,6 +132,11 @@ public class RowKeyJoinBatch extends AbstractRecordBatch<RowKeyJoinPOP> implemen
         return IterOutcome.OK;
       }
 
+      if (rightUpstream == IterOutcome.NONE) {
+        rkJoinState = RowKeyJoinState.DONE;
+        state = BatchState.DONE;
+        return rightUpstream;
+      }
       rightUpstream = next(right);
 
       logger.debug("right input IterOutcome: {}", rightUpstream);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 1ea3895..5c70f5d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -47,6 +47,9 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
   /** For logging/debuggability only.
*/
   private static volatile int instanceCount;
 
+  /** @see org.apache.drill.exec.physical.config.IteratorValidator */
+  private final boolean isRepeatable;
+
   /** For logging/debuggability only. */
   private final int instNum;
   {
@@ -102,12 +105,17 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
    */
   private boolean validateBatches;
 
-  public IteratorValidatorBatchIterator(RecordBatch incoming) {
+  public IteratorValidatorBatchIterator(RecordBatch incoming, boolean isRepeatable) {
     this.incoming = incoming;
     batchTypeName = incoming.getClass().getSimpleName();
+    this.isRepeatable = isRepeatable;
 
     // (Log construction and close() at same level to bracket instance's activity.)
-    logger.trace( "[#{}; on {}]: Being constructed.", instNum, batchTypeName);
+    logger.trace( "[#{}; on {}; repeatable: {}]: Being constructed.", instNum, batchTypeName, isRepeatable);
+  }
+
+  public IteratorValidatorBatchIterator(RecordBatch incoming) {
+    this(incoming, false);
   }
 
 
@@ -217,7 +225,7 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
                 instNum, batchTypeName, exceptionState, batchState));
       }
       // (Note: This could use validationState.)
-      if (batchState == NONE || batchState == STOP) {
+      if ((!isRepeatable && batchState == NONE) || batchState == STOP) {
         throw new IllegalStateException(
             String.format(
                 "next() [on #%d, %s] called again after it returned %s."
@@ -256,8 +264,10 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
       case NONE:
         // NONE is allowed even without seeing a OK_NEW_SCHEMA. Such NONE is called
         // FAST NONE.
-        // NONE moves to terminal high-level state.
-        validationState = ValidationState.TERMINAL;
+        // NONE moves to TERMINAL high-level state if NOT repeatable.
+        if (!isRepeatable) {
+          validationState = ValidationState.TERMINAL;
+        }
         break;
       case STOP:
         // STOP is allowed at any time, except if already terminated (checked
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
index 4dc58e5..b7be8ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
@@ -37,7 +37,7 @@ public class IteratorValidatorCreator implements BatchCreator<IteratorValidator>
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
     RecordBatch child = children.iterator().next();
-    IteratorValidatorBatchIterator iter = new IteratorValidatorBatchIterator(child);
+    IteratorValidatorBatchIterator iter = new IteratorValidatorBatchIterator(child, config.isRepeatable);
     boolean validateBatches = context.getOptions().getOption(ExecConstants.ENABLE_VECTOR_VALIDATOR) ||
                               context.getConfig().getBoolean(ExecConstants.ENABLE_VECTOR_VALIDATION);
     iter.enableBatchValidation(validateBatches);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
index 20eba16..6d86fb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import
org.apache.drill.exec.physical.config.IteratorValidator;
+import org.apache.drill.exec.physical.config.RowKeyJoinPOP;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
@@ -32,6 +33,17 @@ public class IteratorValidatorInjector extends
     AbstractPhysicalVisitor<PhysicalOperator, FragmentContext, ExecutionSetupException> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorInjector.class);
 
+  /* This flag when set creates all the validators as repeatable validators */
+  private final boolean isRepeatablePipeline;
+
+  public IteratorValidatorInjector() {
+    this(false);
+  }
+
+  public IteratorValidatorInjector(boolean repeatablePipeline) {
+    this.isRepeatablePipeline = repeatablePipeline;
+  }
+
   public static FragmentRoot rewritePlanWithIteratorValidator(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
     IteratorValidatorInjector inject = new IteratorValidatorInjector();
     PhysicalOperator newOp = root.accept(inject, context);
@@ -60,11 +72,24 @@ public class IteratorValidatorInjector extends
     List<PhysicalOperator> newChildren = Lists.newArrayList();
     PhysicalOperator newOp = op;
 
+    if (op instanceof RowKeyJoinPOP) {
+      /* create a RepeatablePipeline for the left side of RowKeyJoin */
+      PhysicalOperator left = new IteratorValidator(((RowKeyJoinPOP) op).getLeft()
+                                   .accept(new IteratorValidatorInjector(true), context), true);
+      left.setOperatorId(op.getOperatorId() + 1000);
+      newChildren.add(left);
+      /* right pipeline is not repeatable pipeline */
+      PhysicalOperator right = new IteratorValidator(((RowKeyJoinPOP) op).getRight()
+                                   .accept(this, context));
+      right.setOperatorId(op.getOperatorId() + 1000);
+      newChildren.add(right);
+    } else {
     /* Get the list of child operators */
-    for (PhysicalOperator child : op) {
-      PhysicalOperator validator = new IteratorValidator(child.accept(this, context));
-      validator.setOperatorId(op.getOperatorId() + 1000);
-      newChildren.add(validator);
+      for
(PhysicalOperator child : op) {
+        PhysicalOperator validator = new IteratorValidator(child.accept(this, context), this.isRepeatablePipeline);
+        validator.setOperatorId(op.getOperatorId() + 1000);
+        newChildren.add(validator);
+      }
     }
 
     /* Inject trace operator */