[GitHub] [flink] lsyldliu commented on a diff in pull request #23282: [FLINK-32865][table-planner] Add ExecutionOrderEnforcer to exec plan and put it into BatchExecMultipleInput

2023-08-29 Thread via GitHub


lsyldliu commented on code in PR #23282:
URL: https://github.com/apache/flink/pull/23282#discussion_r1309575673


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java:
##
@@ -53,45 +52,41 @@ public class BatchExecTableSourceScan extends CommonExecTableSourceScan
         implements BatchExecNode<RowData> {
 
     // Avoids creating different ids if translated multiple times
-    private final String dynamicFilteringDataListenerID = UUID.randomUUID().toString();
+    @Nullable private final String dynamicFilteringDataListenerID;

Review Comment:
   Even without this change it still works, so I think we can keep the original 
code here; I don't recommend exposing the dynamicFilteringDataListenerID to 
the developer.
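   A minimal sketch of keeping the field as it was (taken from the pre-change code above; the java.util.UUID import and the surrounding class body are implied):
   ```
   // Keep the listener ID internal: one random UUID per node instance, so
   // translating the same node multiple times reuses the same id.
   private final String dynamicFilteringDataListenerID = UUID.randomUUID().toString();
   ```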



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java:
##
@@ -53,45 +52,41 @@ public class BatchExecTableSourceScan extends CommonExecTableSourceScan
         implements BatchExecNode<RowData> {
 
     // Avoids creating different ids if translated multiple times
-    private final String dynamicFilteringDataListenerID = UUID.randomUUID().toString();
+    @Nullable private final String dynamicFilteringDataListenerID;
 
-    private boolean needDynamicFilteringDependency;
+    private final ReadableConfig tableConfig;
 
     // This constructor can be used only when table source scan has
     // BatchExecDynamicFilteringDataCollector input
     public BatchExecTableSourceScan(
             ReadableConfig tableConfig,
             DynamicTableSourceSpec tableSourceSpec,
-            InputProperty inputProperty,
+            List<InputProperty> inputProperties,
             RowType outputType,
-            String description) {
+            String description,
+            @Nullable String dynamicFilteringDataListenerID) {
         super(
                 ExecNodeContext.newNodeId(),
                 ExecNodeContext.newContext(BatchExecTableSourceScan.class),
                 ExecNodeContext.newPersistedConfig(BatchExecTableSourceScan.class, tableConfig),
                 tableSourceSpec,
-                Collections.singletonList(inputProperty),
+                inputProperties,
                 outputType,
                 description);
+        this.dynamicFilteringDataListenerID = dynamicFilteringDataListenerID;
+        this.tableConfig = tableConfig;
     }
 
     public BatchExecTableSourceScan(
             ReadableConfig tableConfig,
             DynamicTableSourceSpec tableSourceSpec,
             RowType outputType,
             String description) {
-        super(
-                ExecNodeContext.newNodeId(),
-                ExecNodeContext.newContext(BatchExecTableSourceScan.class),
-                ExecNodeContext.newPersistedConfig(BatchExecTableSourceScan.class, tableConfig),
-                tableSourceSpec,
-                Collections.emptyList(),
-                outputType,
-                description);
+        this(tableConfig, tableSourceSpec, Collections.emptyList(), outputType, description, null);

Review Comment:
   I think the original constructor can cover our needs; it just needs a small addition:
   ```
   public BatchExecTableSourceScan(
           ReadableConfig tableConfig,
           DynamicTableSourceSpec tableSourceSpec,
           RowType outputType,
           String description) {
       super(
               ExecNodeContext.newNodeId(),
               ExecNodeContext.newContext(BatchExecTableSourceScan.class),
               ExecNodeContext.newPersistedConfig(BatchExecTableSourceScan.class, tableConfig),
               tableSourceSpec,
               Collections.emptyList(),
               outputType,
               description);
       this.tableConfig = tableConfig;
   }
   ```



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java:
##
@@ -177,4 +132,17 @@ public Transformation<RowData> createInputFormatTransformation(
                 new InputFormatSourceFunction<>(inputFormat, outputTypeInfo);
         return env.addSource(function, operatorName, outputTypeInfo).getTransformation();
     }
+
+    public BatchExecTableSourceScan copyAndRemoveInputs() {
+        BatchExecTableSourceScan tableSourceScan =

Review Comment:
   We can reuse the constructor `BatchExecTableSourceScan(ReadableConfig tableConfig, DynamicTableSourceSpec tableSourceSpec, RowType outputType, String description)`.
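   A hedged sketch of what that reuse could look like (hypothetical: getTableSourceSpec(), getOutputType(), and getDescription() are assumed to be the node's existing accessors):
   ```
   public BatchExecTableSourceScan copyAndRemoveInputs() {
       // Build the copy through the no-input constructor, so the
       // dynamic-filtering input is dropped rather than rebuilt field by field.
       return new BatchExecTableSourceScan(
               tableConfig,
               getTableSourceSpec(),
               (RowType) getOutputType(),
               getDescription());
   }
   ```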




[GitHub] [flink] lsyldliu commented on a diff in pull request #23282: [FLINK-32865][table-planner] Add ExecutionOrderEnforcer to exec plan and put it into BatchExecMultipleInput

2023-08-29 Thread via GitHub


lsyldliu commented on code in PR #23282:
URL: https://github.com/apache/flink/pull/23282#discussion_r1308505600


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor.java:
##
@@ -52,50 +55,91 @@ public class DynamicFilteringDependencyProcessor implements ExecNodeGraphProcessor {
 
     @Override
     public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext context) {
-        ExecNodeGraph factSideProcessedGraph = checkIfFactSourceNeedEnforceDependency(execGraph);
+        ExecNodeGraph factSideProcessedGraph =
+                checkIfFactSourceNeedEnforceDependency(execGraph, context);
         return enforceDimSideBlockingExchange(factSideProcessedGraph, context);
     }
 
-    private ExecNodeGraph checkIfFactSourceNeedEnforceDependency(ExecNodeGraph execGraph) {
-        Map<BatchExecTableSourceScan, List<ExecNode<?>>> dynamicFilteringScanDescendants =
+    private ExecNodeGraph checkIfFactSourceNeedEnforceDependency(
+            ExecNodeGraph execGraph, ProcessorContext context) {
+        Map<BatchExecTableSourceScan, List<DescendantInfo>> dynamicFilteringScanDescendants =
                 new HashMap<>();
 
         AbstractExecNodeExactlyOnceVisitor dynamicFilteringScanCollector =
                 new AbstractExecNodeExactlyOnceVisitor() {
                     @Override
                    protected void visitNode(ExecNode<?> node) {
-                        node.getInputEdges().stream()
-                                .map(ExecEdge::getSource)
-                                .forEach(
-                                        input -> {
-                                            // The character of the dynamic filter scan is that it
-                                            // has an input.
-                                            if (input instanceof BatchExecTableSourceScan
-                                                    && input.getInputEdges().size() > 0) {
-                                                dynamicFilteringScanDescendants
-                                                        .computeIfAbsent(
-                                                                (BatchExecTableSourceScan) input,
-                                                                ignored -> new ArrayList<>())
-                                                        .add(node);
-                                            }
-                                        });
+                        for (int i = 0; i < node.getInputEdges().size(); ++i) {
+                            ExecEdge edge = node.getInputEdges().get(i);
+                            ExecNode<?> input = edge.getSource();
+
+                            // The character of the dynamic filter scan is that it
+                            // has an input.
+                            if (input instanceof BatchExecTableSourceScan
+                                    && input.getInputEdges().size() > 0) {
+                                dynamicFilteringScanDescendants
+                                        .computeIfAbsent(
+                                                (BatchExecTableSourceScan) input,
+                                                ignored -> new ArrayList<>())
+                                        .add(new DescendantInfo(node, i));
+                            }
+                        }
 
                         visitInputs(node);
                     }
                 };
         execGraph.getRootNodes().forEach(node -> node.accept(dynamicFilteringScanCollector));
 
-        for (Map.Entry<BatchExecTableSourceScan, List<ExecNode<?>>> entry :
+        for (Map.Entry<BatchExecTableSourceScan, List<DescendantInfo>> entry :
                 dynamicFilteringScanDescendants.entrySet()) {
-            if (entry.getValue().size() == 1) {
-                ExecNode<?> next = entry.getValue().get(0);
-                if (next instanceof BatchExecMultipleInput) {
-                    // the source can be chained with BatchExecMultipleInput
-                    continue;
-                }
-            }
-            // otherwise we need dependencies
-            entry.getKey().setNeedDynamicFilteringDependency(true);
+            BatchExecTableSourceScan tableSourceScan = entry.getKey();
+            BatchExecDynamicFilteringDataCollector dynamicFilteringDataCollector =
+                    getDynamicFilteringDataCollector(tableSourceScan);
+
+            // Add exchange between collector and enforcer
+            BatchExecExchange exchange =
+                    new BatchExecExchange(
+                            context.getPlanner().getTableConfig(),
+                            InputProperty.builder()
+                                    .requiredDistribution(InputProperty.ANY_DISTRIBUTION)
+                                    .damBehavior(InputProperty.DamBehavior.BLOCKING)
+                                    .build(),
+                            (RowType) dynamicFilteringDataCollector.getOutputType(),
+                            "Exchange");
+
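
The hunk above references a DescendantInfo helper whose definition is not part of this excerpt. A minimal sketch of what such a holder could look like (hypothetical, inferred only from the `new DescendantInfo(node, i)` call sites above):
```
// Hypothetical holder pairing a descendant node with the index of the input
// edge through which it consumes the dynamic-filtering table source scan.
private static class DescendantInfo {
    final ExecNode<?> descendant;
    final int inputId;

    DescendantInfo(ExecNode<?> descendant, int inputId) {
        this.descendant = descendant;
        this.inputId = inputId;
    }
}
```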

[GitHub] [flink] lsyldliu commented on a diff in pull request #23282: [FLINK-32865][table-planner] Add ExecutionOrderEnforcer to exec plan and put it into BatchExecMultipleInput

2023-08-27 Thread via GitHub


lsyldliu commented on code in PR #23282:
URL: https://github.com/apache/flink/pull/23282#discussion_r1306878807


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/ExecutionOrderEnforcerFusionCodegenSpec.scala:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.fusion.spec
+
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedExpression}
+import org.apache.flink.table.planner.plan.fusion.{OpFusionCodegenSpecBase, OpFusionContext}
+
+import java.util
+
+/** The operator fusion codegen spec for ExecutionOrderEnforcer. */
+class ExecutionOrderEnforcerFusionCodegenSpec(opCodegenCtx: CodeGeneratorContext)
+  extends OpFusionCodegenSpecBase(opCodegenCtx) {
+  private lazy val sourceInputId = 2
+
+  private var dynamicFilteringInputContext: OpFusionContext = _
+  private var sourceInputContext: OpFusionContext = _
+
+  override def setup(opFusionContext: OpFusionContext): Unit = {
+    super.setup(opFusionContext)
+    val inputContexts = fusionContext.getInputFusionContexts
+    assert(inputContexts.size == 2)
+    dynamicFilteringInputContext = inputContexts.get(0)
+    sourceInputContext = inputContexts.get(1)
+  }
+
+  override def variablePrefix(): String = "orderEnforcer"
+
+  override def doProcessProduce(codegenCtx: CodeGeneratorContext): Unit = {
+    dynamicFilteringInputContext.processProduce(codegenCtx)
+    sourceInputContext.processProduce(codegenCtx)
+  }
+
+  override def doEndInputProduce(codegenCtx: CodeGeneratorContext): Unit = {
+    dynamicFilteringInputContext.endInputProduce(codegenCtx)
+    sourceInputContext.endInputProduce(codegenCtx)
+  }
+
+  override def doProcessConsume(
+      inputId: Int,
+      inputVars: util.List[GeneratedExpression],
+      row: GeneratedExpression): String = {
+    if (inputId == sourceInputId) {
+      s"""
+         |// call downstream to consume the row
+         |${row.code}
+         |${fusionContext.processConsume(inputVars)}

Review Comment:
   ```suggestion
         |${fusionContext.processConsume(null, ${row.resultTerm})}
   ```
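   (A hedged reading of the suggestion, based only on the two-argument call shown above: passing null for the field variables and forwarding ${row.resultTerm} hands the already-materialized row to the downstream consume path, instead of regenerating it from the per-field expressions in inputVars.)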



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org