[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

2020-09-02 Thread GitBox


rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r482286576



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.util.OutputTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+class MultipleInputChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {

Review comment:
   Great, thanks!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

2020-09-02 Thread GitBox


rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r482281944



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##
@@ -90,16 +91,19 @@
 
private final RecordWriterOutput[] streamOutputs;
 
-   private final WatermarkGaugeExposingOutput> 
chainEntryPoint;
+   private final WatermarkGaugeExposingOutput> 
mainOperatorOutput;
 
/**
 * For iteration, {@link StreamIterationHead} and {@link 
StreamIterationTail} used for executing
-* feedback edges do not contain any operators, in which case, {@code 
headOperatorWrapper} and
+* feedback edges do not contain any operators, in which case, {@code 
mainOperatorWrapper} and
 * {@code tailOperatorWrapper} are null.
 */
-   @Nullable private final StreamOperatorWrapper 
headOperatorWrapper;
+   @Nullable private final StreamOperatorWrapper 
mainOperatorWrapper;
+   @Nullable private final StreamOperatorWrapper 
firstOperatorWrapper;

Review comment:
   Thanks!









[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

2020-09-01 Thread GitBox


rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r481246654



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.util.OutputTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+class MultipleInputChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
+   private static final Logger LOG = 
LoggerFactory.getLogger(MultipleInputChainingOutput.class);
+
+   protected final Input input;
+   protected final Counter numRecordsIn;
+   protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
+   protected final StreamStatusProvider streamStatusProvider;
+   @Nullable protected final OutputTag outputTag;
+
+   public MultipleInputChainingOutput(
+   Input input,
+   OperatorMetricGroup operatorMetricGroup,
+   StreamStatusProvider streamStatusProvider,
+   @Nullable OutputTag outputTag) {
+   this.input = input;
+
+   {
+   Counter tmpNumRecordsIn;
+   try {
+   OperatorIOMetricGroup ioMetricGroup = 
operatorMetricGroup.getIOMetricGroup();
+   tmpNumRecordsIn = 
ioMetricGroup.getNumRecordsInCounter();
+   } catch (Exception e) {
+   LOG.warn("An exception occurred during the 
metrics setup.", e);
+   tmpNumRecordsIn = new SimpleCounter();
+   }
+   numRecordsIn = tmpNumRecordsIn;
+   }
+
+   this.streamStatusProvider = streamStatusProvider;
+   this.outputTag = outputTag;
+   }
+
+   @Override
+   public void collect(StreamRecord record) {
+   if (this.outputTag != null) {
+   // we are not responsible for emitting to the main 
output.
+   return;
+   }
+
+   pushToOperator(record);
+   }
+
+   @Override
+   public  void collect(OutputTag outputTag, StreamRecord record) 
{
+   if (this.outputTag == null || !this.outputTag.equals(outputTag)) {

Review comment:
   > In Flink we have convention that everything is @NonNull 
   
   Why are there so many explicit @NonNull annotations and null checks then? :)
   
   > Also, the null check is covered by org.apache.flink.util.OutputTag#equals :smile_cat:.
   
   It drops the record instead of throwing an exception, though.
   
   But ok, let's leave it as is.









[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

2020-08-28 Thread GitBox


rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r479150979



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##
@@ -208,19 +212,97 @@ public OperatorChain(
OperatorChain(
List> allOperatorWrappers,
RecordWriterOutput[] streamOutputs,
-   WatermarkGaugeExposingOutput> 
chainEntryPoint,
-   StreamOperatorWrapper headOperatorWrapper) {
+   WatermarkGaugeExposingOutput> 
mainOperatorOutput,
+   StreamOperatorWrapper mainOperatorWrapper) {
 
this.streamOutputs = checkNotNull(streamOutputs);
-   this.chainEntryPoint = checkNotNull(chainEntryPoint);
+   this.mainOperatorOutput = checkNotNull(mainOperatorOutput);
this.operatorEventDispatcher = null;
 
checkState(allOperatorWrappers != null && 
allOperatorWrappers.size() > 0);
-   this.headOperatorWrapper = checkNotNull(headOperatorWrapper);
+   this.mainOperatorWrapper = checkNotNull(mainOperatorWrapper);
this.tailOperatorWrapper = allOperatorWrappers.get(0);
this.numOperators = allOperatorWrappers.size();
+   this.chainedSources = Collections.emptyMap();
+
+   firstOperatorWrapper = 
linkOperatorWrappers(allOperatorWrappers);
+   }
+
+   private void createChainOutputs(
+   List outEdgesInOrder,
+   
RecordWriterDelegate>> 
recordWriterDelegate,
+   Map chainedConfigs,
+   StreamTask containingTask,
+   Map> streamOutputMap) 
{
+   for (int i = 0; i < outEdgesInOrder.size(); i++) {
+   StreamEdge outEdge = outEdgesInOrder.get(i);
+
+   RecordWriterOutput streamOutput = createStreamOutput(
+   recordWriterDelegate.getRecordWriter(i),
+   outEdge,
+   chainedConfigs.get(outEdge.getSourceId()),
+   containingTask.getEnvironment());
+
+   this.streamOutputs[i] = streamOutput;
+   streamOutputMap.put(outEdge, streamOutput);
+   }
+   }
+
+   private Map createChainedInputs(

Review comment:
   This method, as well as `createChainedInputs`, is full of compiler warnings too. Can you fix them, please?









[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

2020-08-28 Thread GitBox


rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r479136421



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##
@@ -90,16 +91,19 @@
 
private final RecordWriterOutput[] streamOutputs;
 
-   private final WatermarkGaugeExposingOutput> 
chainEntryPoint;
+   private final WatermarkGaugeExposingOutput> 
mainOperatorOutput;
 
/**
 * For iteration, {@link StreamIterationHead} and {@link 
StreamIterationTail} used for executing
-* feedback edges do not contain any operators, in which case, {@code 
headOperatorWrapper} and
+* feedback edges do not contain any operators, in which case, {@code 
mainOperatorWrapper} and
 * {@code tailOperatorWrapper} are null.
 */
-   @Nullable private final StreamOperatorWrapper 
headOperatorWrapper;
+   @Nullable private final StreamOperatorWrapper 
mainOperatorWrapper;
+   @Nullable private final StreamOperatorWrapper 
firstOperatorWrapper;

Review comment:
   I'm confused :)
   
   Is it something like this:
   ```
   first
        \
         main (multi-input) -> ... -> tail
        /
   second
   ```
   ?
   If yes, how would the wrappers be linked? Will `firstOperatorWrapper.close` also close the second operator?
   
   nit: it would be nice to have that kind of diagram in the code too.









[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

2020-08-28 Thread GitBox


rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r479120389



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
##
@@ -628,4 +621,61 @@ public String toString() {
 
return builder.toString();
}
+
+   /**
+* Interface representing chained inputs.
+*/
+   public interface InputConfig extends Serializable {
+   }
+
+   /**
+* A representation of a Network {@link InputConfig}.
+*/
+   public static class NetworkInputConfig implements InputConfig {
+   private final TypeSerializer typeSerializer;
+   private int inputGateIndex;
+
+   public NetworkInputConfig(TypeSerializer typeSerializer, int 
inputGateIndex) {
+   this.typeSerializer = typeSerializer;
+   this.inputGateIndex = inputGateIndex;
+   }
+
+   public TypeSerializer getTypeSerializer() {
+   return typeSerializer;
+   }
+
+   public int getInputGateIndex() {
+   return inputGateIndex;
+   }
+   }
+
+   /**
+* A serialized representation of an input.
+*/
+   public static class SourceInputConfig implements InputConfig {
+   private final StreamEdge inputEdge;
+
+   public SourceInputConfig(StreamEdge inputEdge) {
+   this.inputEdge = inputEdge;
+   }
+
+   public StreamEdge getInputEdge() {
+   return inputEdge;
+   }
+
+   @Override
+   public String toString() {
+   return inputEdge.toString();
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   return Objects.equals(obj, inputEdge);

Review comment:
   Are we comparing `inputEdge` with a `SourceInputConfig` here?
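
   To make the concern concrete, here is a minimal sketch, assuming a simplified `Edge` class as a hypothetical stand-in for `StreamEdge` (it is not the Flink class). With `Objects.equals(obj, inputEdge)`, a config reports itself equal to a bare edge, while the edge does not consider itself equal to the config, so `equals` is no longer symmetric. One possible fix is to check the argument's type first and then compare the wrapped edges.

```java
import java.util.Objects;

// Hypothetical stand-in for StreamEdge; equality is based on an id only.
final class Edge {
    private final int id;

    Edge(int id) {
        this.id = id;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof Edge && ((Edge) o).id == id;
    }

    @Override
    public int hashCode() {
        return Integer.hashCode(id);
    }
}

// Mirrors the equals() from the diff: the argument is compared with the wrapped edge.
final class BrokenSourceInputConfig {
    private final Edge inputEdge;

    BrokenSourceInputConfig(Edge inputEdge) {
        this.inputEdge = inputEdge;
    }

    @Override
    public boolean equals(Object obj) {
        // True when obj is an equal Edge, which breaks symmetry with Edge#equals.
        return Objects.equals(obj, inputEdge);
    }

    @Override
    public int hashCode() {
        return inputEdge.hashCode();
    }
}

// One possible fix: only another config can be equal, decided by the wrapped edges.
final class FixedSourceInputConfig {
    private final Edge inputEdge;

    FixedSourceInputConfig(Edge inputEdge) {
        this.inputEdge = inputEdge;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof FixedSourceInputConfig)) {
            return false;
        }
        return Objects.equals(inputEdge, ((FixedSourceInputConfig) obj).inputEdge);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(inputEdge);
    }
}
```

   The fixed variant keeps `hashCode` consistent with `equals` by hashing the same field that `equals` compares.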









[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

2020-08-28 Thread GitBox


rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r479115224



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##
@@ -208,19 +206,100 @@ public OperatorChain(
OperatorChain(
List> allOperatorWrappers,
RecordWriterOutput[] streamOutputs,
-   WatermarkGaugeExposingOutput> 
chainEntryPoint,
-   StreamOperatorWrapper headOperatorWrapper) {
+   WatermarkGaugeExposingOutput> 
mainOperatorOutput,
+   StreamOperatorWrapper mainOperatorWrapper) {
 
this.streamOutputs = checkNotNull(streamOutputs);
-   this.chainEntryPoint = checkNotNull(chainEntryPoint);
+   this.mainOperatorOutput = checkNotNull(mainOperatorOutput);
this.operatorEventDispatcher = null;
 
checkState(allOperatorWrappers != null && 
allOperatorWrappers.size() > 0);
-   this.headOperatorWrapper = checkNotNull(headOperatorWrapper);
+   this.mainOperatorWrapper = checkNotNull(mainOperatorWrapper);
this.tailOperatorWrapper = allOperatorWrappers.get(0);
this.numOperators = allOperatorWrappers.size();
+   this.chainedSourceOutputs = new ChainedSourceOutputs();
 
-   linkOperatorWrappers(allOperatorWrappers);
+   firstOperatorWrapper = 
linkOperatorWrappers(allOperatorWrappers);
+   }
+
+   private void createChainOutputs(
+   List outEdgesInOrder,
+   RecordWriterDelegate>> 
recordWriterDelegate,
+   Map chainedConfigs,
+   StreamTask containingTask,
+   Map> streamOutputMap) {
+   for (int i = 0; i < outEdgesInOrder.size(); i++) {
+   StreamEdge outEdge = outEdgesInOrder.get(i);
+
+   RecordWriterOutput streamOutput = createStreamOutput(
+   recordWriterDelegate.getRecordWriter(i),
+   outEdge,
+   chainedConfigs.get(outEdge.getSourceId()),
+   containingTask.getEnvironment());
+
+   this.streamOutputs[i] = streamOutput;
+   streamOutputMap.put(outEdge, streamOutput);
+   }
+   }
+
+   private ChainedSourceOutputs createChainedInputs(
+   StreamTask containingTask,
+   StreamConfig.Input[] configuredInputs,
+   Map chainedConfigs,
+   ClassLoader userCodeClassloader,
+   List> allOpWrappers) {
+   if (Arrays.stream(configuredInputs).noneMatch(input -> input 
instanceof SourceInput)) {
+   return new ChainedSourceOutputs();
+   }
+   checkState(
+   mainOperatorWrapper.getStreamOperator() instanceof 
MultipleInputStreamOperator,

Review comment:
   Makes sense.









[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

2020-08-28 Thread GitBox


rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r479113108



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##
@@ -208,19 +206,100 @@ public OperatorChain(
OperatorChain(
List> allOperatorWrappers,
RecordWriterOutput[] streamOutputs,
-   WatermarkGaugeExposingOutput> 
chainEntryPoint,
-   StreamOperatorWrapper headOperatorWrapper) {
+   WatermarkGaugeExposingOutput> 
mainOperatorOutput,
+   StreamOperatorWrapper mainOperatorWrapper) {
 
this.streamOutputs = checkNotNull(streamOutputs);
-   this.chainEntryPoint = checkNotNull(chainEntryPoint);
+   this.mainOperatorOutput = checkNotNull(mainOperatorOutput);
this.operatorEventDispatcher = null;
 
checkState(allOperatorWrappers != null && 
allOperatorWrappers.size() > 0);
-   this.headOperatorWrapper = checkNotNull(headOperatorWrapper);
+   this.mainOperatorWrapper = checkNotNull(mainOperatorWrapper);
this.tailOperatorWrapper = allOperatorWrappers.get(0);
this.numOperators = allOperatorWrappers.size();
+   this.chainedSourceOutputs = new ChainedSourceOutputs();
 
-   linkOperatorWrappers(allOperatorWrappers);
+   firstOperatorWrapper = 
linkOperatorWrappers(allOperatorWrappers);
+   }
+
+   private void createChainOutputs(
+   List outEdgesInOrder,
+   RecordWriterDelegate>> 
recordWriterDelegate,
+   Map chainedConfigs,
+   StreamTask containingTask,
+   Map> streamOutputMap) {
+   for (int i = 0; i < outEdgesInOrder.size(); i++) {
+   StreamEdge outEdge = outEdgesInOrder.get(i);
+
+   RecordWriterOutput streamOutput = createStreamOutput(
+   recordWriterDelegate.getRecordWriter(i),
+   outEdge,
+   chainedConfigs.get(outEdge.getSourceId()),
+   containingTask.getEnvironment());
+
+   this.streamOutputs[i] = streamOutput;
+   streamOutputMap.put(outEdge, streamOutput);
+   }
+   }
+
+   private ChainedSourceOutputs createChainedInputs(

Review comment:
   Given the new signature: `Map createChainedInputs`,
   nit: `createChainedSources`?









[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

2020-08-28 Thread GitBox


rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r479107582



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.util.OutputTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+class MultipleInputChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {

Review comment:
   I ran a diff on them and the only differences I see are `setKeyContextElement1` vs `setKeyContextElement` and `close`.
   
   I think the root cause is that `Input` duplicates `OneInputStreamOperator`. The proper solution would be to refactor `SourceOperator`, but that's apparently out of scope.
   
   Instead, we could use an adapter from operator to input and use only `MultipleInputChainingOutput`, like this:
   ```
   // instead of new ChainingOutput()
   currentOperatorOutput = new MultipleInputChainingOutput<>(
       Input.from(operator),
       ((OperatorMetricGroup) operator.getMetricGroup()),
       this,
       outputTag) {};
   // where Input.from returns some Input delegating to operator
   ```
   (I think the delegation overhead will be eliminated by the JVM, but it can be tested).
   
   Just a suggestion.
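
   A rough sketch of what such an adapter could look like. The `Input` and `OneInputOperator` interfaces below are simplified, hypothetical stand-ins (the real Flink interfaces work on `StreamRecord` and declare more methods), and `Inputs.from` is an assumed name for the proposed `Input.from`:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-in for Flink's Input interface.
interface Input<T> {
    void processElement(T element);

    void setKeyContextElement(T element);
}

// Simplified, hypothetical stand-in for OneInputStreamOperator.
interface OneInputOperator<T> {
    void processElement(T element);

    void setKeyContextElement1(T element);
}

final class Inputs {
    private Inputs() {}

    // Returns an Input view that forwards every call to the given operator.
    static <T> Input<T> from(OneInputOperator<T> operator) {
        return new Input<T>() {
            @Override
            public void processElement(T element) {
                operator.processElement(element);
            }

            @Override
            public void setKeyContextElement(T element) {
                // The only naming difference between the two interfaces in this sketch.
                operator.setKeyContextElement1(element);
            }
        };
    }
}

// Test helper that records which operator methods were invoked.
final class RecordingOperator implements OneInputOperator<String> {
    final List<String> calls = new ArrayList<>();

    @Override
    public void processElement(String element) {
        calls.add("process:" + element);
    }

    @Override
    public void setKeyContextElement1(String element) {
        calls.add("key:" + element);
    }
}
```

   With an adapter like this, a single output class could accept either an `Input` or a wrapped operator; whether the JIT really removes the extra call would need a benchmark.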









[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

2020-08-28 Thread GitBox


rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r479061227



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.util.OutputTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+class MultipleInputChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
+   private static final Logger LOG = 
LoggerFactory.getLogger(MultipleInputChainingOutput.class);
+
+   protected final Input input;
+   protected final Counter numRecordsIn;
+   protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
+   protected final StreamStatusProvider streamStatusProvider;
+   @Nullable protected final OutputTag outputTag;
+
+   public MultipleInputChainingOutput(
+   Input input,
+   OperatorMetricGroup operatorMetricGroup,
+   StreamStatusProvider streamStatusProvider,
+   @Nullable OutputTag outputTag) {
+   this.input = input;
+
+   {
+   Counter tmpNumRecordsIn;
+   try {
+   OperatorIOMetricGroup ioMetricGroup = 
operatorMetricGroup.getIOMetricGroup();
+   tmpNumRecordsIn = 
ioMetricGroup.getNumRecordsInCounter();
+   } catch (Exception e) {
+   LOG.warn("An exception occurred during the 
metrics setup.", e);
+   tmpNumRecordsIn = new SimpleCounter();
+   }
+   numRecordsIn = tmpNumRecordsIn;
+   }
+
+   this.streamStatusProvider = streamStatusProvider;
+   this.outputTag = outputTag;
+   }
+
+   @Override
+   public void collect(StreamRecord record) {
+   if (this.outputTag != null) {
+   // we are not responsible for emitting to the main 
output.
+   return;
+   }
+
+   pushToOperator(record);
+   }
+
+   @Override
+   public  void collect(OutputTag outputTag, StreamRecord record) 
{
+   if (this.outputTag == null || !this.outputTag.equals(outputTag)) {

Review comment:
   It's unclear to me what the contract and the intended behavior are.
   If
   > outputTag is @NonNull (by default)
   
   then we should probably check for it.
   Currently, we'll just drop the record.
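
   To illustrate the two behaviors, here is a minimal sketch using a hypothetical `SideOutputSketch` class with plain `String` tags (not Flink's `OutputTag` or `MultipleInputChainingOutput`). The lenient variant follows the condition in the diff and silently drops a record whose tag is null; the strict variant fails fast, here with the JDK's `Objects.requireNonNull` as an assumed substitute for a precondition check:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical, simplified side output keyed by a String tag.
final class SideOutputSketch {
    private final String ownTag; // null would mean "main output only"
    final List<String> forwarded = new ArrayList<>();

    SideOutputSketch(String ownTag) {
        this.ownTag = ownTag;
    }

    // Same condition as in the diff: a null tag never equals ownTag,
    // so the record is dropped without any error.
    void collectLenient(String tag, String record) {
        if (ownTag == null || !ownTag.equals(tag)) {
            return;
        }
        forwarded.add(record);
    }

    // Alternative contract: a null tag is a caller bug and is rejected immediately.
    void collectStrict(String tag, String record) {
        Objects.requireNonNull(tag, "outputTag must not be null");
        if (ownTag == null || !ownTag.equals(tag)) {
            return;
        }
        forwarded.add(record);
    }
}
```

   Which variant is right depends on the contract: if a null tag can never legitimately occur, failing fast surfaces the bug, whereas the lenient form hides it as missing data.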









[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

2020-08-28 Thread GitBox


rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r479061227



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.util.OutputTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+class MultipleInputChainingOutput implements 
WatermarkGaugeExposingOutput> {
+   private static final Logger LOG = 
LoggerFactory.getLogger(MultipleInputChainingOutput.class);
+
+   protected final Input input;
+   protected final Counter numRecordsIn;
+   protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
+   protected final StreamStatusProvider streamStatusProvider;
+   @Nullable protected final OutputTag outputTag;
+
+   public MultipleInputChainingOutput(
+   Input input,
+   OperatorMetricGroup operatorMetricGroup,
+   StreamStatusProvider streamStatusProvider,
+   @Nullable OutputTag outputTag) {
+   this.input = input;
+
+   {
+   Counter tmpNumRecordsIn;
+   try {
+   OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup.getIOMetricGroup();
+   tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
+   } catch (Exception e) {
+   LOG.warn("An exception occurred during the metrics setup.", e);
+   tmpNumRecordsIn = new SimpleCounter();
+   }
+   numRecordsIn = tmpNumRecordsIn;
+   }
+
+   this.streamStatusProvider = streamStatusProvider;
+   this.outputTag = outputTag;
+   }
+
+   @Override
+   public void collect(StreamRecord record) {
+   if (this.outputTag != null) {
+   // we are not responsible for emitting to the main output.
+   return;
+   }
+
+   pushToOperator(record);
+   }
+
+   @Override
+   public  void collect(OutputTag outputTag, StreamRecord record) {
+   if (this.outputTag == null || !this.outputTag.equals(outputTag)) {

Review comment:
   It's unclear to me what the contract and the intended behavior are.
   If
   > outputTag is @NonNull (by default)
   
   then we should probably check for it explicitly.
   Currently, we just drop the record silently.
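
To make the two possible contracts concrete, here is a rough, self-contained sketch. `Tag` and `SideOutputSketch` are hypothetical stand-ins and not Flink classes; the sketch only contrasts silently dropping a record (as the condition in the diff appears to do) with failing fast when the tag argument is null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for an output tag; not the Flink OutputTag class.
final class Tag {
    private final String id;

    Tag(String id) {
        this.id = Objects.requireNonNull(id);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof Tag && ((Tag) o).id.equals(id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }
}

// Hypothetical output that forwards records to a list instead of an operator.
final class SideOutputSketch {
    private final Tag ownTag; // null means this output serves the main output
    final List<String> pushed = new ArrayList<>();

    SideOutputSketch(Tag ownTag) {
        this.ownTag = ownTag;
    }

    // Lenient contract: a null or mismatching tag silently drops the record.
    void collectLenient(Tag tag, String record) {
        if (ownTag == null || !ownTag.equals(tag)) {
            return;
        }
        pushed.add(record);
    }

    // Strict contract: the tag argument must be non-null, so fail fast.
    void collectStrict(Tag tag, String record) {
        Objects.requireNonNull(tag, "outputTag must not be null");
        collectLenient(tag, record);
    }

    public static void main(String[] args) {
        SideOutputSketch out = new SideOutputSketch(new Tag("late"));
        out.collectLenient(null, "dropped");       // silently ignored
        out.collectStrict(new Tag("late"), "kept"); // forwarded
        System.out.println(out.pushed);
    }
}
```

Which of the two is preferable depends on whether a null tag is ever a legitimate call; the strict variant at least makes a violated contract visible.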





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

2020-08-26 Thread GitBox


rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r477498178



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##
@@ -208,19 +206,100 @@ public OperatorChain(
OperatorChain(
List> allOperatorWrappers,
RecordWriterOutput[] streamOutputs,
-   WatermarkGaugeExposingOutput> chainEntryPoint,
-   StreamOperatorWrapper headOperatorWrapper) {
+   WatermarkGaugeExposingOutput> mainOperatorOutput,
+   StreamOperatorWrapper mainOperatorWrapper) {
 
this.streamOutputs = checkNotNull(streamOutputs);
-   this.chainEntryPoint = checkNotNull(chainEntryPoint);
+   this.mainOperatorOutput = checkNotNull(mainOperatorOutput);
this.operatorEventDispatcher = null;
 
checkState(allOperatorWrappers != null && allOperatorWrappers.size() > 0);
-   this.headOperatorWrapper = checkNotNull(headOperatorWrapper);
+   this.mainOperatorWrapper = checkNotNull(mainOperatorWrapper);
this.tailOperatorWrapper = allOperatorWrappers.get(0);
this.numOperators = allOperatorWrappers.size();
+   this.chainedSourceOutputs = new ChainedSourceOutputs();
 
-   linkOperatorWrappers(allOperatorWrappers);
+   firstOperatorWrapper = linkOperatorWrappers(allOperatorWrappers);
+   }
+
+   private void createChainOutputs(
+   List outEdgesInOrder,
+   RecordWriterDelegate>> recordWriterDelegate,
+   Map chainedConfigs,
+   StreamTask containingTask,
+   Map> streamOutputMap) {
+   for (int i = 0; i < outEdgesInOrder.size(); i++) {
+   StreamEdge outEdge = outEdgesInOrder.get(i);
+
+   RecordWriterOutput streamOutput = createStreamOutput(
+   recordWriterDelegate.getRecordWriter(i),
+   outEdge,
+   chainedConfigs.get(outEdge.getSourceId()),
+   containingTask.getEnvironment());
+
+   this.streamOutputs[i] = streamOutput;
+   streamOutputMap.put(outEdge, streamOutput);
+   }
+   }
+
+   private ChainedSourceOutputs createChainedInputs(
+   StreamTask containingTask,
+   StreamConfig.Input[] configuredInputs,
+   Map chainedConfigs,
+   ClassLoader userCodeClassloader,
+   List> allOpWrappers) {
+   if (Arrays.stream(configuredInputs).noneMatch(input -> input instanceof SourceInput)) {
+   return new ChainedSourceOutputs();
+   }
+   checkState(
+   mainOperatorWrapper.getStreamOperator() instanceof MultipleInputStreamOperator,

Review comment:
   Isn't this method called for any operator?
   The `if` check above will not return from the function if there are `NetworkInput`s configured.
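
For reference, a minimal sketch of how a `noneMatch` guard of this shape evaluates for different input combinations. `InputCfg`, `SourceInputCfg` and `NetworkInputCfg` are hypothetical stand-ins for the configured input types, so this only illustrates the stream semantics and not the actual `OperatorChain` code.

```java
import java.util.Arrays;

// Hypothetical stand-ins for the configured input types named in the diff.
interface InputCfg {}

final class SourceInputCfg implements InputCfg {}

final class NetworkInputCfg implements InputCfg {}

final class ChainedInputsGuardSketch {
    // Same shape as the guard in the diff: true means "return early",
    // false means execution continues to the following checkState.
    static boolean returnsEarly(InputCfg[] configuredInputs) {
        return Arrays.stream(configuredInputs)
                .noneMatch(input -> input instanceof SourceInputCfg);
    }

    public static void main(String[] args) {
        // Only network inputs: no source input matches.
        System.out.println(returnsEarly(new InputCfg[] {new NetworkInputCfg()}));
        // Mixed inputs: one source input matches.
        System.out.println(returnsEarly(
                new InputCfg[] {new NetworkInputCfg(), new SourceInputCfg()}));
        // No inputs at all: noneMatch on an empty stream is true.
        System.out.println(returnsEarly(new InputCfg[0]));
    }
}
```

Under these assumptions, only a configuration that contains at least one source input reaches the `checkState` on the main operator.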









[GitHub] [flink] rkhachatryan commented on a change in pull request #13234: [FLINK-18905][task] Allow SourceOperator chaining with MultipleInputStreamTask

2020-08-26 Thread GitBox


rkhachatryan commented on a change in pull request #13234:
URL: https://github.com/apache/flink/pull/13234#discussion_r477487377



##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.util.OutputTag;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+class MultipleInputChainingOutput implements WatermarkGaugeExposingOutput> {

Review comment:
   This class duplicates substantial parts of `ChainingOutput`.
   Why not reuse it by extending it or delegating to it?
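
As an illustration of the delegation option, here is a small sketch. Both classes are hypothetical and heavily simplified (the real `ChainingOutput` also handles watermarks, latency markers and side outputs); the point is only that the shared record-counting logic lives in one place and the multiple-input variant supplies a different target.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for the logic shared with ChainingOutput.
class CountingOutputSketch<T> {
    private final Consumer<T> target;
    private long numRecordsIn;

    CountingOutputSketch(Consumer<T> target) {
        this.target = target;
    }

    void collect(T record) {
        numRecordsIn++;        // shared metric bookkeeping
        target.accept(record); // push to whatever consumes the record
    }

    long getNumRecordsIn() {
        return numRecordsIn;
    }
}

// Hypothetical multiple-input variant: delegates instead of copying the logic.
final class MultipleInputOutputSketch<T> {
    private final CountingOutputSketch<T> delegate;

    MultipleInputOutputSketch(Consumer<T> input) {
        this.delegate = new CountingOutputSketch<>(input);
    }

    void collect(T record) {
        delegate.collect(record);
    }

    long getNumRecordsIn() {
        return delegate.getNumRecordsIn();
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        MultipleInputOutputSketch<String> out = new MultipleInputOutputSketch<>(seen::add);
        out.collect("a");
        out.collect("b");
        System.out.println(out.getNumRecordsIn() + " records: " + seen);
    }
}
```

Extending would work similarly, with the subclass overriding only the method that pushes the record to its target.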

##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
##
@@ -628,4 +622,46 @@ public String toString() {
 
return builder.toString();
}
+
+   /**
+* Interface representing chained inputs.
+*/
+   public static class Input implements Serializable {

Review comment:
   1. Rename `Input` to `InputConfig`, since `..api.operators.Input` (and its subclasses) already exists?
   2. Make it an `interface` rather than a `class`? This would allow implementations to extend other classes and would ease testing.
   3. Why is this class not parameterized? Its serializer is passed to parameterized classes.
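
Taken together, the three suggestions could look roughly like the sketch below. Every name in it is hypothetical (including `ElementSerializerSketch`, which merely stands in for a typed serializer), so treat it as one possible shape and not as the proposed API.

```java
import java.io.Serializable;

// Hypothetical stand-in for a typed serializer.
interface ElementSerializerSketch<T> extends Serializable {
    String serialize(T value);
}

// Suggestions 1-3: renamed, an interface, and parameterized.
interface InputConfig<T> extends Serializable {}

// A network input carries a serializer whose type matches the config.
final class NetworkInputConfig<T> implements InputConfig<T> {
    private static final long serialVersionUID = 1L;
    private final ElementSerializerSketch<T> serializer;

    NetworkInputConfig(ElementSerializerSketch<T> serializer) {
        this.serializer = serializer;
    }

    ElementSerializerSketch<T> getSerializer() {
        return serializer;
    }
}

// Hypothetical unrelated base class.
abstract class DescribedSketch {
    abstract String describe();
}

// Because InputConfig is an interface, an implementation can extend another class.
final class SourceInputConfig<T> extends DescribedSketch implements InputConfig<T> {
    private static final long serialVersionUID = 1L;

    @Override
    String describe() {
        return "chained source input";
    }
}

final class InputConfigDemo {
    public static void main(String[] args) {
        NetworkInputConfig<Integer> network = new NetworkInputConfig<>(v -> "int:" + v);
        System.out.println(network.getSerializer().serialize(7));
        System.out.println(new SourceInputConfig<String>().describe());
    }
}
```

An interface also makes it easy to supply a trivial implementation in tests, for example an anonymous class.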
   
   

##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##
@@ -208,19 +206,100 @@ public OperatorChain(
OperatorChain(
List> allOperatorWrappers,
RecordWriterOutput[] streamOutputs,
-   WatermarkGaugeExposingOutput> chainEntryPoint,
-   StreamOperatorWrapper headOperatorWrapper) {
+   WatermarkGaugeExposingOutput> mainOperatorOutput,
+   StreamOperatorWrapper mainOperatorWrapper) {
 
this.streamOutputs = checkNotNull(streamOutputs);
-   this.chainEntryPoint = checkNotNull(chainEntryPoint);
+   this.mainOperatorOutput = checkNotNull(mainOperatorOutput);
this.operatorEventDispatcher = null;
 
checkState(allOperatorWrappers != null && allOperatorWrappers.size() > 0);
-   this.headOperatorWrapper = checkNotNull(headOperatorWrapper);
+   this.mainOperatorWrapper = checkNotNull(mainOperatorWrapper);
this.tailOperatorWrapper = allOperatorWrappers.get(0);
this.numOperators = allOperatorWrappers.size();
+   this.chainedSourceOutputs = new ChainedSourceOutputs();
 
-   linkOperatorWrappers(allOperatorWrappers);
+   firstOperatorWrapper = linkOperatorWrappers(allOperatorWrappers);
+   }
+
+   private void createChainOutputs(
+   List outEdgesInOrder,
+   RecordWriterDelegate>> recordWriterDelegate,
+   Map chainedConfigs,
+   StreamTask containingTask,
+   Map> streamOutputMap) {

Review comment:
   nit: indentation

##
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputChainingOutput.java
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for