[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-16 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210509687
 
 

 ##
 File path: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ##
 @@ -63,7 +63,7 @@ private static Path findUtilsModule() {
 
 	@Test
 	public void testProgram() throws Exception {
-		Path testEntryPoint = new Path(getBaseTestPythonDir(), "examples/word_count.py");
+		Path testEntryPoint = new Path(getBaseTestPythonDir(), "run_all_tests.py");
 
 Review comment:
   Is this change relevant? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-16 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210511275
 
 

 ##
 File path: 
flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_output_with_sink_identifier.py
 ##
 @@ -0,0 +1,58 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+from utils import constants
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        collector.collect((1, value))
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Main:
+    def run(self, flink):
+        elements = ["aa" if iii % 2 == 0 else "bbb" for iii in range(constants.NUM_ELEMENTS_IN_TEST)]
+
+        env = flink.get_execution_environment()
+        env.from_collection(elements) \
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(milliseconds(10)) \
+            .reduce(Sum()) \
+            .output("testOutputWithIdentifier")
+
+        env.execute()
 
 Review comment:
   This is the first time I'm looking at our Python tests, so take my comment with a grain of salt.
   
   Isn't this test (and the other Python streaming tests) supposed to assert on something? As far as I can tell, they only check that the Python API doesn't throw, so they would still pass if the methods were empty. @zentol?
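One way to make such a test assert on something is to capture what the print sink writes and check it. Below is a minimal Java sketch of that capture-and-assert pattern. It assumes the job runs in the same JVM (for example, a local environment), so that redirecting `System.out` actually sees its output. `StdOutCapture`, `captureStdOut` and `allLinesStartWith` are hypothetical helpers, and the `Runnable` in `main` is a stand-in for executing the job, not a real Flink or Python API call.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

/** Hypothetical capture-and-assert helpers for print-based tests. */
public class StdOutCapture {

    /** Runs the action with System.out redirected and returns everything it printed. */
    static String captureStdOut(Runnable action) {
        PrintStream original = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream capture = new PrintStream(buffer, true);
        System.setOut(capture);
        try {
            action.run();
        } finally {
            // Always restore the real stdout, even if the action throws.
            System.setOut(original);
            capture.close();
        }
        return buffer.toString();
    }

    /** True if there is at least one non-empty line and every non-empty line starts with prefix. */
    static boolean allLinesStartWith(String output, String prefix) {
        boolean sawLine = false;
        for (String l : output.split("\\R")) {
            if (l.isEmpty()) {
                continue;
            }
            sawLine = true;
            if (!l.startsWith(prefix)) {
                return false;
            }
        }
        // Empty output fails instead of passing vacuously.
        return sawLine;
    }

    public static void main(String[] args) {
        // Stand-in for running the job; a real test would execute it here.
        String out = captureStdOut(() -> {
            System.out.println("testOutputWithIdentifier:1> record-a");
            System.out.println("testOutputWithIdentifier:2> record-b");
        });
        if (!allLinesStartWith(out, "testOutputWithIdentifier")) {
            throw new AssertionError("unexpected output: " + out);
        }
        System.out.println("ok");
    }
}
```

Treating empty output as a failure is deliberate: it addresses the concern above that a test with empty methods would still pass.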




[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-15 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210239256
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -18,81 +18,74 @@
 package org.apache.flink.streaming.api.functions.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.util.TaskOutputWriter;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
-import java.io.PrintStream;
-
 /**
  * Implementation of the SinkFunction writing every tuple to the standard
  * output or standard error stream.
  *
- * @param <IN>
- *		Input record type
+ * 
+ * Four possible format options:
+ *	{@code sinkIdentifier}:taskId> output  <- {@code sinkIdentifier} provided, parallelism > 1
+ *	{@code sinkIdentifier}> output         <- {@code sinkIdentifier} provided, parallelism == 1
+ *	taskId> output                         <- no {@code sinkIdentifier} provided, parallelism > 1
+ *	output                                 <- no {@code sinkIdentifier} provided, parallelism == 1
+ * 
+ *
+ * @param <IN> Input record type
  */
 @PublicEvolving
 public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
 
-	private static final boolean STD_OUT = false;
-	private static final boolean STD_ERR = true;
+	private static final long serialVersionUID = 1L;
 
-	private boolean target;
-	private transient PrintStream stream;
-	private transient String prefix;
+	private final TaskOutputWriter<IN> writer;
 
 	/**
 	 * Instantiates a print sink function that prints to standard out.
 	 */
-	public PrintSinkFunction() {}
+	public PrintSinkFunction() {
+		writer = new TaskOutputWriter<>();
+	}
 
 	/**
 	 * Instantiates a print sink function that prints to standard out.
 	 *
 	 * @param stdErr True, if the format should print to standard error instead of standard out.
 	 */
-	public PrintSinkFunction(boolean stdErr) {
-		target = stdErr;
-	}
-
-	public void setTargetToStandardOut() {
-		target = STD_OUT;
+	public PrintSinkFunction(final boolean stdErr) {
+		writer = new TaskOutputWriter<>(stdErr);
 	}
 
-	public void setTargetToStandardErr() {
-		target = STD_ERR;
+	/**
+	 * Instantiates a print sink function that prints to standard out and gives a sink identifier.
+	 *
+	 * @param stdErr True, if the format should print to standard error instead of standard out.
+	 * @param sinkIdentifier Message that identifies the sink and is prefixed to the output of the value
+	 */
+	public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {
+		writer = new TaskOutputWriter<>(sinkIdentifier, stdErr);
 	}
 
 	@Override
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
 		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
-		// get the target stream
-		stream = target == STD_OUT ? System.out : System.err;
+		int taskNumber = context.getIndexOfThisSubtask();
 
 Review comment:
   nit: inline those `taskNumber` and `numTasks` variables?
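The four format options listed in the `PrintSinkFunction` Javadoc reduce to a single prefix rule. Below is a minimal sketch of that rule. It assumes a null or empty identifier counts as "not provided" and that the printed task id is the zero-based subtask index plus one, which is consistent with the `"2> hello world!"` expectation for `open(1, 2)` in this PR's tests. `PrintPrefix` and `prefixFor` are hypothetical names, not part of the PR or the Flink API.

```java
/** Hypothetical helper mirroring the four documented print formats. */
public final class PrintPrefix {

    private PrintPrefix() {}

    /**
     * Builds the line prefix for a print sink.
     * Assumptions: null/empty identifier means "not provided"; task id = subtaskIndex + 1.
     */
    static String prefixFor(String sinkIdentifier, int subtaskIndex, int parallelism) {
        boolean hasIdentifier = sinkIdentifier != null && !sinkIdentifier.isEmpty();
        if (parallelism > 1) {
            String taskId = String.valueOf(subtaskIndex + 1);
            return hasIdentifier ? sinkIdentifier + ":" + taskId + "> " : taskId + "> ";
        }
        return hasIdentifier ? sinkIdentifier + "> " : "";
    }

    public static void main(String[] args) {
        System.out.println(prefixFor("mySink", 1, 2) + "hello world!"); // mySink:2> hello world!
        System.out.println(prefixFor("mySink", 0, 1) + "hello world!"); // mySink> hello world!
        System.out.println(prefixFor(null, 1, 2) + "hello world!");     // 2> hello world!
        System.out.println(prefixFor(null, 0, 1) + "hello world!");     // hello world!
    }
}
```

Computing the prefix once from the subtask index and parallelism, which is what `open()` has access to, keeps the per-record path down to a string concatenation.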




[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-15 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210240596
 
 

 ##
 File path: 
flink-java/src/test/java/org/apache/flink/api/java/io/PrintingOutputFormatTest.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link PrintingOutputFormat}.
+ */
+public class PrintingOutputFormatTest {
+
+	private final PrintStream originalSystemOut = System.out;
+	private final PrintStream originalSystemErr = System.err;
+
+	private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+	private final ByteArrayOutputStream arrayErrorStream = new ByteArrayOutputStream();
+
+	private final String line = System.lineSeparator();
+
+	@Before
+	public void setUp() {
+		System.setOut(new PrintStream(arrayOutputStream));
+		System.setErr(new PrintStream(arrayErrorStream));
+	}
+
+	@After
+	public void tearDown() {
+		if (System.out != originalSystemOut) {
+			System.out.close();
+		}
+		if (System.err != originalSystemErr) {
+			System.err.close();
+		}
+		System.setOut(originalSystemOut);
+		System.setErr(originalSystemErr);
+	}
+
+	@Test
+	public void testPrintOutputFormatStdOut() throws Exception {
+		PrintingOutputFormat<String> printSink = new PrintingOutputFormat<>();
+		printSink.open(0, 1);
+
+		printSink.writeRecord("hello world!");
+
+		assertEquals("Print to System.out", printSink.toString());
+		assertEquals("hello world!" + line, arrayOutputStream.toString());
+		printSink.close();
+	}
+
+	@Test
+	public void testPrintOutputFormatStdErr() throws Exception {
+		PrintingOutputFormat<String> printSink = new PrintingOutputFormat<>(true);
+		printSink.open(0, 1);
+
+		printSink.writeRecord("hello world!");
+
+		assertEquals("Print to System.err", printSink.toString());
+		assertEquals("hello world!" + line, arrayErrorStream.toString());
+		printSink.close();
+	}
+
+	@Test
+	public void testPrintOutputFormatWithPrefix() throws Exception {
+		PrintingOutputFormat<String> printSink = new PrintingOutputFormat<>();
+		printSink.open(1, 2);
+
+		printSink.writeRecord("hello world!");
+
+		assertEquals("Print to System.out", printSink.toString());
+		assertEquals("2> hello world!" + line, arrayOutputStream.toString());
+		printSink.close();
+	}
+
+	@Test
+	public void testPrintOutputFormatWithIdentifierAndPrefix() throws Exception {
+		PrintingOutputFormat<String> printSink = new PrintingOutputFormat<>("mySink", false);
+		printSink.open(1, 2);
+
+		printSink.writeRecord("hello world!");
+
+		assertEquals("Print to System.out", printSink.toString());
+		assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
+
+		printSink = new PrintingOutputFormat<>("mySink", true);
+		assertEquals("Print to System.err", printSink.toString());
+		assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
 
 Review comment:
   In those 3 lines, were you trying to test stderr? If so, you haven't invoked anything on the stderr instance of `printSink`, and in this line you are just asserting on standard output again. To assert on stderr, you should check `arrayErrorStream` instead of `arrayOutputStream`.
   
   However, I don't think it is necessary here to test for prefix AND for stderr 
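Here is a version of the stderr check that actually exercises the stderr instance. It writes a record through that instance, asserts on the error buffer, and also asserts that nothing reached standard out. This is a self-contained sketch: `TinyPrinter` is a hypothetical stand-in for `PrintingOutputFormat`/`PrintSinkFunction`, not their real implementation, and the sketch closes each printer once it is done with it.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class StdErrAssertionSketch {

    /** Hypothetical stand-in for a print sink that targets stdout or stderr. */
    static final class TinyPrinter {
        private final String prefix;
        private final boolean stdErr;
        private PrintStream stream;

        TinyPrinter(String prefix, boolean stdErr) {
            this.prefix = prefix;
            this.stdErr = stdErr;
        }

        void open() {
            // Resolve the target when opening, so a redirected System.err is picked up.
            stream = stdErr ? System.err : System.out;
        }

        void write(String record) {
            stream.println(prefix + record);
        }

        void close() {
            stream.flush();
            stream = null;
        }
    }

    public static void main(String[] args) {
        PrintStream originalOut = System.out;
        PrintStream originalErr = System.err;
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ByteArrayOutputStream err = new ByteArrayOutputStream();
        System.setOut(new PrintStream(out, true));
        System.setErr(new PrintStream(err, true));
        try {
            TinyPrinter printer = new TinyPrinter("mySink:2> ", true);
            printer.open();
            printer.write("hello world!"); // actually invoke the stderr instance
            printer.close();               // close before creating any other instance
        } finally {
            System.setOut(originalOut);
            System.setErr(originalErr);
        }
        String line = System.lineSeparator();
        // Assert on the error buffer, and that nothing leaked to stdout.
        check(("mySink:2> hello world!" + line).equals(err.toString()), "stderr content");
        check(out.toString().isEmpty(), "stdout should be empty");
        System.out.println("ok");
    }

    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }
}
```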

[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-15 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210242096
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/functions/util/TaskOutputWriter.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+import java.io.PrintStream;
+import java.io.Serializable;
+
+/**
+ * Task output writer for DataStream and DataSet print API.
+ */
+public class TaskOutputWriter implements Serializable {
 
 Review comment:
   add `@Internal`?




[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-15 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210240875
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ##
 @@ -103,4 +102,41 @@ public void testPrintSinkWithPrefix() throws Exception {
 		assertEquals("2> hello world!" + line, arrayOutputStream.toString());
 		printSink.close();
 	}
+
+	@Test
+	public void testPrintSinkWithIdentifierAndPrefix() throws Exception {
+		PrintSinkFunction<String> printSink = new PrintSinkFunction<>("mySink", false);
+		printSink.setRuntimeContext(new MockStreamingRuntimeContext(false, 2, 1));
+		printSink.open(new Configuration());
+
+		printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0));
+
+		assertEquals("Print to System.out", printSink.toString());
+		assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
+
+		printSink = new PrintSinkFunction<>("mySink", true);
+		assertEquals("Print to System.err", printSink.toString());
+		assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
+
+		printSink.close();
+	}
+
+	@Test
+	public void testPrintSinkWithIdentifierButNoPrefix() throws Exception {
+		PrintSinkFunction<String> printSink = new PrintSinkFunction<>("mySink", false);
+		printSink.setRuntimeContext(new MockRuntimeContext(false, 1, 0));
+		printSink.open(new Configuration());
+
+		printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0));
+
+		assertEquals("Print to System.out", printSink.toString());
+		assertEquals("mySink> hello world!" + line, arrayOutputStream.toString());
+
+		printSink = new PrintSinkFunction<>("mySink", true);
+		assertEquals("Print to System.err", printSink.toString());
+		assertEquals("mySink> hello world!" + line, arrayOutputStream.toString());
 
 Review comment:
   ditto regarding testing stderr




[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-15 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210240862
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ##
 @@ -103,4 +102,41 @@ public void testPrintSinkWithPrefix() throws Exception {
 		assertEquals("2> hello world!" + line, arrayOutputStream.toString());
 		printSink.close();
 	}
+
+	@Test
+	public void testPrintSinkWithIdentifierAndPrefix() throws Exception {
+		PrintSinkFunction<String> printSink = new PrintSinkFunction<>("mySink", false);
+		printSink.setRuntimeContext(new MockStreamingRuntimeContext(false, 2, 1));
+		printSink.open(new Configuration());
+
+		printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0));
+
+		assertEquals("Print to System.out", printSink.toString());
+		assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
+
+		printSink = new PrintSinkFunction<>("mySink", true);
+		assertEquals("Print to System.err", printSink.toString());
+		assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
 
 Review comment:
   ditto regarding testing stderr




[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-15 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210239256
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -18,81 +18,74 @@
 package org.apache.flink.streaming.api.functions.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.util.TaskOutputWriter;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
-import java.io.PrintStream;
-
 /**
  * Implementation of the SinkFunction writing every tuple to the standard
  * output or standard error stream.
  *
- * @param <IN>
- *		Input record type
+ * 
+ * Four possible format options:
+ *	{@code sinkIdentifier}:taskId> output  <- {@code sinkIdentifier} provided, parallelism > 1
+ *	{@code sinkIdentifier}> output         <- {@code sinkIdentifier} provided, parallelism == 1
+ *	taskId> output                         <- no {@code sinkIdentifier} provided, parallelism > 1
+ *	output                                 <- no {@code sinkIdentifier} provided, parallelism == 1
+ * 
+ *
+ * @param <IN> Input record type
  */
 @PublicEvolving
 public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
 
-	private static final boolean STD_OUT = false;
-	private static final boolean STD_ERR = true;
+	private static final long serialVersionUID = 1L;
 
-	private boolean target;
-	private transient PrintStream stream;
-	private transient String prefix;
+	private final TaskOutputWriter<IN> writer;
 
 	/**
 	 * Instantiates a print sink function that prints to standard out.
 	 */
-	public PrintSinkFunction() {}
+	public PrintSinkFunction() {
+		writer = new TaskOutputWriter<>();
+	}
 
 	/**
 	 * Instantiates a print sink function that prints to standard out.
 	 *
 	 * @param stdErr True, if the format should print to standard error instead of standard out.
 	 */
-	public PrintSinkFunction(boolean stdErr) {
-		target = stdErr;
-	}
-
-	public void setTargetToStandardOut() {
-		target = STD_OUT;
+	public PrintSinkFunction(final boolean stdErr) {
+		writer = new TaskOutputWriter<>(stdErr);
 	}
 
-	public void setTargetToStandardErr() {
-		target = STD_ERR;
+	/**
+	 * Instantiates a print sink function that prints to standard out and gives a sink identifier.
+	 *
+	 * @param stdErr True, if the format should print to standard error instead of standard out.
+	 * @param sinkIdentifier Message that identifies the sink and is prefixed to the output of the value
+	 */
+	public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {
+		writer = new TaskOutputWriter<>(sinkIdentifier, stdErr);
 	}
 
 	@Override
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
 		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
-		// get the target stream
-		stream = target == STD_OUT ? System.out : System.err;
+		int taskNumber = context.getIndexOfThisSubtask();
 
 Review comment:
   nit: inline those variables?




[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-15 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210240642
 
 

 ##
 File path: 
flink-java/src/test/java/org/apache/flink/api/java/io/PrintingOutputFormatTest.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link PrintingOutputFormat}.
+ */
+public class PrintingOutputFormatTest {
+
+	private final PrintStream originalSystemOut = System.out;
+	private final PrintStream originalSystemErr = System.err;
+
+	private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+	private final ByteArrayOutputStream arrayErrorStream = new ByteArrayOutputStream();
+
+	private final String line = System.lineSeparator();
+
+	@Before
+	public void setUp() {
+		System.setOut(new PrintStream(arrayOutputStream));
+		System.setErr(new PrintStream(arrayErrorStream));
+	}
+
+	@After
+	public void tearDown() {
+		if (System.out != originalSystemOut) {
+			System.out.close();
+		}
+		if (System.err != originalSystemErr) {
+			System.err.close();
+		}
+		System.setOut(originalSystemOut);
+		System.setErr(originalSystemErr);
+	}
+
+	@Test
+	public void testPrintOutputFormatStdOut() throws Exception {
+		PrintingOutputFormat<String> printSink = new PrintingOutputFormat<>();
+		printSink.open(0, 1);
+
+		printSink.writeRecord("hello world!");
+
+		assertEquals("Print to System.out", printSink.toString());
+		assertEquals("hello world!" + line, arrayOutputStream.toString());
+		printSink.close();
+	}
+
+	@Test
+	public void testPrintOutputFormatStdErr() throws Exception {
+		PrintingOutputFormat<String> printSink = new PrintingOutputFormat<>(true);
+		printSink.open(0, 1);
+
+		printSink.writeRecord("hello world!");
+
+		assertEquals("Print to System.err", printSink.toString());
+		assertEquals("hello world!" + line, arrayErrorStream.toString());
+		printSink.close();
+	}
+
+	@Test
+	public void testPrintOutputFormatWithPrefix() throws Exception {
+		PrintingOutputFormat<String> printSink = new PrintingOutputFormat<>();
+		printSink.open(1, 2);
+
+		printSink.writeRecord("hello world!");
+
+		assertEquals("Print to System.out", printSink.toString());
+		assertEquals("2> hello world!" + line, arrayOutputStream.toString());
+		printSink.close();
+	}
+
+	@Test
+	public void testPrintOutputFormatWithIdentifierAndPrefix() throws Exception {
+		PrintingOutputFormat<String> printSink = new PrintingOutputFormat<>("mySink", false);
+		printSink.open(1, 2);
+
+		printSink.writeRecord("hello world!");
+
+		assertEquals("Print to System.out", printSink.toString());
+		assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
+
+		printSink = new PrintingOutputFormat<>("mySink", true);
 
 Review comment:
   nit: you haven't closed the previous sink. 




[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-15 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210241537
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockRuntimeContext.java
 ##
 @@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import java.util.Collections;
+
+/**
+ * Simple test runtime context for stream operators.
+ */
+public class MockRuntimeContext extends StreamingRuntimeContext {
 
 Review comment:
   Why did you add this class? I have already extracted the same thing on the 
master branch to `MockStreamingRuntimeContext`. I'm guessing some rebase error?




[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-15 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210240596
 
 

 ##
 File path: 
flink-java/src/test/java/org/apache/flink/api/java/io/PrintingOutputFormatTest.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link PrintingOutputFormat}.
+ */
+public class PrintingOutputFormatTest {
+
+   private final PrintStream originalSystemOut = System.out;
+   private final PrintStream originalSystemErr = System.err;
+
+   private final ByteArrayOutputStream arrayOutputStream = new 
ByteArrayOutputStream();
+   private final ByteArrayOutputStream arrayErrorStream = new 
ByteArrayOutputStream();
+
+   private final String line = System.lineSeparator();
+
+   @Before
+   public void setUp() {
+   System.setOut(new PrintStream(arrayOutputStream));
+   System.setErr(new PrintStream(arrayErrorStream));
+   }
+
+   @After
+   public void tearDown() {
+   if (System.out != originalSystemOut) {
+   System.out.close();
+   }
+   if (System.err != originalSystemErr) {
+   System.err.close();
+   }
+   System.setOut(originalSystemOut);
+   System.setErr(originalSystemErr);
+   }
+
+   @Test
+   public void testPrintOutputFormatStdOut() throws Exception {
+   PrintingOutputFormat printSink = new 
PrintingOutputFormat<>();
+   printSink.open(0, 1);
+
+   printSink.writeRecord("hello world!");
+
+   assertEquals("Print to System.out", printSink.toString());
+   assertEquals("hello world!" + line, 
arrayOutputStream.toString());
+   printSink.close();
+   }
+
+   @Test
+   public void testPrintOutputFormatStdErr() throws Exception {
+   PrintingOutputFormat printSink = new 
PrintingOutputFormat<>(true);
+   printSink.open(0, 1);
+
+   printSink.writeRecord("hello world!");
+
+   assertEquals("Print to System.err", printSink.toString());
+   assertEquals("hello world!" + line, arrayErrorStream.toString());
+   printSink.close();
+   }
+
+   @Test
+   public void testPrintOutputFormatWithPrefix() throws Exception {
+   PrintingOutputFormat printSink = new PrintingOutputFormat<>();
+   printSink.open(1, 2);
+
+   printSink.writeRecord("hello world!");
+
+   assertEquals("Print to System.out", printSink.toString());
+   assertEquals("2> hello world!" + line, arrayOutputStream.toString());
+   printSink.close();
+   }
+
+   @Test
+   public void testPrintOutputFormatWithIdentifierAndPrefix() throws Exception {
+   PrintingOutputFormat printSink = new PrintingOutputFormat<>("mySink", false);
+   printSink.open(1, 2);
+
+   printSink.writeRecord("hello world!");
+
+   assertEquals("Print to System.out", printSink.toString());
+   assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
+
+   printSink = new PrintingOutputFormat<>("mySink", true);
+   assertEquals("Print to System.err", printSink.toString());
+   assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
 
 Review comment:
   Were you trying to test stderr in those 3 lines? If so, you haven't invoked anything on the stderr instance of `printSink`, and in this line you are just asserting the standard output again. To assert stderr, you should check against `arrayErrorStream` instead of `arrayOutputStream`.
   
   However, I don't think it is necessary here to test the prefix AND stderr together. So 
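   A minimal, self-contained sketch of checking stdout and stderr against separate buffers. `StdStreamPrinter` and `StdErrCaptureCheck` are hypothetical stand-ins, not Flink classes; in the actual test the printer would be `PrintingOutputFormat` and the error buffer `arrayErrorStream`.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

// Hypothetical stand-in for PrintingOutputFormat (not a Flink class):
// prints each record to System.out or System.err.
class StdStreamPrinter {
    private final boolean stdErr;
    private PrintStream stream;

    StdStreamPrinter(boolean stdErr) {
        this.stdErr = stdErr;
    }

    void open() {
        // Resolved at open() time, so a redirected System.err is picked up.
        stream = stdErr ? System.err : System.out;
    }

    void write(String record) {
        stream.println(record);
    }
}

class StdErrCaptureCheck {
    // Redirects stdout and stderr to separate buffers, prints one record,
    // restores both streams and returns {capturedOut, capturedErr}.
    static String[] captureBoth(boolean stdErr, String record) {
        PrintStream originalOut = System.out;
        PrintStream originalErr = System.err;
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ByteArrayOutputStream err = new ByteArrayOutputStream();
        try {
            System.setOut(new PrintStream(out, true));
            System.setErr(new PrintStream(err, true));
            StdStreamPrinter printer = new StdStreamPrinter(stdErr);
            printer.open();
            printer.write(record);
        } finally {
            System.setOut(originalOut);
            System.setErr(originalErr);
        }
        return new String[] {out.toString(), err.toString()};
    }

    public static void main(String[] args) {
        String[] captured = captureBoth(true, "hello world!");
        // A stderr printer must fill the error buffer and leave stdout empty.
        if (!captured[1].equals("hello world!" + System.lineSeparator())
                || !captured[0].isEmpty()) {
            throw new AssertionError("unexpected output: " + String.join("|", captured));
        }
        System.out.println("stderr assertion passed");
    }
}
```

   Asserting that the other buffer stays empty is what catches a printer that writes to the wrong stream.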

[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-15 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210240728
 
 

 ##
 File path: 
flink-java/src/test/java/org/apache/flink/api/java/io/PrintingOutputFormatTest.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link PrintingOutputFormat}.
+ */
+public class PrintingOutputFormatTest {
+
+   private final PrintStream originalSystemOut = System.out;
+   private final PrintStream originalSystemErr = System.err;
+
+   private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+   private final ByteArrayOutputStream arrayErrorStream = new ByteArrayOutputStream();
+
+   private final String line = System.lineSeparator();
+
+   @Before
+   public void setUp() {
+   System.setOut(new PrintStream(arrayOutputStream));
+   System.setErr(new PrintStream(arrayErrorStream));
+   }
+
+   @After
+   public void tearDown() {
+   if (System.out != originalSystemOut) {
+   System.out.close();
+   }
+   if (System.err != originalSystemErr) {
+   System.err.close();
+   }
+   System.setOut(originalSystemOut);
+   System.setErr(originalSystemErr);
+   }
+
+   @Test
+   public void testPrintOutputFormatStdOut() throws Exception {
+   PrintingOutputFormat printSink = new PrintingOutputFormat<>();
+   printSink.open(0, 1);
+
+   printSink.writeRecord("hello world!");
+
+   assertEquals("Print to System.out", printSink.toString());
+   assertEquals("hello world!" + line, arrayOutputStream.toString());
+   printSink.close();
+   }
+
+   @Test
+   public void testPrintOutputFormatStdErr() throws Exception {
+   PrintingOutputFormat printSink = new PrintingOutputFormat<>(true);
+   printSink.open(0, 1);
+
+   printSink.writeRecord("hello world!");
+
+   assertEquals("Print to System.err", printSink.toString());
+   assertEquals("hello world!" + line, arrayErrorStream.toString());
+   printSink.close();
+   }
+
+   @Test
+   public void testPrintOutputFormatWithPrefix() throws Exception {
+   PrintingOutputFormat printSink = new PrintingOutputFormat<>();
+   printSink.open(1, 2);
+
+   printSink.writeRecord("hello world!");
+
+   assertEquals("Print to System.out", printSink.toString());
+   assertEquals("2> hello world!" + line, arrayOutputStream.toString());
+   printSink.close();
+   }
+
+   @Test
+   public void testPrintOutputFormatWithIdentifierAndPrefix() throws Exception {
+   PrintingOutputFormat printSink = new PrintingOutputFormat<>("mySink", false);
+   printSink.open(1, 2);
+
+   printSink.writeRecord("hello world!");
+
+   assertEquals("Print to System.out", printSink.toString());
+   assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
+
+   printSink = new PrintingOutputFormat<>("mySink", true);
+   assertEquals("Print to System.err", printSink.toString());
+   assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
+
+   printSink.close();
+   }
+
+   @Test
+   public void testPrintOutputFormatWithIdentifierButNoPrefix() throws Exception {
+   PrintingOutputFormat printSink = new PrintingOutputFormat<>("mySink", false);
+   printSink.open(0, 1);
+
+   printSink.writeRecord("hello world!");
+
+   assertEquals("Print to System.out", 

[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-15 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210239166
 
 

 ##
 File path: 
flink-java/src/test/java/org/apache/flink/api/java/io/PrintingOutputFormatTest.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link PrintingOutputFormat}.
+ */
+public class PrintingOutputFormatTest {
 
 Review comment:
   +1 for adding those tests!


[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-15 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210238813
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/functions/util/TaskOutputWriter.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+import java.io.PrintStream;
+import java.io.Serializable;
+
+/**
+ * Task output writer for DataStream and DataSet print API.
+ */
+public class TaskOutputWriter<IN> implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final boolean STD_OUT = false;
+   private static final boolean STD_ERR = true;
+
+   private final boolean target;
+   private transient PrintStream stream;
+   private final String sinkIdentifier;
+   private transient String completedPrefix;
+
+   public TaskOutputWriter() {
+   this("", STD_OUT);
+   }
+
+   public TaskOutputWriter(final boolean stdErr) {
+   this("", stdErr);
+   }
+
+   public TaskOutputWriter(final String sinkIdentifier, final boolean stdErr) {
+   this.target = stdErr;
+   this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier);
+   }
+
+   public void open(int taskNumber, int numTasks) {
+   // get the target stream
+   stream = target == STD_OUT ? System.out : System.err;
+
+   completedPrefix = sinkIdentifier;
+
+   if (numTasks > 1) {
+   if (!completedPrefix.isEmpty()) {
+   completedPrefix += ":";
+   }
+   completedPrefix += (taskNumber + 1);
+   }
+
+   if (!completedPrefix.isEmpty()) {
+   completedPrefix += "> ";
+   }
+   }
+
+   public void write(IN record) {
+   stream.println(completedPrefix + record.toString());
+   }
+
+   public void close() {
 
 Review comment:
   Drop the `close`? It seems like a dead, empty method.


[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-14 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209911856
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -70,27 +84,30 @@ public void open(Configuration parameters) throws Exception {
// get the target stream
stream = target == STD_OUT ? System.out : System.err;
 
-   // set the prefix if we have a >1 parallelism
-   prefix = (context.getNumberOfParallelSubtasks() > 1) ?
-   ((context.getIndexOfThisSubtask() + 1) + "> ") : null;
+   completedPrefix = sinkIdentifier;
+
+   if (context.getNumberOfParallelSubtasks() > 1) {
+   if (!completedPrefix.isEmpty()) {
+   completedPrefix += ":";
+   }
+   completedPrefix += (context.getIndexOfThisSubtask() + 1);
+   }
+
+   if (!completedPrefix.isEmpty()) {
+   completedPrefix += "> ";
+   }
}
 
@Override
public void invoke(IN record) {
-   if (prefix != null) {
-   stream.println(prefix + record.toString());
+   if (completedPrefix != null) {
 
 Review comment:
   This check is not valid anymore; this field is never null. It can either be dropped altogether or replaced by an empty-string check. It's probably better to drop it: functionally both are the same, and the empty-string check's performance benefit (if any) doesn't matter when printing to STDOUT/STDERR.


[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209233947
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -40,6 +40,8 @@
private boolean target;
private transient PrintStream stream;
private transient String prefix;
+   private String sinkIdentifier;
+   private transient String completedPrefix;
 
 Review comment:
   Please drop the `prefix` field; it's only a local variable now. Also, please make `target` and `sinkIdentifier` `final` fields.


[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209234301
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -55,6 +57,17 @@ public PrintSinkFunction(boolean stdErr) {
target = stdErr;
}
 
+   /**
+* Instantiates a print sink function that prints to standard out and gives a sink identifier.
+*
+* @param stdErr True, if the format should print to standard error instead of standard out.
+* @param sinkIdentifier Message that identify sink and is prefixed to the output of the value
+*/
+   public PrintSinkFunction(boolean stdErr, String sinkIdentifier) {
+   this(stdErr);
+   this.sinkIdentifier = sinkIdentifier;
 
 Review comment:
   Usually the less detailed constructors call the more specific ones, not the other way around. Here, doing so will allow you to mark the fields as `final`.
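   A sketch of the suggested constructor chaining, using a hypothetical `ChainedPrintSink` class rather than the real `PrintSinkFunction`. The argument order `(sinkIdentifier, stdErr)` follows the later `TaskOutputWriter` revision in this PR; the less specific constructors delegate to the most specific one, so each field is assigned exactly once and can be `final`.

```java
// Hypothetical class mirroring the constructor shapes under review.
class ChainedPrintSink {
    private final boolean stdErr;
    private final String sinkIdentifier;

    ChainedPrintSink() {
        this(false);
    }

    ChainedPrintSink(boolean stdErr) {
        this("", stdErr);
    }

    // The single "real" constructor: the only place fields are assigned.
    ChainedPrintSink(String sinkIdentifier, boolean stdErr) {
        this.stdErr = stdErr;
        this.sinkIdentifier = sinkIdentifier == null ? "" : sinkIdentifier;
    }

    boolean isStdErr() {
        return stdErr;
    }

    String getSinkIdentifier() {
        return sinkIdentifier;
    }

    @Override
    public String toString() {
        return "Print to " + (stdErr ? "System.err" : "System.out");
    }

    public static void main(String[] args) {
        System.out.println(new ChainedPrintSink());
        System.out.println(new ChainedPrintSink("mySink", true));
    }
}
```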


[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209239877
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ##
 @@ -117,6 +117,65 @@ public void testPrintSinkWithPrefix() throws Exception {
stream.close();
}
 
+   @Test
+   public void testPrintSinkWithIdentifierAndPrefix() throws Exception {
+   ByteArrayOutputStream baos = new ByteArrayOutputStream();
+   PrintStream stream = new PrintStream(baos);
+   System.setOut(stream);
+
+   final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class);
+   Mockito.when(ctx.getNumberOfParallelSubtasks()).thenReturn(2);
+   Mockito.when(ctx.getIndexOfThisSubtask()).thenReturn(1);
+
+   PrintSinkFunction printSink = new PrintSinkFunction<>(false, "mySink");
+   printSink.setRuntimeContext(ctx);
+   try {
+   printSink.open(new Configuration());
+   } catch (Exception e) {
+   Assert.fail();
 
 Review comment:
   Do not hide the original exception. From what I've heard, there was a bug in an old JUnit version, and that's why this pattern recurs in Flink tests. Regardless of whether that was the case, it's not a problem anymore, and hiding the original exception makes test failures harder to read and understand (ditto for the rest of the file).
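   The difference is visible even without JUnit. In the sketch below (all names hypothetical), the catch-and-fail variant mirrors `catch (Exception e) { Assert.fail(); }`, which leaves an `AssertionError` with no message and no cause, while declaring `throws Exception` on the test method lets the runner report the original exception.

```java
// Hypothetical illustration in plain Java (no JUnit, not Flink code).
class ExceptionReporting {
    // Stands in for printSink.open(...) failing for some real reason.
    static void openSink() throws Exception {
        throw new IllegalStateException("sink could not be opened: no runtime context");
    }

    // Anti-pattern: like `catch (Exception e) { Assert.fail(); }`, the
    // resulting AssertionError carries no message and no cause.
    static AssertionError hiddenFailure() {
        try {
            openSink();
            return null;
        } catch (Exception e) {
            return new AssertionError();
        }
    }

    // Preferred: the test method declares `throws Exception`, so the runner
    // sees the original exception. Here we only capture what it would see.
    static Exception propagatedFailure() {
        try {
            openSink();
            return null;
        } catch (Exception e) {
            return e;
        }
    }

    public static void main(String[] args) {
        System.out.println("hidden:     " + hiddenFailure());
        System.out.println("propagated: " + propagatedFailure());
    }
}
```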


[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209260792
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ##
 @@ -117,6 +117,65 @@ public void testPrintSinkWithPrefix() throws Exception {
stream.close();
}
 
+   @Test
+   public void testPrintSinkWithIdentifierAndPrefix() throws Exception {
+   ByteArrayOutputStream baos = new ByteArrayOutputStream();
+   PrintStream stream = new PrintStream(baos);
+   System.setOut(stream);
+
+   final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class);
+   Mockito.when(ctx.getNumberOfParallelSubtasks()).thenReturn(2);
+   Mockito.when(ctx.getIndexOfThisSubtask()).thenReturn(1);
+
+   PrintSinkFunction printSink = new PrintSinkFunction<>(false, "mySink");
+   printSink.setRuntimeContext(ctx);
+   try {
+   printSink.open(new Configuration());
+   } catch (Exception e) {
+   Assert.fail();
+   }
+   printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0));
+
+   assertEquals("Print to System.out", printSink.toString());
+   assertEquals("mySink:2> hello world!" + line, baos.toString());
+
+   printSink.setTargetToStandardErr();
+   assertEquals("Print to System.err", printSink.toString());
+   assertEquals("mySink:2> hello world!" + line, baos.toString());
+
+   printSink.close();
+   stream.close();
+   }
+
+   @Test
+   public void testPrintSinkWithIdentifierButNoPrefix() throws Exception {
+   ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
 Review comment:
   There is quite a lot of code duplication in those tests, and they were unnecessarily using Mockito instead of a proper mock. There was even a bug in `testPrintSinkStdErr`. I have fixed those issues in a hotfix: 
https://github.com/apache/flink/pull/6538
   
   I would like the hotfix to be merged before this PR; please adapt/rewrite your test in a similar fashion to what I did in the hotfix.
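   One possible way to remove the repeated redirect/restore boilerplate is a small capture helper. `OutputCapture.captureStdOut` is hypothetical and not necessarily how the hotfix in #6538 is structured; it restores `System.out` in a `finally` block, so an exception thrown by the action cannot leave `System.out` redirected for later tests.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

// Hypothetical test helper (not from the Flink code base).
final class OutputCapture {
    private OutputCapture() {
    }

    // Redirects System.out while `action` runs and returns what it printed.
    static String captureStdOut(Runnable action) {
        PrintStream original = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        System.setOut(new PrintStream(buffer, true));
        try {
            action.run();
        } finally {
            // Always restore, even if the action throws.
            System.setOut(original);
        }
        return buffer.toString();
    }

    public static void main(String[] args) {
        String printed = captureStdOut(() -> System.out.println("2> hello world!"));
        System.out.print("captured: " + printed);
    }
}
```

   Each test then shrinks to building the sink, running it inside `captureStdOut`, and a single `assertEquals` on the returned string.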


[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209237840
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -70,15 +83,29 @@ public void open(Configuration parameters) throws Exception {
// get the target stream
stream = target == STD_OUT ? System.out : System.err;
 
+   /**
+* Four possible format options:
+*  sinkId:taskId> output  <- sink id provided, parallelism > 1
+*  sinkId> output <- sink id provided, parallelism == 1
+*  taskId> output <- no sink id provided, parallelism > 1
+*  output <- no sink id provided, parallelism == 1
+*/
+
// set the prefix if we have a >1 parallelism
prefix = (context.getNumberOfParallelSubtasks() > 1) ?
((context.getIndexOfThisSubtask() + 1) + "> ") : null;
+
+   if (prefix == null) {
 
 Review comment:
   Don't use nulls here. In this case we can easily use an empty string for the same purpose, and it will be safer (no possible NPE).
   
   Btw, rephrasing this logic like this:
   ```
   completedPrefix = sinkIdentifier;
   
   if (context.getNumberOfParallelSubtasks() > 1) {
       if (!completedPrefix.isEmpty()) {
           completedPrefix += ":";
       }
       completedPrefix += (context.getIndexOfThisSubtask() + 1);
   }
   
   if (!completedPrefix.isEmpty()) {
       completedPrefix += "> ";
   }
   ```
   (optionally with a ternary operator instead of some `if` statements; that's only a matter of taste) simplifies the logic and deduplicates some of the code/constants.
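   The suggested logic can also be written as a pure function, which makes the four possible formats easy to check. `SinkPrefix.buildPrefix` is a hypothetical helper name; its parameters correspond to the sink identifier, `getIndexOfThisSubtask()` and `getNumberOfParallelSubtasks()`.

```java
// Hypothetical helper (not Flink API) implementing the prefix rules above.
final class SinkPrefix {
    private SinkPrefix() {
    }

    /**
     * @param sinkIdentifier user-provided identifier, "" or null if none
     * @param subtaskIndex   zero-based index of this parallel subtask
     * @param parallelism    number of parallel subtasks
     */
    static String buildPrefix(String sinkIdentifier, int subtaskIndex, int parallelism) {
        String prefix = sinkIdentifier == null ? "" : sinkIdentifier;
        if (parallelism > 1) {
            if (!prefix.isEmpty()) {
                prefix += ":";
            }
            // Subtask numbers are printed one-based.
            prefix += (subtaskIndex + 1);
        }
        return prefix.isEmpty() ? "" : prefix + "> ";
    }

    public static void main(String[] args) {
        System.out.println(buildPrefix("mySink", 1, 2) + "hello world!"); // mySink:2> hello world!
        System.out.println(buildPrefix("mySink", 0, 1) + "hello world!"); // mySink> hello world!
        System.out.println(buildPrefix("", 1, 2) + "hello world!");       // 2> hello world!
        System.out.println(buildPrefix("", 0, 1) + "hello world!");       // hello world!
    }
}
```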


[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209234901
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -70,15 +83,29 @@ public void open(Configuration parameters) throws Exception {
// get the target stream
stream = target == STD_OUT ? System.out : System.err;
 
+   /**
 
 Review comment:
   Please move this to the class's Javadoc. Also, replace `sinkId` and `sink id` with `{@code sinkIdentifier}`.


[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209238315
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -89,6 +116,8 @@ public void invoke(IN record) {
public void close() {
this.stream = null;
this.prefix = null;
+   this.sinkIdentifier = null;
+   this.completedPrefix = null;
 
 Review comment:
   Please drop this `close` method. It doesn't do anything except cause an `NPE`.
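   A hypothetical reduction of the problem: `close()` also nulls the non-transient `sinkIdentifier`, so if the same instance were opened again, `open()` would throw a `NullPointerException`, while nulling the transient fields gains nothing because `open()` reassigns them. `NullingSink` is illustrative only, not the real `PrintSinkFunction`.

```java
// Hypothetical reduction of the reviewed close() (not Flink code).
class NullingSink {
    private String sinkIdentifier;
    private transient String completedPrefix;

    NullingSink(String sinkIdentifier) {
        this.sinkIdentifier = sinkIdentifier;
    }

    void open() {
        completedPrefix = sinkIdentifier;
        // Throws NullPointerException once close() has nulled sinkIdentifier.
        if (!completedPrefix.isEmpty()) {
            completedPrefix += "> ";
        }
    }

    String prefix() {
        return completedPrefix;
    }

    // The method under review: nothing here needs cleaning up.
    void close() {
        sinkIdentifier = null;
        completedPrefix = null;
    }

    static boolean reopenFails() {
        NullingSink sink = new NullingSink("mySink");
        sink.open();
        sink.close();
        try {
            sink.open();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("open() after close() fails: " + reopenFails());
    }
}
```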


[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209239178
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ##
 @@ -117,6 +117,65 @@ public void testPrintSinkWithPrefix() throws Exception {
stream.close();
}
 
+   @Test
+   public void testPrintSinkWithIdentifierAndPrefix() throws Exception {
+   ByteArrayOutputStream baos = new ByteArrayOutputStream();
+   PrintStream stream = new PrintStream(baos);
+   System.setOut(stream);
+
+   final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class);
 
 Review comment:
   Do not use Mockito for such things; such tests are very difficult to maintain in the future. Instead, please move the `org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseTest.MockRuntimeContext` class to a utility package in the `flink-streaming-java` module and reuse it throughout this `PrintSinkFunctionTest`.
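   A sketch of the hand-written-stub approach, assuming a hypothetical two-method `SubtaskInfo` interface instead of Flink's `StreamingRuntimeContext` so that it compiles without Flink. The real `MockRuntimeContext` is a full runtime context; only the idea is the same: fixed values passed in through the constructor, no mocking framework, and a compile error rather than a silent default if the interface gains a method.

```java
// Hypothetical, trimmed-down context: only what the print sink reads.
interface SubtaskInfo {
    int getNumberOfParallelSubtasks();

    int getIndexOfThisSubtask();
}

// Hand-written stub with fixed, validated values.
final class FixedSubtaskInfo implements SubtaskInfo {
    private final int numberOfParallelSubtasks;
    private final int indexOfThisSubtask;

    FixedSubtaskInfo(int numberOfParallelSubtasks, int indexOfThisSubtask) {
        if (indexOfThisSubtask < 0 || indexOfThisSubtask >= numberOfParallelSubtasks) {
            throw new IllegalArgumentException("subtask index out of range");
        }
        this.numberOfParallelSubtasks = numberOfParallelSubtasks;
        this.indexOfThisSubtask = indexOfThisSubtask;
    }

    @Override
    public int getNumberOfParallelSubtasks() {
        return numberOfParallelSubtasks;
    }

    @Override
    public int getIndexOfThisSubtask() {
        return indexOfThisSubtask;
    }
}

// Code under test depends only on the interface.
final class SubtaskLabel {
    private SubtaskLabel() {
    }

    static String label(SubtaskInfo info) {
        return info.getNumberOfParallelSubtasks() > 1
                ? (info.getIndexOfThisSubtask() + 1) + "> "
                : "";
    }

    public static void main(String[] args) {
        System.out.println("[" + label(new FixedSubtaskInfo(2, 1)) + "]");
        System.out.println("[" + label(new FixedSubtaskInfo(1, 0)) + "]");
    }
}
```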

