[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37174502
  
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---
@@ -27,114 +26,65 @@
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {

-    private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
-
     private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+
+    private volatile boolean running = true;

     @Override
-    public void registerInputOutput() {
-        try {
-            super.registerInputOutput();
+    public void init() throws Exception {
+        TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+        TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);

-            TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-            TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+        int numberOfInputs = configuration.getNumberOfInputs();

-            int numberOfInputs = configuration.getNumberOfInputs();
+        ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+        ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();

-            ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-            ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+        List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);

-            List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
-            for (int i = 0; i < numberOfInputs; i++) {
-                int inputType = inEdges.get(i).getTypeNumber();
-                InputGate reader = getEnvironment().getInputGate(i);
-                switch (inputType) {
-                    case 1:
-                        inputList1.add(reader);
-                        break;
-                    case 2:
-                        inputList2.add(reader);
-                        break;
-                    default:
-                        throw new RuntimeException("Invalid input type number: " + inputType);
-                }
+        for (int i = 0; i < numberOfInputs; i++) {
+            int inputType = inEdges.get(i).getTypeNumber();
+            InputGate reader = getEnvironment().getInputGate(i);
+            switch (inputType) {
+                case 1:
+                    inputList1.add(reader);
+                    break;
+                case 2:
--- End diff --

I would like to address that in a followup, as this creates conflicts with 
#988 otherwise.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-17 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1017#issuecomment-131772364
  
All right, if there are no further comments, I'll merge this...




[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1017




[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37168797
  
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java ---
@@ -39,61 +37,40 @@
  */
 public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {

-    private static final Logger LOG = LoggerFactory.getLogger(SourceStreamTask.class);
-
     @Override
-    public void invoke() throws Exception {
-        final SourceOutput<StreamRecord<OUT>> output = new SourceOutput<StreamRecord<OUT>>(outputHandler.getOutput(), checkpointLock);
-
-        boolean operatorOpen = false;
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Task {} invoked", getName());
-        }
-
-        try {
-            openOperator();
-            operatorOpen = true;
-
-            streamOperator.run(checkpointLock, output);
-
-            closeOperator();
-            operatorOpen = false;
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Task {} invocation finished", getName());
-            }
-
-        }
-        catch (Exception e) {
-            LOG.error(getEnvironment().getTaskNameWithSubtasks() + " failed", e);
-
-            if (operatorOpen) {
-                try {
-                    closeOperator();
-                }
-                catch (Throwable t) {
-                    LOG.warn("Exception while closing operator.", t);
-                }
-            }
-            throw e;
-        }
-        finally {
-            this.isRunning = false;
-            // Cleanup
-            outputHandler.flushOutputs();
-            clearBuffers();
-        }
+    protected void init() {
+        // does not hold any resources, so no initialization needed
+    }

+    @Override
+    protected void cleanup() {
+        // does not hold any resources, so no cleanup needed
     }
+

     @Override
-    public void cancel() {
-        super.cancel();
+    protected void run() throws Exception {
+        final Object checkpointLock = getCheckpointLock();
+
+        final SourceOutput<StreamRecord<OUT>> output =
+                new SourceOutput<StreamRecord<OUT>>(outputHandler.getOutput(), checkpointLock);
+
+        streamOperator.run(checkpointLock, output);
+    }
+
+    @Override
+    protected void cancelTask() throws Exception {
         streamOperator.cancel();
     }

+    //
+
+    // TODO:
+    // does this help with anything? The lock should be already held by the source function that
+    // emits. If that one does not hold the lock, then this does not help either.
+
--- End diff --

This is for the case where the source does not actually acquire the lock 
(because it is not participating in checkpointing). This might be a degenerate 
case, however, and not really necessary.
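
As a rough illustration of that degenerate case, the sketch below has the output take the lock itself on every emit. `LockingOutput`, `collect`, and `snapshot` are hypothetical stand-ins invented for this example, not Flink classes or methods. Because Java monitors are reentrant, a source that already holds the lock simply re-enters it, while a source that never takes the lock is still serialized against anything else that synchronizes on the same object.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical output wrapper (not a Flink class) that guards every emit with the checkpoint lock. */
class LockingOutput<T> {
    private final Object checkpointLock;
    private final List<T> emitted = new ArrayList<T>();

    LockingOutput(Object checkpointLock) {
        this.checkpointLock = checkpointLock;
    }

    /** Emits a record while holding the lock; re-entering an already held monitor is allowed. */
    void collect(T record) {
        synchronized (checkpointLock) {
            emitted.add(record);
        }
    }

    /** Returns a copy of everything emitted so far, taken under the same lock. */
    List<T> snapshot() {
        synchronized (checkpointLock) {
            return new ArrayList<T>(emitted);
        }
    }
}
```

A source that participates in checkpointing would call `collect` from inside its own `synchronized (checkpointLock)` block; one that does not would call it directly.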




[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37174684
  
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java ---
@@ -33,9 +33,8 @@
      * Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
      * operators.
      *
-     * <p>
-     * A watermark specifies that no element with a timestamp older or equal to the watermark
-     * timestamp will be emitted in the future.
+     * <p>A watermark specifies that no element with a timestamp older or equal to the watermark
--- End diff --

You are right. I think I saw it differently in some of Sun's classes, and 
copied the style.

It seems the changes do not hurt (JavaDocs interpret the HTML properly), 
but I'll stick with the official style in the future. Thanks for pointing that 
out.




[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-17 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1017#issuecomment-131735854
  
This looks like a very nice continuation of the cleanup work. I'd suggest 
merging it sooner rather than later.




[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37166759
  
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---
@@ -27,114 +26,65 @@
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {

-    private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
-
     private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+
+    private volatile boolean running = true;

     @Override
-    public void registerInputOutput() {
-        try {
-            super.registerInputOutput();
+    public void init() throws Exception {
+        TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+        TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);

-            TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-            TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+        int numberOfInputs = configuration.getNumberOfInputs();

-            int numberOfInputs = configuration.getNumberOfInputs();
+        ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+        ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();

-            ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-            ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+        List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);

-            List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
-            for (int i = 0; i < numberOfInputs; i++) {
-                int inputType = inEdges.get(i).getTypeNumber();
-                InputGate reader = getEnvironment().getInputGate(i);
-                switch (inputType) {
-                    case 1:
-                        inputList1.add(reader);
-                        break;
-                    case 2:
-                        inputList2.add(reader);
-                        break;
-                    default:
-                        throw new RuntimeException("Invalid input type number: " + inputType);
-                }
+        for (int i = 0; i < numberOfInputs; i++) {
+            int inputType = inEdges.get(i).getTypeNumber();
+            InputGate reader = getEnvironment().getInputGate(i);
+            switch (inputType) {
+                case 1:
+                    inputList1.add(reader);
+                    break;
+                case 2:
--- End diff --

The StreamEdge (and StreamNode) stuff is part of both the API and the 
runtime; the separation is not very clear. This is not changed in #988 but 
would have to be addressed in a follow-up PR.




[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-17 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37167205
  
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java ---
@@ -33,9 +33,8 @@
      * Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
      * operators.
      *
-     * <p>
-     * A watermark specifies that no element with a timestamp older or equal to the watermark
-     * timestamp will be emitted in the future.
+     * <p>A watermark specifies that no element with a timestamp older or equal to the watermark
--- End diff --

Why did you change this? The Oracle Javadoc literature 
(http://www.oracle.com/technetwork/articles/java/index-137868.html) and style 
guides (http://blog.joda.org/2012/11/javadoc-coding-standards.html) have it 
like it was. Javadoc is not HTML, so tags like <li>, <p>, and so on are not closed.




[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37143081
  
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---
@@ -27,114 +26,65 @@
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {

-    private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
-
     private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+
+    private volatile boolean running = true;

     @Override
-    public void registerInputOutput() {
-        try {
-            super.registerInputOutput();
+    public void init() throws Exception {
+        TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+        TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);

-            TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-            TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+        int numberOfInputs = configuration.getNumberOfInputs();

-            int numberOfInputs = configuration.getNumberOfInputs();
+        ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+        ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();

-            ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-            ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+        List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);

-            List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
-            for (int i = 0; i < numberOfInputs; i++) {
-                int inputType = inEdges.get(i).getTypeNumber();
-                InputGate reader = getEnvironment().getInputGate(i);
-                switch (inputType) {
-                    case 1:
-                        inputList1.add(reader);
-                        break;
-                    case 2:
-                        inputList2.add(reader);
-                        break;
-                    default:
-                        throw new RuntimeException("Invalid input type number: " + inputType);
-                }
+        for (int i = 0; i < numberOfInputs; i++) {
+            int inputType = inEdges.get(i).getTypeNumber();
+            InputGate reader = getEnvironment().getInputGate(i);
+            switch (inputType) {
+                case 1:
+                    inputList1.add(reader);
+                    break;
+                case 2:
--- End diff --

I don't think that's necessary, as it is just an index starting at 1. The 
possible values 1 and 2 are clearly related to which inputList the reader is 
added to.




[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-16 Thread ffbin
Github user ffbin commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37142604
  
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---
@@ -27,114 +26,65 @@
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {

-    private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
-
     private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+
+    private volatile boolean running = true;

     @Override
-    public void registerInputOutput() {
-        try {
-            super.registerInputOutput();
+    public void init() throws Exception {
+        TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+        TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);

-            TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-            TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+        int numberOfInputs = configuration.getNumberOfInputs();

-            int numberOfInputs = configuration.getNumberOfInputs();
+        ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+        ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();

-            ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-            ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+        List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);

-            List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
-            for (int i = 0; i < numberOfInputs; i++) {
-                int inputType = inEdges.get(i).getTypeNumber();
-                InputGate reader = getEnvironment().getInputGate(i);
-                switch (inputType) {
-                    case 1:
-                        inputList1.add(reader);
-                        break;
-                    case 2:
-                        inputList2.add(reader);
-                        break;
-                    default:
-                        throw new RuntimeException("Invalid input type number: " + inputType);
-                }
+        for (int i = 0; i < numberOfInputs; i++) {
+            int inputType = inEdges.get(i).getTypeNumber();
+            InputGate reader = getEnvironment().getInputGate(i);
+            switch (inputType) {
+                case 1:
+                    inputList1.add(reader);
+                    break;
+                case 2:
--- End diff --

What about using an enum instead of 2? It would be easier to understand.
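
For illustration only, such an enum might look like the sketch below. `InputSide` and `fromTypeNumber` are hypothetical names invented here and are not part of Flink; the sketch assumes the type number is the 1-based index that the `switch` in the diff above dispatches on.

```java
/** Hypothetical enum naming the two inputs of a two-input task; not part of Flink. */
enum InputSide {
    FIRST(1),
    SECOND(2);

    private final int typeNumber;

    InputSide(int typeNumber) {
        this.typeNumber = typeNumber;
    }

    int getTypeNumber() {
        return typeNumber;
    }

    /** Maps the 1-based type number of an edge to the input it feeds. */
    static InputSide fromTypeNumber(int typeNumber) {
        for (InputSide side : values()) {
            if (side.typeNumber == typeNumber) {
                return side;
            }
        }
        throw new IllegalArgumentException("Invalid input type number: " + typeNumber);
    }
}
```

The `switch` could then branch on `InputSide.fromTypeNumber(inputType)` with `case FIRST:` and `case SECOND:`, and the invalid-number check would live in one place.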




[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-16 Thread gyfora
Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37144298
  
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java ---
@@ -25,65 +25,68 @@
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {

-    private static final Logger LOG = LoggerFactory.getLogger(StreamIterationTail.class);
+    private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
--- End diff --

This is probably a copy-paste error - StreamIterationTail.class




[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-16 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37146789
  
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java ---
@@ -25,65 +25,68 @@
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {

-    private static final Logger LOG = LoggerFactory.getLogger(StreamIterationTail.class);
+    private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
--- End diff --

Yep, it is. Good catch!




[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-16 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37146808
  
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---
@@ -27,114 +26,65 @@
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {

-    private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
-
     private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+
+    private volatile boolean running = true;

     @Override
-    public void registerInputOutput() {
-        try {
-            super.registerInputOutput();
+    public void init() throws Exception {
+        TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+        TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);

-            TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-            TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+        int numberOfInputs = configuration.getNumberOfInputs();

-            int numberOfInputs = configuration.getNumberOfInputs();
+        ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+        ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();

-            ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-            ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+        List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);

-            List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
-            for (int i = 0; i < numberOfInputs; i++) {
-                int inputType = inEdges.get(i).getTypeNumber();
-                InputGate reader = getEnvironment().getInputGate(i);
-                switch (inputType) {
-                    case 1:
-                        inputList1.add(reader);
-                        break;
-                    case 2:
-                        inputList2.add(reader);
-                        break;
-                    default:
-                        throw new RuntimeException("Invalid input type number: " + inputType);
-                }
+        for (int i = 0; i < numberOfInputs; i++) {
+            int inputType = inEdges.get(i).getTypeNumber();
+            InputGate reader = getEnvironment().getInputGate(i);
+            switch (inputType) {
+                case 1:
+                    inputList1.add(reader);
+                    break;
+                case 2:
--- End diff --

This was actually part of the original code - I did not modify it as part 
of this pull request.
As far as I see it, the `StreamEdge` code is part of the API, not the 
runtime. It may be adjusted as part of #988.




[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-15 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1017#issuecomment-131431977
  
Fixed the issues with the tests. Builds locally, waiting for Travis to 
confirm.




[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-14 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/1017

[FLINK-2462] [streaming] Major cleanup of streaming task structure

This pull request addresses exception handling, code duplication, and 
missed resource cleanups in the streaming operators.

I mixed multiple issues in this pull request, which would have been better 
separated, but many were recognized in the rework, and it was tricky to pull 
the fixes apart.

**NOTE** I have not managed to adjust all tests yet, but I wanted to open 
this early for feedback.

## Exception handling

The exceptions are no longer logged by the operators themselves. Operators 
perform only cleanup in reaction to exceptions.

Exceptions are reported only to the root Task object, which knows whether 
this is the first failure-causing exception (root cause), or is a subsequent 
exception, or whether the task was actually canceled already. In the latter 
case, exceptions are ignored, because many cancellations lead to meaningless 
exceptions.

Added more exceptions to method signatures and removed exception wrapping where it was not needed.
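
As a rough sketch of that reporting rule (not the actual Flink `Task` code), the root object can keep the first exception as the root cause and drop both subsequent exceptions and anything reported after cancellation. The class name `RootTaskSketch` and its methods are hypothetical and exist only for this illustration.

```java
/** Hypothetical sketch of a root task that decides which exceptions are worth reporting. */
class RootTaskSketch {
    private Throwable rootCause; // first failure-causing exception, if any
    private boolean canceled;

    synchronized void cancel() {
        canceled = true;
    }

    /**
     * Records the exception if it is the first failure of a task that was not canceled.
     *
     * @return true if the exception became the root cause, false if it was ignored
     */
    synchronized boolean reportFailure(Throwable t) {
        if (canceled) {
            return false; // cancellation often triggers meaningless follow-up exceptions
        }
        if (rootCause != null) {
            return false; // a subsequent exception; the root cause is already known
        }
        rootCause = t;
        return true;
    }

    synchronized Throwable getRootCause() {
        return rootCause;
    }
}
```

With this split, operators only clean up and rethrow, and a single place decides what is logged.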

## Unified setup / teardown structure in streaming tasks

Core resource acquisition/release logic is in `StreamTask`, reducing code 
duplication.
Subtasks (e.g., `OneInputStreamTask`, `IterationTailStreamTask`) implement 
slim methods for certain parts of the life cycle. The `OneInputStreamTask` 
becomes as simple as this:

```java
public void init() throws Exception {
    TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
    InputGate[] inputGates = getEnvironment().getAllInputGates();
    inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer,
            getCheckpointBarrierListener(),
            configuration.getCheckpointMode(),
            getEnvironment().getIOManager(),
            getExecutionConfig().areTimestampsEnabled());

    // make sure that stream tasks report their I/O statistics
    AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
    AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
    inputProcessor.setReporter(reporter);
}

protected void run() throws Exception {
    while (running && inputProcessor.processInput(streamOperator));
}

protected void cleanup() throws Exception {
    inputProcessor.cleanup();
}

protected void cancelTask() {
    running = false;
}
```
Guaranteed cleanup of output buffer and input buffer resources (formerly 
missed when other exceptions were encountered).
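
The guaranteed cleanup can be pictured as a template method in the base class: the subtask hooks run inside a `try` block and `cleanup()` sits in the `finally`. The sketch below is a simplified illustration under that assumption and is not the actual `StreamTask` code; `LifecycleTaskSketch` and `RecordingTask` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base class mirroring the init/run/cleanup/cancelTask hooks; not Flink's StreamTask. */
abstract class LifecycleTaskSketch {
    protected abstract void init() throws Exception;
    protected abstract void run() throws Exception;
    protected abstract void cleanup() throws Exception;
    protected abstract void cancelTask() throws Exception;

    /** Drives the life cycle; cleanup() runs even if init() or run() throws. */
    public final void invoke() throws Exception {
        try {
            init();
            run();
        } finally {
            cleanup();
        }
    }
}

/** Hypothetical subtask that records which hooks were called, for demonstration. */
class RecordingTask extends LifecycleTaskSketch {
    final List<String> calls = new ArrayList<String>();
    private final boolean failInRun;

    RecordingTask(boolean failInRun) {
        this.failInRun = failInRun;
    }

    @Override
    protected void init() {
        calls.add("init");
    }

    @Override
    protected void run() throws Exception {
        calls.add("run");
        if (failInRun) {
            throw new Exception("simulated failure");
        }
    }

    @Override
    protected void cleanup() {
        calls.add("cleanup");
    }

    @Override
    protected void cancelTask() {
        calls.add("cancelTask");
    }
}
```

Calling `invoke()` on `new RecordingTask(true)` rethrows the simulated failure, and `calls` still ends with `cleanup`.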

Unified `StreamRecordWriter` and `RecordWriter` usage.

## Cleanup in the StreamSource

Fix mixup in instantiation of source contexts in the stream source task

Auto watermark generators correctly shut down their interval scheduler
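
A minimal sketch of what shutting down such an interval scheduler involves, assuming the generator is driven by a `ScheduledExecutorService`; the class `PeriodicEmitterSketch` is hypothetical and only stands in for an automatic watermark generator.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical periodic emitter; stands in for an automatic watermark generator. */
class PeriodicEmitterSketch implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    final AtomicLong emitted = new AtomicLong();

    PeriodicEmitterSketch(long intervalMillis) {
        // Fire immediately, then once per interval, until the scheduler is shut down.
        scheduler.scheduleAtFixedRate(
                () -> emitted.incrementAndGet(), 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops the timer thread so that it does not outlive the task. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    boolean isShutdown() {
        return scheduler.isShutdown();
    }
}
```

Without the `close()` call, the scheduler thread would keep running after the source has finished or been canceled.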

## General

Improve use of generics, got rid of many raw types

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink stream_cleanup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1017.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1017


commit 68efed0a3b4184980de956bd57ba301569adac86
Author: Stephan Ewen se...@apache.org
Date:   2015-08-14T21:32:35Z

[FLINK-2462] [streaming] Major cleanup of operator structure for exception 
handling and code simplification

  - The exceptions are no longer logged by the operators themselves.
    Operators perform only cleanup in reaction to exceptions.
    Exceptions are reported only to the root Task object, which knows whether this is the first
    failure-causing exception (root cause), or is a subsequent exception, or whether the task was
    actually canceled already. In the latter case, exceptions are ignored, because many
    cancellations lead to meaningless exceptions.

  - more exception in signatures, less wrapping where not needed

  - Core resource acquisition/release logic is in one streaming task, 
reducing code duplication

  - Guaranteed cleanup of output buffer and input buffer resources 
(formerly missed when other exceptions were encountered)

  - Fix mixup in instantiation of source contexts in the stream source task

  - Auto watermark generators correctly shut down their interval scheduler

  - Improve use of generics, got rid of many raw types



