[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16583552#comment-16583552
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski closed pull request #6367: [FLINK-9850] Add a string to the print 
method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java
new file mode 100644
index 000..de058b69f1f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.PrintStream;
+import java.io.Serializable;
+
+/**
+ * Print sink output writer for DataStream and DataSet print API.
+ */
+@Internal
+public class PrintSinkOutputWriter<IN> implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final boolean STD_OUT = false;
+   private static final boolean STD_ERR = true;
+
+   private final boolean target;
+   private transient PrintStream stream;
+   private final String sinkIdentifier;
+   private transient String completedPrefix;
+
+   public PrintSinkOutputWriter() {
+       this("", STD_OUT);
+   }
+
+   public PrintSinkOutputWriter(final boolean stdErr) {
+       this("", stdErr);
+   }
+
+   public PrintSinkOutputWriter(final String sinkIdentifier, final boolean stdErr) {
+       this.target = stdErr;
+       this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier);
+   }
+
+   public void open(int subtaskIndex, int numParallelSubtasks) {
+       // get the target stream
+       stream = target == STD_OUT ? System.out : System.err;
+
+       completedPrefix = sinkIdentifier;
+
+       if (numParallelSubtasks > 1) {
+           if (!completedPrefix.isEmpty()) {
+               completedPrefix += ":";
+           }
+           completedPrefix += (subtaskIndex + 1);
+       }
+
+       if (!completedPrefix.isEmpty()) {
+           completedPrefix += "> ";
+       }
+   }
+
+   public void write(IN record) {
+       stream.println(completedPrefix + record.toString());
+   }
+
+   @Override
+   public String toString() {
+       return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
+   }
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
index 0ab1abb2efb..62eabd0b739 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
@@ -19,45 +19,46 @@
 package org.apache.flink.api.java.io;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.util.PrintSinkOutputWriter;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.configuration.Configuration;
 
-import java.io.PrintStream;
-
 /**
  * Output format that prints results into either stdout or stderr.
- * @param 
+ *
+ * 
+ * Four possible format options:
+ * {@code sinkIdentifier}:taskId> output  <- {@code sinkIdentifier} provided, parallelism > 1
+ * {@code sinkIdentifier}> output <- {@code sinkIdentifier} provided, parallelism == 1
+ *  taskId> output   
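
The prefix formats listed in the Javadoc above follow from the open() logic
of PrintSinkOutputWriter in this diff. As a minimal sketch, that logic can be
isolated in a standalone class with no Flink dependency. PrintPrefixSketch and
buildPrefix are hypothetical names used only for illustration; they are not
part of Flink.

```java
// Hypothetical, self-contained sketch; not the Flink class itself.
final class PrintPrefixSketch {

    /**
     * Builds the line prefix the same way open(subtaskIndex, numParallelSubtasks)
     * does in the diff: identifier first, then the 1-based subtask number when
     * the parallelism is greater than 1, then "> " if anything was collected.
     */
    static String buildPrefix(String sinkIdentifier, int subtaskIndex, int numParallelSubtasks) {
        StringBuilder prefix = new StringBuilder(sinkIdentifier == null ? "" : sinkIdentifier);
        if (numParallelSubtasks > 1) {
            if (prefix.length() > 0) {
                prefix.append(':');
            }
            prefix.append(subtaskIndex + 1);
        }
        if (prefix.length() > 0) {
            prefix.append("> ");
        }
        return prefix.toString();
    }

    public static void main(String[] args) {
        // One line per combination of identifier and parallelism.
        System.out.println(buildPrefix("mySink", 0, 4) + "output");
        System.out.println(buildPrefix("mySink", 0, 1) + "output");
        System.out.println(buildPrefix("", 2, 4) + "output");
        System.out.println(buildPrefix("", 0, 1) + "output");
    }
}
```

With an identifier and parallelism above 1 the prefix combines both parts;
with neither, the record is printed without any prefix.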

[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16583550#comment-16583550
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on issue #6367: [FLINK-9850] Add a string to the print 
method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-413787139
 
 
   Thanks for the contribution! @zentol told me his LGTM so merging now :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The print method of {{DataSet}} allows the user to supply a String to 
> identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} does not support this yet. It would be valuable to add this 
> feature for {{DataStream}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582258#comment-16582258
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method 
to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-413483130
 
 
   @pnowojski The Python-related changes have been removed; only the changes 
to Java and Scala remain.




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582239#comment-16582239
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

zentol commented on issue #6367: [FLINK-9850] Add a string to the print method 
to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-413476883
 
 
   yes.




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582205#comment-16582205
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method 
to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-413470612
 
 
   @zentol Do you mean that we should leave the Python API unchanged here and 
remove the tests for the Python API?




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582191#comment-16582191
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

zentol commented on issue #6367: [FLINK-9850] Add a string to the print method 
to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-413468227
 
 
   Personally I would prefer if we'd not change the python API in this PR, 
because we're now in the exact position we don't want to be in: where the 
merging of the PR is delayed due to another effectively unrelated problem.




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582184#comment-16582184
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

zentol commented on a change in pull request #6367: [FLINK-9850] Add a string 
to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210515127
 
 

 ##
 File path: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ##
 @@ -63,7 +63,7 @@ private static Path findUtilsModule() {
 
    @Test
    public void testProgram() throws Exception {
-       Path testEntryPoint = new Path(getBaseTestPythonDir(), "examples/word_count.py");
+       Path testEntryPoint = new Path(getBaseTestPythonDir(), "run_all_tests.py");
 
 Review comment:
   Basically, this change enables most of the Python tests, which I 
accidentally disabled months ago. So yes, we do need it to enable more tests.




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582186#comment-16582186
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

zentol commented on a change in pull request #6367: [FLINK-9850] Add a string 
to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210514892
 
 

 ##
 File path: 
flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_output_with_sink_identifier.py
 ##
 @@ -0,0 +1,58 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+from utils import constants
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        collector.collect((1, value))
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Main:
+    def run(self, flink):
+        elements = ["aa" if iii % 2 == 0 else "bbb" for iii in range(constants.NUM_ELEMENTS_IN_TEST)]
+
+        env = flink.get_execution_environment()
+        env.from_collection(elements) \
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(milliseconds(10)) \
+            .reduce(Sum()) \
+            .output("testOutputWithIdentifier")
+
+        env.execute()
 
 Review comment:
   Yes, this is a common issue with the tests, but fixing it isn't too high on 
my priority list, given that most methods are just straightforward wrappers 
around Java methods.




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582171#comment-16582171
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

zentol commented on a change in pull request #6367: [FLINK-9850] Add a string 
to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210514030
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/functions/util/TaskOutputWriter.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+import java.io.PrintStream;
+import java.io.Serializable;
+
+/**
+ * Task output writer for DataStream and DataSet print API.
+ */
+public class TaskOutputWriter implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final boolean STD_OUT = false;
+   private static final boolean STD_ERR = true;
+
+   private final boolean target;
+   private transient PrintStream stream;
+   private final String sinkIdentifier;
+   private transient String completedPrefix;
+
+   public TaskOutputWriter() {
+       this("", STD_OUT);
+   }
+
+   public TaskOutputWriter(final boolean stdErr) {
+       this("", stdErr);
+   }
+
+   public TaskOutputWriter(final String sinkIdentifier, final boolean stdErr) {
+       this.target = stdErr;
+       this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier);
+   }
+
+   public void open(int taskNumber, int numTasks) {
 
 Review comment:
   ignore the `OutputFormat#open` method,




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582169#comment-16582169
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

yanghua commented on a change in pull request #6367: [FLINK-9850] Add a string 
to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210513770
 
 

 ##
 File path: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ##
 @@ -63,7 +63,7 @@ private static Path findUtilsModule() {
 
    @Test
    public void testProgram() throws Exception {
-       Path testEntryPoint = new Path(getBaseTestPythonDir(), "examples/word_count.py");
+       Path testEntryPoint = new Path(getBaseTestPythonDir(), "run_all_tests.py");
 
 Review comment:
   For more detail on this change, see PR #6475. It is a separate issue that we 
can merge in this PR or not, so I split it into its own commit.




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582158#comment-16582158
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210511275
 
 

 ##
 File path: 
flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_output_with_sink_identifier.py
 ##
 @@ -0,0 +1,58 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+from utils import constants
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        collector.collect((1, value))
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Main:
+    def run(self, flink):
+        elements = ["aa" if iii % 2 == 0 else "bbb" for iii in range(constants.NUM_ELEMENTS_IN_TEST)]
+
+        env = flink.get_execution_environment()
+        env.from_collection(elements) \
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(milliseconds(10)) \
+            .reduce(Sum()) \
+            .output("testOutputWithIdentifier")
+
+        env.execute()
 
 Review comment:
   This is the first time I'm looking at our Python tests, so take my comment 
with a grain of salt.
   
   Isn't this test (and the other Python streaming tests) supposed to assert on 
something? As written, don't they only check that the Python API doesn't throw, 
so they would still pass even with empty methods? @zentol?
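
One possible way to make such a test assert on its output, sketched in plain
Java with no Flink dependency: write to an in-memory PrintStream instead of
System.out and compare the captured text. CapturedPrintSketch, writeRecord and
capture are hypothetical names for illustration only; how the Python test
harness would hook into this is not covered by this thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.function.Consumer;

// Hypothetical, self-contained sketch; not part of Flink or its test suite.
final class CapturedPrintSketch {

    /** Stand-in for a print sink: writes prefix + record as one line. */
    static void writeRecord(PrintStream stream, String prefix, Object record) {
        stream.println(prefix + record);
    }

    /** Runs the action against an in-memory stream and returns what it printed. */
    static String capture(Consumer<PrintStream> action) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (PrintStream stream = new PrintStream(buffer, true)) {
            action.accept(stream);
        }
        // Same default charset is used for writing and reading back.
        return buffer.toString();
    }

    public static void main(String[] args) {
        String printed = capture(s -> writeRecord(s, "testOutputWithIdentifier> ", "aa"));
        String expected = "testOutputWithIdentifier> aa" + System.lineSeparator();
        if (!printed.equals(expected)) {
            throw new AssertionError("unexpected output: " + printed);
        }
        System.out.println("output matched");
    }
}
```

A test built this way fails if the sink method is empty, which is the gap
raised in the comment above.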




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582159#comment-16582159
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210509687
 
 

 ##
 File path: 
flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java
 ##
 @@ -63,7 +63,7 @@ private static Path findUtilsModule() {
 
    @Test
    public void testProgram() throws Exception {
-       Path testEntryPoint = new Path(getBaseTestPythonDir(), "examples/word_count.py");
+       Path testEntryPoint = new Path(getBaseTestPythonDir(), "run_all_tests.py");
 
 Review comment:
   Is this change relevant? 




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581279#comment-16581279
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method 
to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-413245077
 
 
   cc @pnowojski 




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581201#comment-16581201
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

yanghua commented on a change in pull request #6367: [FLINK-9850] Add a string 
to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210299902
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/functions/util/TaskOutputWriter.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+import java.io.PrintStream;
+import java.io.Serializable;
+
+/**
+ * Task output writer for DataStream and DataSet print API.
+ */
+public class TaskOutputWriter implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final boolean STD_OUT = false;
+   private static final boolean STD_ERR = true;
+
+   private final boolean target;
+   private transient PrintStream stream;
+   private final String sinkIdentifier;
+   private transient String completedPrefix;
+
+   public TaskOutputWriter() {
+   this("", STD_OUT);
+   }
+
+   public TaskOutputWriter(final boolean stdErr) {
+   this("", stdErr);
+   }
+
+   public TaskOutputWriter(final String sinkIdentifier, final boolean stdErr) {
+   this.target = stdErr;
+   this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier);
+   }
+
+   public void open(int taskNumber, int numTasks) {
 
 Review comment:
   Good suggestion. Should we also rename the arguments of the interface 
method `OutputFormat#open`, handle that in a separate issue, or leave them as 
they are?




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581031#comment-16581031
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

zentol commented on a change in pull request #6367: [FLINK-9850] Add a string 
to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210251656
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/functions/util/TaskOutputWriter.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+import java.io.PrintStream;
+import java.io.Serializable;
+
+/**
+ * Task output writer for DataStream and DataSet print API.
+ */
+public class TaskOutputWriter implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final boolean STD_OUT = false;
+   private static final boolean STD_ERR = true;
+
+   private final boolean target;
+   private transient PrintStream stream;
+   private final String sinkIdentifier;
+   private transient String completedPrefix;
+
+   public TaskOutputWriter() {
+   this("", STD_OUT);
+   }
+
+   public TaskOutputWriter(final boolean stdErr) {
+   this("", stdErr);
+   }
+
+   public TaskOutputWriter(final String sinkIdentifier, final boolean stdErr) {
 
 Review comment:
   ~~This class could be simplified by simply passing in the `OutputStream`.~~ 
I guess this would only work if we'd create this object after deployment.
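
A minimal sketch of why the stream is resolved in `open` rather than passed to the constructor, assuming the writer is created on the client and serialized before it reaches the task. `PrintStream` is not `Serializable`, so the field is `transient` and is still unset on the deserialized copy. `LazyStreamWriterSketch` and `shipToWorker` are hypothetical names used for illustration only; they are not part of the pull request.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.io.Serializable;

// Hypothetical illustration, not the TaskOutputWriter from the pull request.
final class LazyStreamWriterSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    // Only the choice of target is serialized; the stream itself is not.
    private final boolean stdErr;
    private transient PrintStream stream;

    LazyStreamWriterSketch(boolean stdErr) {
        this.stdErr = stdErr;
    }

    // Resolves the stream on the machine that actually runs the task.
    void open() {
        stream = stdErr ? System.err : System.out;
    }

    boolean isOpen() {
        return stream != null;
    }

    void write(Object record) {
        stream.println(record);
    }

    // Simulates shipping the writer to a worker via Java serialization.
    static LazyStreamWriterSketch shipToWorker(LazyStreamWriterSketch writer) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(writer);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazyStreamWriterSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }
}
```

Passing a stream into the constructor would only be an option if the object were created after deployment, which is the caveat raised in the comment above.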




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581013#comment-16581013
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

yanghua commented on a change in pull request #6367: [FLINK-9850] Add a string 
to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210253268
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockRuntimeContext.java
 ##
 @@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import java.util.Collections;
+
+/**
+ * Simple test runtime context for stream operators.
+ */
+public class MockRuntimeContext extends StreamingRuntimeContext {
 
 Review comment:
   I added this based on your earlier suggestion and forgot to delete it once 
your PR was merged. Sorry, I will remove it.




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581007#comment-16581007
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

zentol commented on a change in pull request #6367: [FLINK-9850] Add a string 
to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210251882
 
 

 ##
 File path: 
flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.java
 ##
 @@ -144,6 +144,14 @@ public void output() {
stream.print();
}
 
+   /**
+* A thin wrapper layer over {@link DataStream#print(String)}.
+*/
+   @PublicEvolving
+   public void output(String sinkIdentifier) {
+   stream.print(sinkIdentifier);
 
 Review comment:
   This wrapper is not covered by a test.




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581008#comment-16581008
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

zentol commented on a change in pull request #6367: [FLINK-9850] Add a string 
to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210251440
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/functions/util/TaskOutputWriter.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+import java.io.PrintStream;
+import java.io.Serializable;
+
+/**
+ * Task output writer for DataStream and DataSet print API.
+ */
+public class TaskOutputWriter implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final boolean STD_OUT = false;
+   private static final boolean STD_ERR = true;
+
+   private final boolean target;
+   private transient PrintStream stream;
+   private final String sinkIdentifier;
+   private transient String completedPrefix;
+
+   public TaskOutputWriter() {
+   this("", STD_OUT);
+   }
+
+   public TaskOutputWriter(final boolean stdErr) {
+   this("", stdErr);
+   }
+
+   public TaskOutputWriter(final String sinkIdentifier, final boolean stdErr) {
+   this.target = stdErr;
+   this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier);
+   }
+
+   public void open(int taskNumber, int numTasks) {
 
 Review comment:
   Rename the arguments to `subtaskIndex` and `numParallelSubtasks`.
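
For illustration, the prefix logic behind `open` could look like the sketch below once the arguments are renamed. It assumes the output format implied by the tests in this pull request (for example `mySink:2> hello world!` for subtask index 1 of 2, and no prefix at parallelism 1 without an identifier). `PrintPrefixSketch` and `completedPrefix` are hypothetical names, not the code under review.

```java
// Hypothetical illustration of the prefix rules, not the code from the pull request.
final class PrintPrefixSketch {

    /**
     * Builds the prefix printed in front of every record.
     *
     * @param sinkIdentifier user-supplied identifier, may be null or empty
     * @param subtaskIndex zero-based index of this parallel subtask
     * @param numParallelSubtasks parallelism of the sink
     */
    static String completedPrefix(String sinkIdentifier, int subtaskIndex, int numParallelSubtasks) {
        String prefix = sinkIdentifier == null ? "" : sinkIdentifier;

        if (numParallelSubtasks > 1) {
            if (!prefix.isEmpty()) {
                prefix += ":";
            }
            // The printed subtask number is one-based.
            prefix += (subtaskIndex + 1);
        }

        if (!prefix.isEmpty()) {
            prefix += "> ";
        }
        return prefix;
    }
}
```

With these names the call site reads as `completedPrefix("mySink", subtaskIndex, numParallelSubtasks)`, which makes it harder to confuse the zero-based index with the one-based number that is printed.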




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581009#comment-16581009
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

zentol commented on a change in pull request #6367: [FLINK-9850] Add a string 
to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210251656
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/functions/util/TaskOutputWriter.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+import java.io.PrintStream;
+import java.io.Serializable;
+
+/**
+ * Task output writer for DataStream and DataSet print API.
+ */
+public class TaskOutputWriter implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final boolean STD_OUT = false;
+   private static final boolean STD_ERR = true;
+
+   private final boolean target;
+   private transient PrintStream stream;
+   private final String sinkIdentifier;
+   private transient String completedPrefix;
+
+   public TaskOutputWriter() {
+   this("", STD_OUT);
+   }
+
+   public TaskOutputWriter(final boolean stdErr) {
+   this("", stdErr);
+   }
+
+   public TaskOutputWriter(final String sinkIdentifier, final boolean stdErr) {
 
 Review comment:
   This class could be simplified by simply passing in the `OutputStream`.




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581010#comment-16581010
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

zentol commented on a change in pull request #6367: [FLINK-9850] Add a string 
to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210252035
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -18,81 +18,74 @@
 package org.apache.flink.streaming.api.functions.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.util.TaskOutputWriter;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
-import java.io.PrintStream;
-
 /**
  * Implementation of the SinkFunction writing every tuple to the standard
  * output or standard error stream.
  *
- * @param <IN>
- *Input record type
+ * 
+ * Four possible format options:
+ * {@code sinkIdentifier}:taskId> output  <- {@code sinkIdentifier} provided, parallelism > 1
+ * {@code sinkIdentifier}> output         <- {@code sinkIdentifier} provided, parallelism == 1
+ * taskId> output                         <- no {@code sinkIdentifier} provided, parallelism > 1
+ * output                                 <- no {@code sinkIdentifier} provided, parallelism == 1
+ * 
+ *
+ * @param <IN> Input record type
  */
 @PublicEvolving
 public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
-   private static final long serialVersionUID = 1L;
 
-   private static final boolean STD_OUT = false;
-   private static final boolean STD_ERR = true;
+   private static final long serialVersionUID = 1L;
 
-   private boolean target;
-   private transient PrintStream stream;
-   private transient String prefix;
+   private final TaskOutputWriter writer;
 
/**
 * Instantiates a print sink function that prints to standard out.
 */
-   public PrintSinkFunction() {}
+   public PrintSinkFunction() {
+   writer = new TaskOutputWriter<>();
 
 Review comment:
   Call the other constructor instead.
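
A minimal sketch of the constructor chaining being asked for, assuming the sink keeps a single "real" constructor and the shorter ones only supply defaults. `ChainedPrintSinkSketch` is a hypothetical stand-in for illustration; it is not the `PrintSinkFunction` from the pull request, and its `toString` merely mirrors the strings the tests in this pull request expect.

```java
// Hypothetical stand-in used to illustrate constructor chaining.
final class ChainedPrintSinkSketch {

    private final String sinkIdentifier;
    private final boolean stdErr;

    // Prints to standard out; delegates instead of initializing fields itself.
    ChainedPrintSinkSketch() {
        this(false);
    }

    // Chooses the target stream; still delegates to the full constructor.
    ChainedPrintSinkSketch(boolean stdErr) {
        this("", stdErr);
    }

    // The only constructor that assigns fields.
    ChainedPrintSinkSketch(String sinkIdentifier, boolean stdErr) {
        this.sinkIdentifier = sinkIdentifier == null ? "" : sinkIdentifier;
        this.stdErr = stdErr;
    }

    String sinkIdentifier() {
        return sinkIdentifier;
    }

    @Override
    public String toString() {
        return "Print to " + (stdErr ? "System.err" : "System.out");
    }
}
```

Keeping the field assignments in one place means a later change, such as the null check on the identifier, cannot be forgotten in one of the overloads.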




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581011#comment-16581011
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

zentol commented on a change in pull request #6367: [FLINK-9850] Add a string 
to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210251434
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/functions/util/TaskOutputWriter.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+import java.io.PrintStream;
+import java.io.Serializable;
+
+/**
+ * Task output writer for DataStream and DataSet print API.
+ */
+public class TaskOutputWriter implements Serializable {
 
 Review comment:
   Can we give this a more specific name, such as `PrintSinkOutputWriter`? The 
current name is a bit too general in my opinion.




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581006#comment-16581006
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

zentol commented on a change in pull request #6367: [FLINK-9850] Add a string 
to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210252151
 
 

 ##
 File path: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 ##
 @@ -959,6 +959,29 @@ class DataStream[T](stream: JavaStream[T]) {
   @PublicEvolving
   def printToErr() = stream.printToErr()
 
+  /**
+* Writes a DataStream to the standard output stream (stdout). For each
+* element of the DataStream the result of [[AnyRef.toString()]] is
+* written.
+*
+* @param sinkIdentifier The string to prefix the output with.
+* @return The closed DataStream.
+*/
+  @PublicEvolving
+  def print(sinkIdentifier: String): DataStreamSink[T] = stream.print(sinkIdentifier)
 
 Review comment:
   These methods are not covered by tests.
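
One possible way to cover such thin wrappers is to capture standard out around the call and compare the printed text. The helper below is a sketch in plain Java under that assumption; `StdOutCaptureSketch` and `captureStdOut` are hypothetical names, and a real test for the Scala API would call `print(sinkIdentifier)` on a stream inside the captured action.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

// Hypothetical test helper, not part of the pull request.
final class StdOutCaptureSketch {

    // Runs the action with System.out redirected and returns what was printed.
    static String captureStdOut(Runnable action) {
        PrintStream original = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream replacement = new PrintStream(buffer, true);
        System.setOut(replacement);
        try {
            action.run();
        } finally {
            // Always restore the original stream, even if the action throws.
            replacement.flush();
            System.setOut(original);
        }
        return buffer.toString();
    }
}
```

Because `System.out` is global state, a helper like this is only safe when tests that use it do not run concurrently in the same JVM.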




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580959#comment-16580959
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210240596
 
 

 ##
 File path: 
flink-java/src/test/java/org/apache/flink/api/java/io/PrintingOutputFormatTest.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link PrintingOutputFormat}.
+ */
+public class PrintingOutputFormatTest {
+
+   private final PrintStream originalSystemOut = System.out;
+   private final PrintStream originalSystemErr = System.err;
+
+   private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+   private final ByteArrayOutputStream arrayErrorStream = new ByteArrayOutputStream();
+
+   private final String line = System.lineSeparator();
+
+   @Before
+   public void setUp() {
+   System.setOut(new PrintStream(arrayOutputStream));
+   System.setErr(new PrintStream(arrayErrorStream));
+   }
+
+   @After
+   public void tearDown() {
+   if (System.out != originalSystemOut) {
+   System.out.close();
+   }
+   if (System.err != originalSystemErr) {
+   System.err.close();
+   }
+   System.setOut(originalSystemOut);
+   System.setErr(originalSystemErr);
+   }
+
+   @Test
+   public void testPrintOutputFormatStdOut() throws Exception {
+   PrintingOutputFormat printSink = new 
PrintingOutputFormat<>();
+   printSink.open(0, 1);
+
+   printSink.writeRecord("hello world!");
+
+   assertEquals("Print to System.out", printSink.toString());
+   assertEquals("hello world!" + line, 
arrayOutputStream.toString());
+   printSink.close();
+   }
+
+   @Test
+   public void testPrintOutputFormatStdErr() throws Exception {
+   PrintingOutputFormat printSink = new 
PrintingOutputFormat<>(true);
+   printSink.open(0, 1);
+
+   printSink.writeRecord("hello world!");
+
+   assertEquals("Print to System.err", printSink.toString());
+   assertEquals("hello world!" + line, 
arrayErrorStream.toString());
+   printSink.close();
+   }
+
+   @Test
+   public void testPrintOutputFormatWithPrefix() throws Exception {
+   PrintingOutputFormat printSink = new 
PrintingOutputFormat<>();
+   printSink.open(1, 2);
+
+   printSink.writeRecord("hello world!");
+
+   assertEquals("Print to System.out", printSink.toString());
+   assertEquals("2> hello world!" + line, 
arrayOutputStream.toString());
+   printSink.close();
+   }
+
+   @Test
+   public void testPrintOutputFormatWithIdentifierAndPrefix() throws Exception {
+       PrintingOutputFormat printSink = new PrintingOutputFormat<>("mySink", false);
+       printSink.open(1, 2);
+
+       printSink.writeRecord("hello world!");
+
+       assertEquals("Print to System.out", printSink.toString());
+       assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
+
+       printSink = new PrintingOutputFormat<>("mySink", true);
+       assertEquals("Print to System.err", printSink.toString());
+       assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
 
 Review comment:
   In those 3 lines: were you trying to test for stderr? If so, you haven't 
invoked anything on the stderr instance of `printSink`, and in this line you 

[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580961#comment-16580961
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210239256
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -18,81 +18,74 @@
 package org.apache.flink.streaming.api.functions.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.util.TaskOutputWriter;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
-import java.io.PrintStream;
-
 /**
  * Implementation of the SinkFunction writing every tuple to the standard
  * output or standard error stream.
  *
- * @param 
- *Input record type
+ * 
+ * Four possible format options:
+ * {@code sinkIdentifier}:taskId> output  <- {@code sinkIdentifier} provided, parallelism > 1
+ * {@code sinkIdentifier}> output         <- {@code sinkIdentifier} provided, parallelism == 1
+ * taskId> output                         <- no {@code sinkIdentifier} provided, parallelism > 1
+ * output                                 <- no {@code sinkIdentifier} provided, parallelism == 1
+ * 
+ *
+ * @param  Input record type
  */
 @PublicEvolving
 public class PrintSinkFunction extends RichSinkFunction {
-   private static final long serialVersionUID = 1L;
 
-   private static final boolean STD_OUT = false;
-   private static final boolean STD_ERR = true;
+   private static final long serialVersionUID = 1L;
 
-   private boolean target;
-   private transient PrintStream stream;
-   private transient String prefix;
+   private final TaskOutputWriter writer;
 
/**
 * Instantiates a print sink function that prints to standard out.
 */
-   public PrintSinkFunction() {}
+   public PrintSinkFunction() {
+   writer = new TaskOutputWriter<>();
+   }
 
/**
 * Instantiates a print sink function that prints to standard out.
 *
 * @param stdErr True, if the format should print to standard error 
instead of standard out.
 */
-   public PrintSinkFunction(boolean stdErr) {
-   target = stdErr;
-   }
-
-   public void setTargetToStandardOut() {
-   target = STD_OUT;
+   public PrintSinkFunction(final boolean stdErr) {
+   writer = new TaskOutputWriter<>(stdErr);
}
 
-   public void setTargetToStandardErr() {
-   target = STD_ERR;
+   /**
+* Instantiates a print sink function that prints to standard out and 
gives a sink identifier.
+*
+* @param stdErr True, if the format should print to standard error 
instead of standard out.
+* @param sinkIdentifier Message that identify sink and is prefixed to 
the output of the value
+*/
+   public PrintSinkFunction(final String sinkIdentifier, final boolean 
stdErr) {
+   writer = new TaskOutputWriter<>(sinkIdentifier, stdErr);
}
 
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
StreamingRuntimeContext context = (StreamingRuntimeContext) 
getRuntimeContext();
-   // get the target stream
-   stream = target == STD_OUT ? System.out : System.err;
+   int taskNumber = context.getIndexOfThisSubtask();
 
 Review comment:
   nit: inline those `taskNumber` and `numTasks` variables?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The output of the print method of {{DataSet}} allows the user to supply a 
> String to identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} does not support this yet. It is valuable to add this 

[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580949#comment-16580949
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210239256
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -18,81 +18,74 @@
 package org.apache.flink.streaming.api.functions.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.util.TaskOutputWriter;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
-import java.io.PrintStream;
-
 /**
  * Implementation of the SinkFunction writing every tuple to the standard
  * output or standard error stream.
  *
- * @param 
- *Input record type
+ * 
+ * Four possible format options:
+ * {@code sinkIdentifier}:taskId> output  <- {@code sinkIdentifier} provided, parallelism > 1
+ * {@code sinkIdentifier}> output         <- {@code sinkIdentifier} provided, parallelism == 1
+ * taskId> output                         <- no {@code sinkIdentifier} provided, parallelism > 1
+ * output                                 <- no {@code sinkIdentifier} provided, parallelism == 1
+ * 
+ *
+ * @param  Input record type
  */
 @PublicEvolving
 public class PrintSinkFunction extends RichSinkFunction {
-   private static final long serialVersionUID = 1L;
 
-   private static final boolean STD_OUT = false;
-   private static final boolean STD_ERR = true;
+   private static final long serialVersionUID = 1L;
 
-   private boolean target;
-   private transient PrintStream stream;
-   private transient String prefix;
+   private final TaskOutputWriter writer;
 
/**
 * Instantiates a print sink function that prints to standard out.
 */
-   public PrintSinkFunction() {}
+   public PrintSinkFunction() {
+   writer = new TaskOutputWriter<>();
+   }
 
/**
 * Instantiates a print sink function that prints to standard out.
 *
 * @param stdErr True, if the format should print to standard error 
instead of standard out.
 */
-   public PrintSinkFunction(boolean stdErr) {
-   target = stdErr;
-   }
-
-   public void setTargetToStandardOut() {
-   target = STD_OUT;
+   public PrintSinkFunction(final boolean stdErr) {
+   writer = new TaskOutputWriter<>(stdErr);
}
 
-   public void setTargetToStandardErr() {
-   target = STD_ERR;
+   /**
+* Instantiates a print sink function that prints to standard out and 
gives a sink identifier.
+*
+* @param stdErr True, if the format should print to standard error 
instead of standard out.
+* @param sinkIdentifier Message that identify sink and is prefixed to 
the output of the value
+*/
+   public PrintSinkFunction(final String sinkIdentifier, final boolean 
stdErr) {
+   writer = new TaskOutputWriter<>(sinkIdentifier, stdErr);
}
 
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
StreamingRuntimeContext context = (StreamingRuntimeContext) 
getRuntimeContext();
-   // get the target stream
-   stream = target == STD_OUT ? System.out : System.err;
+   int taskNumber = context.getIndexOfThisSubtask();
 
 Review comment:
   nit: inline those variables?




> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The output of the print method of {{DataSet}} allows the user to supply a 
> String to identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} does not support this yet. It is valuable to add this feature for 
> {{DataStream}}.




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580958#comment-16580958
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210241537
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockRuntimeContext.java
 ##
 @@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import java.util.Collections;
+
+/**
+ * Simple test runtime context for stream operators.
+ */
+public class MockRuntimeContext extends StreamingRuntimeContext {
 
 Review comment:
   Why did you add this class? I have already extracted the same thing on the 
master branch to `MockStreamingRuntimeContext`. I'm guessing some rebase error?




> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The output of the print method of {{DataSet}} allows the user to supply a 
> String to identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} does not support this yet. It is valuable to add this feature for 
> {{DataStream}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580957#comment-16580957
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210242096
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/functions/util/TaskOutputWriter.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+import java.io.PrintStream;
+import java.io.Serializable;
+
+/**
+ * Task output writer for DataStream and DataSet print API.
+ */
+public class TaskOutputWriter implements Serializable {
 
 Review comment:
   add `@Internal`?




> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The output of the print method of {{DataSet}} allows the user to supply a 
> String to identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} does not support this yet. It is valuable to add this feature for 
> {{DataStream}}.





[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580954#comment-16580954
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210239166
 
 

 ##
 File path: 
flink-java/src/test/java/org/apache/flink/api/java/io/PrintingOutputFormatTest.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link PrintingOutputFormat}.
+ */
+public class PrintingOutputFormatTest {
 
 Review comment:
   +1 for adding those tests!




> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The output of the print method of {{DataSet}} allows the user to supply a 
> String to identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} does not support this yet. It is valuable to add this feature for 
> {{DataStream}}.





[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580951#comment-16580951
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210240642
 
 

 ##
 File path: 
flink-java/src/test/java/org/apache/flink/api/java/io/PrintingOutputFormatTest.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link PrintingOutputFormat}.
+ */
+public class PrintingOutputFormatTest {
+
+   private final PrintStream originalSystemOut = System.out;
+   private final PrintStream originalSystemErr = System.err;
+
+   private final ByteArrayOutputStream arrayOutputStream = new 
ByteArrayOutputStream();
+   private final ByteArrayOutputStream arrayErrorStream = new 
ByteArrayOutputStream();
+
+   private final String line = System.lineSeparator();
+
+   @Before
+   public void setUp() {
+   System.setOut(new PrintStream(arrayOutputStream));
+   System.setErr(new PrintStream(arrayErrorStream));
+   }
+
+   @After
+   public void tearDown() {
+   if (System.out != originalSystemOut) {
+   System.out.close();
+   }
+   if (System.err != originalSystemErr) {
+   System.err.close();
+   }
+   System.setOut(originalSystemOut);
+   System.setErr(originalSystemErr);
+   }
+
+   @Test
+   public void testPrintOutputFormatStdOut() throws Exception {
+   PrintingOutputFormat printSink = new 
PrintingOutputFormat<>();
+   printSink.open(0, 1);
+
+   printSink.writeRecord("hello world!");
+
+   assertEquals("Print to System.out", printSink.toString());
+   assertEquals("hello world!" + line, 
arrayOutputStream.toString());
+   printSink.close();
+   }
+
+   @Test
+   public void testPrintOutputFormatStdErr() throws Exception {
+   PrintingOutputFormat printSink = new 
PrintingOutputFormat<>(true);
+   printSink.open(0, 1);
+
+   printSink.writeRecord("hello world!");
+
+   assertEquals("Print to System.err", printSink.toString());
+   assertEquals("hello world!" + line, 
arrayErrorStream.toString());
+   printSink.close();
+   }
+
+   @Test
+   public void testPrintOutputFormatWithPrefix() throws Exception {
+   PrintingOutputFormat printSink = new 
PrintingOutputFormat<>();
+   printSink.open(1, 2);
+
+   printSink.writeRecord("hello world!");
+
+   assertEquals("Print to System.out", printSink.toString());
+   assertEquals("2> hello world!" + line, 
arrayOutputStream.toString());
+   printSink.close();
+   }
+
+   @Test
+   public void testPrintOutputFormatWithIdentifierAndPrefix() throws Exception {
+       PrintingOutputFormat printSink = new PrintingOutputFormat<>("mySink", false);
+       printSink.open(1, 2);
+
+       printSink.writeRecord("hello world!");
+
+       assertEquals("Print to System.out", printSink.toString());
+       assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
+
+       printSink = new PrintingOutputFormat<>("mySink", true);
 
 Review comment:
   nit: you haven't closed the previous sink. 
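
A minimal sketch of how the second half of this test could be restructured: close the first sink before replacing it, then open the stderr instance, write through it, and assert against the captured stderr buffer. `MiniPrintSink` is a hypothetical stand-in that mimics the prefix behaviour of the `TaskOutputWriter` from this PR; it is not the Flink `PrintingOutputFormat`, and the names `StdErrSinkSketch` and `run` are illustrative only.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class StdErrSinkSketch {

    /** Hypothetical stand-in for the print sink; NOT the Flink class. */
    static final class MiniPrintSink {
        private final String sinkIdentifier;
        private final boolean stdErr;
        private PrintStream stream;
        private String prefix = "";

        MiniPrintSink(String sinkIdentifier, boolean stdErr) {
            this.sinkIdentifier = sinkIdentifier == null ? "" : sinkIdentifier;
            this.stdErr = stdErr;
        }

        void open(int taskNumber, int numTasks) {
            // Resolve the target stream at open time, after System.setOut/setErr.
            stream = stdErr ? System.err : System.out;
            prefix = sinkIdentifier;
            if (numTasks > 1) {
                if (!prefix.isEmpty()) {
                    prefix += ":";
                }
                prefix += (taskNumber + 1);
            }
            if (!prefix.isEmpty()) {
                prefix += "> ";
            }
        }

        void writeRecord(String record) {
            stream.println(prefix + record);
        }

        void close() {
            stream = null;
        }
    }

    /** Returns {captured stdout, captured stderr}. */
    public static String[] run() {
        PrintStream originalOut = System.out;
        PrintStream originalErr = System.err;
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ByteArrayOutputStream err = new ByteArrayOutputStream();
        System.setOut(new PrintStream(out));
        System.setErr(new PrintStream(err));
        try {
            MiniPrintSink sink = new MiniPrintSink("mySink", false);
            sink.open(1, 2);
            sink.writeRecord("hello world!");
            sink.close(); // close the stdout sink before replacing it

            sink = new MiniPrintSink("mySink", true);
            sink.open(1, 2); // the stderr instance must be opened and used
            sink.writeRecord("hello world!");
            sink.close();

            System.out.flush();
            System.err.flush();
            return new String[] {out.toString(), err.toString()};
        } finally {
            System.setOut(originalOut);
            System.setErr(originalErr);
        }
    }

    public static void main(String[] args) {
        String[] captured = run();
        System.out.print("stdout: " + captured[0]);
        System.out.print("stderr: " + captured[1]);
    }
}
```

In the real test, the final assertion would compare against `arrayErrorStream` rather than `arrayOutputStream`; splitting the stderr case into its own `@Test` method would be another option.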



[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580956#comment-16580956
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210240596
 
 

 ##
 File path: 
flink-java/src/test/java/org/apache/flink/api/java/io/PrintingOutputFormatTest.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link PrintingOutputFormat}.
+ */
+public class PrintingOutputFormatTest {
+
+   private final PrintStream originalSystemOut = System.out;
+   private final PrintStream originalSystemErr = System.err;
+
+   private final ByteArrayOutputStream arrayOutputStream = new 
ByteArrayOutputStream();
+   private final ByteArrayOutputStream arrayErrorStream = new 
ByteArrayOutputStream();
+
+   private final String line = System.lineSeparator();
+
+   @Before
+   public void setUp() {
+   System.setOut(new PrintStream(arrayOutputStream));
+   System.setErr(new PrintStream(arrayErrorStream));
+   }
+
+   @After
+   public void tearDown() {
+   if (System.out != originalSystemOut) {
+   System.out.close();
+   }
+   if (System.err != originalSystemErr) {
+   System.err.close();
+   }
+   System.setOut(originalSystemOut);
+   System.setErr(originalSystemErr);
+   }
+
+   @Test
+   public void testPrintOutputFormatStdOut() throws Exception {
+   PrintingOutputFormat printSink = new 
PrintingOutputFormat<>();
+   printSink.open(0, 1);
+
+   printSink.writeRecord("hello world!");
+
+   assertEquals("Print to System.out", printSink.toString());
+   assertEquals("hello world!" + line, 
arrayOutputStream.toString());
+   printSink.close();
+   }
+
+   @Test
+   public void testPrintOutputFormatStdErr() throws Exception {
+   PrintingOutputFormat printSink = new 
PrintingOutputFormat<>(true);
+   printSink.open(0, 1);
+
+   printSink.writeRecord("hello world!");
+
+   assertEquals("Print to System.err", printSink.toString());
+   assertEquals("hello world!" + line, 
arrayErrorStream.toString());
+   printSink.close();
+   }
+
+   @Test
+   public void testPrintOutputFormatWithPrefix() throws Exception {
+   PrintingOutputFormat printSink = new 
PrintingOutputFormat<>();
+   printSink.open(1, 2);
+
+   printSink.writeRecord("hello world!");
+
+   assertEquals("Print to System.out", printSink.toString());
+   assertEquals("2> hello world!" + line, 
arrayOutputStream.toString());
+   printSink.close();
+   }
+
+   @Test
+   public void testPrintOutputFormatWithIdentifierAndPrefix() throws Exception {
+       PrintingOutputFormat printSink = new PrintingOutputFormat<>("mySink", false);
+       printSink.open(1, 2);
+
+       printSink.writeRecord("hello world!");
+
+       assertEquals("Print to System.out", printSink.toString());
+       assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
+
+       printSink = new PrintingOutputFormat<>("mySink", true);
+       assertEquals("Print to System.err", printSink.toString());
+       assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
 
 Review comment:
   Were you trying to test for stderr in those 3 lines? If so, you haven't invoked 
anything on the stderr instance of `printSink`, and in this line you are 

[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580950#comment-16580950
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210238813
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/functions/util/TaskOutputWriter.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+import java.io.PrintStream;
+import java.io.Serializable;
+
+/**
+ * Task output writer for DataStream and DataSet print API.
+ */
+public class TaskOutputWriter implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final boolean STD_OUT = false;
+   private static final boolean STD_ERR = true;
+
+   private final boolean target;
+   private transient PrintStream stream;
+   private final String sinkIdentifier;
+   private transient String completedPrefix;
+
+   public TaskOutputWriter() {
+       this("", STD_OUT);
+   }
+
+   public TaskOutputWriter(final boolean stdErr) {
+       this("", stdErr);
+   }
+
+   public TaskOutputWriter(final String sinkIdentifier, final boolean stdErr) {
+       this.target = stdErr;
+       this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier);
+   }
+
+   public void open(int taskNumber, int numTasks) {
+       // get the target stream
+       stream = target == STD_OUT ? System.out : System.err;
+
+       completedPrefix = sinkIdentifier;
+
+       if (numTasks > 1) {
+           if (!completedPrefix.isEmpty()) {
+               completedPrefix += ":";
+           }
+           completedPrefix += (taskNumber + 1);
+       }
+
+       if (!completedPrefix.isEmpty()) {
+           completedPrefix += "> ";
+       }
+   }
+
+   public void write(IN record) {
+       stream.println(completedPrefix + record.toString());
+   }
+
+   public void close() {
 
 Review comment:
   Drop the close? It seems like a dead empty method. 
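
For reference, the prefix rules in `open` can be checked in isolation. This is a minimal sketch, assuming a hypothetical static helper `buildPrefix` (not part of the PR) that copies the logic of `TaskOutputWriter.open` from the diff above; the class name `PrefixSketch` is likewise illustrative.

```java
public class PrefixSketch {

    /**
     * Hypothetical helper mirroring the prefix logic of TaskOutputWriter.open:
     * the sink identifier comes first, then the 1-based subtask number when
     * parallelism is greater than 1, then "> " if anything was emitted.
     */
    public static String buildPrefix(String sinkIdentifier, int taskNumber, int numTasks) {
        String prefix = sinkIdentifier == null ? "" : sinkIdentifier;

        if (numTasks > 1) {
            if (!prefix.isEmpty()) {
                prefix += ":";
            }
            prefix += (taskNumber + 1);
        }

        if (!prefix.isEmpty()) {
            prefix += "> ";
        }
        return prefix;
    }

    public static void main(String[] args) {
        // The four combinations of identifier and parallelism.
        System.out.println(buildPrefix("mySink", 1, 2) + "hello world!");
        System.out.println(buildPrefix("mySink", 0, 1) + "hello world!");
        System.out.println(buildPrefix("", 1, 2) + "hello world!");
        System.out.println(buildPrefix("", 0, 1) + "hello world!");
    }
}
```

The four calls in `main` correspond to the four format options listed in the `PrintSinkFunction` Javadoc: identifier with subtask number, identifier only, subtask number only, and no prefix at all.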




> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The output of the print method of {{DataSet}} allows the user to supply a 
> String to identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} does not support this yet. It is valuable to add this feature for 
> {{DataStream}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580952#comment-16580952
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210240728
 
 

 ##
 File path: 
flink-java/src/test/java/org/apache/flink/api/java/io/PrintingOutputFormatTest.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link PrintingOutputFormat}.
+ */
+public class PrintingOutputFormatTest {
+
+   private final PrintStream originalSystemOut = System.out;
+   private final PrintStream originalSystemErr = System.err;
+
+   private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+   private final ByteArrayOutputStream arrayErrorStream = new ByteArrayOutputStream();
+
+   private final String line = System.lineSeparator();
+
+   @Before
+   public void setUp() {
+       System.setOut(new PrintStream(arrayOutputStream));
+       System.setErr(new PrintStream(arrayErrorStream));
+   }
+
+   @After
+   public void tearDown() {
+       if (System.out != originalSystemOut) {
+           System.out.close();
+       }
+       if (System.err != originalSystemErr) {
+           System.err.close();
+       }
+       System.setOut(originalSystemOut);
+       System.setErr(originalSystemErr);
+   }
+
+   @Test
+   public void testPrintOutputFormatStdOut() throws Exception {
+       PrintingOutputFormat<String> printSink = new PrintingOutputFormat<>();
+       printSink.open(0, 1);
+
+       printSink.writeRecord("hello world!");
+
+       assertEquals("Print to System.out", printSink.toString());
+       assertEquals("hello world!" + line, arrayOutputStream.toString());
+       printSink.close();
+   }
+
+   @Test
+   public void testPrintOutputFormatStdErr() throws Exception {
+       PrintingOutputFormat<String> printSink = new PrintingOutputFormat<>(true);
+       printSink.open(0, 1);
+
+       printSink.writeRecord("hello world!");
+
+       assertEquals("Print to System.err", printSink.toString());
+       assertEquals("hello world!" + line, arrayErrorStream.toString());
+       printSink.close();
+   }
+
+   @Test
+   public void testPrintOutputFormatWithPrefix() throws Exception {
+       PrintingOutputFormat<String> printSink = new PrintingOutputFormat<>();
+       printSink.open(1, 2);
+
+       printSink.writeRecord("hello world!");
+
+       assertEquals("Print to System.out", printSink.toString());
+       assertEquals("2> hello world!" + line, arrayOutputStream.toString());
+       printSink.close();
+   }
+
+   @Test
+   public void testPrintOutputFormatWithIdentifierAndPrefix() throws Exception {
+       PrintingOutputFormat<String> printSink = new PrintingOutputFormat<>("mySink", false);
+       printSink.open(1, 2);
+
+       printSink.writeRecord("hello world!");
+
+       assertEquals("Print to System.out", printSink.toString());
+       assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
+
+       printSink = new PrintingOutputFormat<>("mySink", true);
+       assertEquals("Print to System.err", printSink.toString());
+       assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
+
+       printSink.close();
+   }
+
+   @Test
+   public void testPrintOutputFormatWithIdentifierButNoPrefix() throws Exception {
+   

[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580953#comment-16580953
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210240875
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ##
 @@ -103,4 +102,41 @@ public void testPrintSinkWithPrefix() throws Exception {
        assertEquals("2> hello world!" + line, arrayOutputStream.toString());
        printSink.close();
    }
+
+   @Test
+   public void testPrintSinkWithIdentifierAndPrefix() throws Exception {
+       PrintSinkFunction<String> printSink = new PrintSinkFunction<>("mySink", false);
+       printSink.setRuntimeContext(new MockStreamingRuntimeContext(false, 2, 1));
+       printSink.open(new Configuration());
+
+       printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0));
+
+       assertEquals("Print to System.out", printSink.toString());
+       assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
+
+       printSink = new PrintSinkFunction<>("mySink", true);
+       assertEquals("Print to System.err", printSink.toString());
+       assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
+
+       printSink.close();
+   }
+
+   @Test
+   public void testPrintSinkWithIdentifierButNoPrefix() throws Exception {
+       PrintSinkFunction<String> printSink = new PrintSinkFunction<>("mySink", false);
+       printSink.setRuntimeContext(new MockRuntimeContext(false, 1, 0));
+       printSink.open(new Configuration());
+
+       printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0));
+
+       assertEquals("Print to System.out", printSink.toString());
+       assertEquals("mySink> hello world!" + line, arrayOutputStream.toString());
+
+       printSink = new PrintSinkFunction<>("mySink", true);
+       assertEquals("Print to System.err", printSink.toString());
+       assertEquals("mySink> hello world!" + line, arrayOutputStream.toString());
 
 Review comment:
   ditto regarding testing stderr
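
The stderr half of the test above builds a second sink but never writes through it, and then re-checks the stdout buffer. A minimal sketch of capturing `System.err` so that branch could be asserted directly follows. `StdErrCaptureSketch` and `captureStdErr` are hypothetical names, and plain Java is used instead of JUnit and the Flink classes so the example stays self-contained.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

// Hypothetical helper, not part of Flink or of this PR.
final class StdErrCaptureSketch {

    /** Runs the action with System.err redirected and returns everything it wrote. */
    static String captureStdErr(Runnable action) {
        PrintStream original = System.err;
        ByteArrayOutputStream captured = new ByteArrayOutputStream();
        PrintStream replacement = new PrintStream(captured);
        System.setErr(replacement);
        try {
            action.run();
        } finally {
            // Always restore the real stream, even if the action throws.
            replacement.flush();
            System.setErr(original);
        }
        return captured.toString();
    }

    public static void main(String[] args) {
        String written = captureStdErr(() -> System.err.println("mySink:2> hello world!"));
        System.out.print(written);
    }
}
```

In a real test the lambda would call the sink's write method, and the assertion would compare against the stderr buffer rather than the stdout one.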




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580955#comment-16580955
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210240862
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ##
 @@ -103,4 +102,41 @@ public void testPrintSinkWithPrefix() throws Exception {
        assertEquals("2> hello world!" + line, arrayOutputStream.toString());
        printSink.close();
    }
+
+   @Test
+   public void testPrintSinkWithIdentifierAndPrefix() throws Exception {
+       PrintSinkFunction<String> printSink = new PrintSinkFunction<>("mySink", false);
+       printSink.setRuntimeContext(new MockStreamingRuntimeContext(false, 2, 1));
+       printSink.open(new Configuration());
+
+       printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0));
+
+       assertEquals("Print to System.out", printSink.toString());
+       assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
+
+       printSink = new PrintSinkFunction<>("mySink", true);
+       assertEquals("Print to System.err", printSink.toString());
+       assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString());
 
 Review comment:
   ditto regarding testing stderr




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580001#comment-16580001
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method 
to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-412926404
 
 
   cc @pnowojski updated




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579783#comment-16579783
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski edited a comment on issue #6367: [FLINK-9850] Add a string to the 
print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-412868964
 
 
   Yes exactly. More precisely as I wrote in one of the previous comments, this 
`TaskOutputWriter` (`TaskStandardOutputWriter`?) would deduplicate logic from 
those methods:
   
   - open
   - writeRecord/invoke
   - toString
   - close (those probably should be dropped because of NPE.)
   
   Because their logic is identical between `PrintSinkFunction` and 
`PrintingOutputFormat` while their implementation is already inconsistent.
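
The deduplication described above can be sketched as two thin wrappers that forward to one shared writer. This is a minimal sketch under stated assumptions, not the code from the PR: `WriterSketch`, `SinkFunctionSketch` and `OutputFormatSketch` are hypothetical stand-ins with simplified signatures, and the writer collects output in a buffer instead of printing so the result is easy to inspect.

```java
// Hypothetical, simplified stand-ins; these are not Flink's real classes or signatures.
final class WriterSketch {
    private final StringBuilder out = new StringBuilder(); // collects output instead of printing
    private final String sinkIdentifier;
    private String completedPrefix = "";

    WriterSketch(String sinkIdentifier) {
        this.sinkIdentifier = (sinkIdentifier == null) ? "" : sinkIdentifier;
    }

    // The prefix logic lives in exactly one place.
    void open(int subtaskIndex, int parallelism) {
        completedPrefix = sinkIdentifier;
        if (parallelism > 1) {
            if (!completedPrefix.isEmpty()) {
                completedPrefix += ":";
            }
            completedPrefix += (subtaskIndex + 1);
        }
        if (!completedPrefix.isEmpty()) {
            completedPrefix += "> ";
        }
    }

    void write(Object record) {
        out.append(completedPrefix).append(record).append('\n');
    }

    String written() {
        return out.toString();
    }
}

// Stand-in for the streaming sink: it only forwards to the shared writer.
final class SinkFunctionSketch {
    private final WriterSketch writer;

    SinkFunctionSketch(String sinkIdentifier) {
        this.writer = new WriterSketch(sinkIdentifier);
    }

    void open(int subtaskIndex, int parallelism) { writer.open(subtaskIndex, parallelism); }
    void invoke(Object record) { writer.write(record); }
    String written() { return writer.written(); }
}

// Stand-in for the batch output format: same delegation, different method name.
final class OutputFormatSketch {
    private final WriterSketch writer;

    OutputFormatSketch(String sinkIdentifier) {
        this.writer = new WriterSketch(sinkIdentifier);
    }

    void open(int subtaskIndex, int parallelism) { writer.open(subtaskIndex, parallelism); }
    void writeRecord(Object record) { writer.write(record); }
    String written() { return writer.written(); }
}
```

With this shape the two wrappers cannot drift apart, because neither contains formatting logic of its own.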




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579782#comment-16579782
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on issue #6367: [FLINK-9850] Add a string to the print 
method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-412868964
 
 
   Yes exactly. More precisely as I wrote in one of the previous comments, this 
`TaskOutputWriter` (`TaskStandardOutputWriter`?) would deduplicate logic from 
those methods:
   
   - open
   - writeRecord/invoke
   - toString
   - close (those probably should be dropped because of NPE.)
   
   Because their logic is identical between `PrintSinkFunction` and 
`PrintingOutputFormat` while their implementation is already inconsistent. 




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579732#comment-16579732
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method 
to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-412854540
 
 
   @pnowojski I fixed some of the issues you just mentioned and will refactor 
`PrintSinkFunction` and `PrintingOutputFormat`. Do you mean I should create a 
new class named `TaskOutputWriter` and extract the shared logic into it?




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579662#comment-16579662
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209911856
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -70,27 +84,30 @@ public void open(Configuration parameters) throws Exception {
        // get the target stream
        stream = target == STD_OUT ? System.out : System.err;
 
-       // set the prefix if we have a >1 parallelism
-       prefix = (context.getNumberOfParallelSubtasks() > 1) ?
-               ((context.getIndexOfThisSubtask() + 1) + "> ") : null;
+       completedPrefix = sinkIdentifier;
+
+       if (context.getNumberOfParallelSubtasks() > 1) {
+           if (!completedPrefix.isEmpty()) {
+               completedPrefix += ":";
+           }
+           completedPrefix += (context.getIndexOfThisSubtask() + 1);
+       }
+
+       if (!completedPrefix.isEmpty()) {
+           completedPrefix += "> ";
+       }
    }
 
    @Override
    public void invoke(IN record) {
-       if (prefix != null) {
-           stream.println(prefix + record.toString());
+       if (completedPrefix != null) {
 
 Review comment:
   This check is not valid anymore, this field is never null. It can either be 
dropped altogether or replaced by empty string check. Probably it's better to 
drop it. Functionally both will be the same and empty string check's 
performance benefit (if any) doesn't matter while printing to STDOUT/STDERR.
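
Why the check can simply be dropped can be seen in a minimal sketch. `UnconditionalWriteSketch` and `render` are hypothetical names, and the output goes to an in-memory stream rather than STDOUT so the result can be compared. The only assumption is the one the comment states: the prefix is never null, only possibly empty.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

// Hypothetical helper; illustrates the argument only and is not Flink code.
final class UnconditionalWriteSketch {

    /** Prints prefix + record with no null or empty check and returns the printed text. */
    static String render(String completedPrefix, Object record) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream stream = new PrintStream(buffer);
        // An empty prefix contributes nothing, so no branch is needed.
        stream.println(completedPrefix + record.toString());
        stream.flush();
        return buffer.toString();
    }

    public static void main(String[] args) {
        System.out.print(render("", "hello world!"));
        System.out.print(render("2> ", "hello world!"));
    }
}
```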




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579655#comment-16579655
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209911856
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -70,27 +84,30 @@ public void open(Configuration parameters) throws Exception {
        // get the target stream
        stream = target == STD_OUT ? System.out : System.err;
 
-       // set the prefix if we have a >1 parallelism
-       prefix = (context.getNumberOfParallelSubtasks() > 1) ?
-               ((context.getIndexOfThisSubtask() + 1) + "> ") : null;
+       completedPrefix = sinkIdentifier;
+
+       if (context.getNumberOfParallelSubtasks() > 1) {
+           if (!completedPrefix.isEmpty()) {
+               completedPrefix += ":";
+           }
+           completedPrefix += (context.getIndexOfThisSubtask() + 1);
+       }
+
+       if (!completedPrefix.isEmpty()) {
+           completedPrefix += "> ";
+       }
    }
 
    @Override
    public void invoke(IN record) {
-       if (prefix != null) {
-           stream.println(prefix + record.toString());
+       if (completedPrefix != null) {
 
 Review comment:
   This check is not valid anymore, this field is never null.




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576573#comment-16576573
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method 
to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-412144209
 
 
   Hi @pnowojski, thanks for your suggestion. I have refactored this PR. We both 
changed `PrintSinkFunctionTest`, so I may not have finished all of the work. 
I also agree with the suggestion to "deduplicate the logic of 
PrintSinkFunction and PrintingOutputFormat" and would like to start on it once 
the current issue is fixed.




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576310#comment-16576310
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209260792
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ##
 @@ -117,6 +117,65 @@ public void testPrintSinkWithPrefix() throws Exception {
stream.close();
}
 
+   @Test
+   public void testPrintSinkWithIdentifierAndPrefix() throws Exception {
+       ByteArrayOutputStream baos = new ByteArrayOutputStream();
+       PrintStream stream = new PrintStream(baos);
+       System.setOut(stream);
+
+       final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class);
+       Mockito.when(ctx.getNumberOfParallelSubtasks()).thenReturn(2);
+       Mockito.when(ctx.getIndexOfThisSubtask()).thenReturn(1);
+
+       PrintSinkFunction<String> printSink = new PrintSinkFunction<>(false, "mySink");
+       printSink.setRuntimeContext(ctx);
+       try {
+           printSink.open(new Configuration());
+       } catch (Exception e) {
+           Assert.fail();
+       }
+       printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0));
+
+       assertEquals("Print to System.out", printSink.toString());
+       assertEquals("mySink:2> hello world!" + line, baos.toString());
+
+       printSink.setTargetToStandardErr();
+       assertEquals("Print to System.err", printSink.toString());
+       assertEquals("mySink:2> hello world!" + line, baos.toString());
+
+       printSink.close();
+       stream.close();
+   }
+
+   @Test
+   public void testPrintSinkWithIdentifierButNoPrefix() throws Exception {
+       ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
 Review comment:
   There is quite a lot of code duplication in those tests, and they were 
unnecessarily using Mockito instead of a proper mock. There was also a bug 
in `testPrintSinkStdErr`. I have fixed those issues in a hotfix: 
https://github.com/apache/flink/pull/6538
   
   I would like the hotfix to be merged before this PR. Please adapt/rewrite 
your test in a similar fashion to what I did in my hotfix.




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576314#comment-16576314
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209234301
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -55,6 +57,17 @@ public PrintSinkFunction(boolean stdErr) {
        target = stdErr;
    }
 
+   /**
+    * Instantiates a print sink function that prints to standard out and gives a sink identifier.
+    *
+    * @param stdErr True, if the format should print to standard error instead of standard out.
+    * @param sinkIdentifier Message that identify sink and is prefixed to the output of the value
+    */
+   public PrintSinkFunction(boolean stdErr, String sinkIdentifier) {
+       this(stdErr);
+       this.sinkIdentifier = sinkIdentifier;
 
 Review comment:
   Usually the less detailed constructors call the more specific one, not the 
other way around. Here that will allow you to mark the fields as `final`.
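
The constructor ordering suggested above can be sketched as follows. This is an illustration only: `ChainedConstructorsSketch` is a hypothetical class, not the sink from the PR, and it keeps just the two fields needed to show why chaining towards the most specific constructor lets every field be `final`.

```java
// Hypothetical class; illustrates constructor chaining only.
final class ChainedConstructorsSketch {
    private final boolean stdErr;
    private final String sinkIdentifier;

    // Less detailed constructors delegate to the most specific one.
    ChainedConstructorsSketch() {
        this("", false);
    }

    ChainedConstructorsSketch(boolean stdErr) {
        this("", stdErr);
    }

    // The only constructor that assigns fields, so each final field is set exactly once.
    ChainedConstructorsSketch(String sinkIdentifier, boolean stdErr) {
        this.stdErr = stdErr;
        this.sinkIdentifier = (sinkIdentifier == null) ? "" : sinkIdentifier;
    }

    String sinkIdentifier() {
        return sinkIdentifier;
    }

    @Override
    public String toString() {
        return "Print to " + (stdErr ? "System.err" : "System.out");
    }
}
```

Chaining in the opposite direction, with `this(stdErr)` followed by a field assignment, would not compile once `sinkIdentifier` is `final`, because the delegated constructor would already have had to assign it.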




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576311#comment-16576311
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209234901
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -70,15 +83,29 @@ public void open(Configuration parameters) throws Exception {
        // get the target stream
        stream = target == STD_OUT ? System.out : System.err;
 
+       /**
 
 Review comment:
   Please move this to the class's Javadoc. Also replace `sinkId` and `sink id` 
with `{@code sinkIdentifier}`.




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576308#comment-16576308
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209237840
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -70,15 +83,29 @@ public void open(Configuration parameters) throws Exception {
         // get the target stream
         stream = target == STD_OUT ? System.out : System.err;
 
+        /**
+         * Four possible format options:
+         *   sinkId:taskId> output  <- sink id provided, parallelism > 1
+         *   sinkId> output         <- sink id provided, parallelism == 1
+         *   taskId> output         <- no sink id provided, parallelism > 1
+         *   output                 <- no sink id provided, parallelism == 1
+         */
+
         // set the prefix if we have a >1 parallelism
         prefix = (context.getNumberOfParallelSubtasks() > 1) ?
                 ((context.getIndexOfThisSubtask() + 1) + "> ") : null;
+
+        if (prefix == null) {
 
 Review comment:
   Don't use nulls here. An empty string serves the same purpose and is safer 
(no possible NPE).
   
   Btw, the logic could be rephrased like this:
   ```
   completedPrefix = sinkIdentifier;
   
   if (context.getNumberOfParallelSubtasks() > 1) {
     if (!completedPrefix.isEmpty()) {
       completedPrefix += ":";
     }
     completedPrefix += (context.getIndexOfThisSubtask() + 1);
   }
   
   if (!completedPrefix.isEmpty()) {
     completedPrefix += "> ";
   }
   ```
   (optionally with the ternary operator instead of some of the `if` statements; 
that is only a matter of taste). This simplifies the logic and deduplicates some 
of the code/constants.
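
The four prefix formats and the null-free construction suggested above can be sketched as a small standalone method. This is a minimal illustration, not the code from the pull request: the class name `PrefixSketch` and the method `buildPrefix` are hypothetical, the parallelism and subtask index are passed in directly instead of being read from Flink's runtime context, and the trailing `"> "` (with a space) is assumed from the format comment in the diff.

```java
/** Hypothetical helper illustrating the prefix rules; not part of Flink. */
final class PrefixSketch {

    private PrefixSketch() {
    }

    /**
     * Builds the output prefix.
     *
     * @param sinkIdentifier optional identifier; null is treated as "not provided"
     * @param parallelism    number of parallel subtasks (at least 1)
     * @param subtaskIndex   zero-based index of this subtask; printed one-based
     */
    static String buildPrefix(String sinkIdentifier, int parallelism, int subtaskIndex) {
        // Start from an empty string instead of null so concatenation is always safe.
        String prefix = sinkIdentifier == null ? "" : sinkIdentifier;

        if (parallelism > 1) {
            if (!prefix.isEmpty()) {
                prefix += ":";
            }
            prefix += (subtaskIndex + 1);
        }

        // Only append the separator when there is something to separate from the record.
        if (!prefix.isEmpty()) {
            prefix += "> ";
        }
        return prefix;
    }

    public static void main(String[] args) {
        System.out.println(buildPrefix("mySink", 2, 1) + "record"); // mySink:2> record
        System.out.println(buildPrefix("mySink", 1, 0) + "record"); // mySink> record
        System.out.println(buildPrefix("", 2, 1) + "record");       // 2> record
        System.out.println(buildPrefix("", 1, 0) + "record");       // record
    }
}
```

Because the prefix is never null, the per-record code path can concatenate it unconditionally, which is the safety argument made in the review comment.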




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576309#comment-16576309
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209233947
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -40,6 +40,8 @@
private boolean target;
private transient PrintStream stream;
private transient String prefix;
+   private String sinkIdentifier;
+   private transient String completedPrefix;
 
 Review comment:
   Please drop the `prefix` field; it is only needed as a local variable now. 
Also please make `target` and `sinkIdentifier` `final` fields.
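
The field layout requested here (configuration in `final` fields, derived runtime state in `transient` fields) can be illustrated with a small serializable class. This is only a sketch under stated assumptions: `PrintSinkSketch`, `useStdErr`, `roundTrip` and the other member names are hypothetical, the prefix logic is reduced to the single-subtask case, and plain Java serialization stands in for however the function is actually shipped to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for a print sink; not Flink's PrintSinkFunction. */
final class PrintSinkSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // Configuration: assigned once in the constructor and serialized with the object.
    private final boolean useStdErr;
    private final String sinkIdentifier;

    // Runtime state: derived in open(), therefore excluded from serialization.
    private transient PrintStream stream;
    private transient String completedPrefix;

    PrintSinkSketch(boolean useStdErr, String sinkIdentifier) {
        this.useStdErr = useStdErr;
        this.sinkIdentifier = sinkIdentifier == null ? "" : sinkIdentifier;
    }

    /** Rebuilds the transient state (simplified: parallelism is assumed to be 1). */
    void open() {
        stream = useStdErr ? System.err : System.out;
        completedPrefix = sinkIdentifier.isEmpty() ? "" : sinkIdentifier + "> ";
    }

    boolean isOpen() {
        return stream != null && completedPrefix != null;
    }

    String format(Object record) {
        return completedPrefix + record;
    }

    /** Serializes and deserializes the sink to mimic shipping it to another JVM. */
    static PrintSinkSketch roundTrip(PrintSinkSketch sink) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(sink);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (PrintSinkSketch) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

After a round trip the `final` fields keep their values while the `transient` fields are reset, so `open()` has to run again before records are formatted.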




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576313#comment-16576313
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209239877
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ##
 @@ -117,6 +117,65 @@ public void testPrintSinkWithPrefix() throws Exception {
         stream.close();
     }
 
+    @Test
+    public void testPrintSinkWithIdentifierAndPrefix() throws Exception {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream stream = new PrintStream(baos);
+        System.setOut(stream);
+
+        final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class);
+        Mockito.when(ctx.getNumberOfParallelSubtasks()).thenReturn(2);
+        Mockito.when(ctx.getIndexOfThisSubtask()).thenReturn(1);
+
+        PrintSinkFunction printSink = new PrintSinkFunction<>(false, "mySink");
+        printSink.setRuntimeContext(ctx);
+        try {
+            printSink.open(new Configuration());
+        } catch (Exception e) {
+            Assert.fail();
 
 Review comment:
   Do not hide the original exception. From what I've heard, there was a bug in 
an old JUnit version, which is why this pattern keeps recurring in Flink tests. 
Regardless of whether that was the case, it is not a problem anymore, and 
hiding the original exception makes test failures harder to read and understand 
(ditto for the rest of the file).
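
The difference between the two styles can be shown without a test framework. The sketch below is an assumption-laden illustration: `ExceptionPropagationSketch`, `open`, `hidingTest`, `propagatingTest` and `failureOf` are hypothetical names, and a bare `AssertionError` is used to mimic what a message-less `Assert.fail()` reports.

```java
/** Hypothetical comparison of two test styles; no test framework involved. */
final class ExceptionPropagationSketch {

    /** A test body that may throw, like a test method declared with "throws Exception". */
    interface TestBody {
        void run() throws Exception;
    }

    private ExceptionPropagationSketch() {
    }

    /** Stand-in for a call under test that fails with a descriptive exception. */
    static void open() throws Exception {
        throw new IllegalStateException("runtime context not set");
    }

    /** Pattern criticized in the review: the cause is swallowed and replaced. */
    static void hidingTest() {
        try {
            open();
        } catch (Exception e) {
            // Mimics a message-less fail(): neither the type nor the message of e survives.
            throw new AssertionError();
        }
    }

    /** Preferred pattern: declare the exception and let it reach the test runner. */
    static void propagatingTest() throws Exception {
        open();
    }

    /** Runs a test body and returns whatever failure it produced, or null on success. */
    static Throwable failureOf(TestBody body) {
        try {
            body.run();
            return null;
        } catch (Throwable t) {
            return t;
        }
    }

    public static void main(String[] args) {
        System.out.println(failureOf(ExceptionPropagationSketch::hidingTest));
        System.out.println(failureOf(ExceptionPropagationSketch::propagatingTest));
    }
}
```

In the first case the report carries only an `AssertionError` without a message; in the second it carries the original `IllegalStateException` and its message, which is what makes the failure readable.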




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576312#comment-16576312
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209238315
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -89,6 +116,8 @@ public void invoke(IN record) {
public void close() {
this.stream = null;
this.prefix = null;
+   this.sinkIdentifier = null;
+   this.completedPrefix = null;
 
 Review comment:
   Please drop this `close` method. It doesn't do anything besides risking an 
`NPE`.




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576307#comment-16576307
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209239178
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ##
 @@ -117,6 +117,65 @@ public void testPrintSinkWithPrefix() throws Exception {
         stream.close();
     }
 
+    @Test
+    public void testPrintSinkWithIdentifierAndPrefix() throws Exception {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream stream = new PrintStream(baos);
+        System.setOut(stream);
+
+        final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class);
 
 Review comment:
   Do not use Mockito for such things; tests written that way are very difficult 
to maintain in the future. Instead, please move the 
`org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseTest.MockRuntimeContext`
 class to some utility package in the `flink-streaming-java` module and reuse it 
throughout `PrintSinkFunctionTest`.
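
The idea of a hand-written test double can be sketched as follows. This is not the `MockRuntimeContext` class named above: the interface `SubtaskContext` is a hypothetical, deliberately narrow stand-in for the two runtime-context methods the print sink reads, and `MockSubtaskContext` is an illustrative name.

```java
/** Hypothetical narrow view of the two values the print sink needs. */
interface SubtaskContext {
    int getNumberOfParallelSubtasks();

    int getIndexOfThisSubtask();
}

/** Hand-written test double: plain fields instead of Mockito stubbing. */
final class MockSubtaskContext implements SubtaskContext {
    private final int parallelism;
    private final int subtaskIndex;

    MockSubtaskContext(int parallelism, int subtaskIndex) {
        // Reject combinations that a real runtime could never produce.
        if (parallelism < 1 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException(
                    "invalid subtask " + subtaskIndex + " for parallelism " + parallelism);
        }
        this.parallelism = parallelism;
        this.subtaskIndex = subtaskIndex;
    }

    @Override
    public int getNumberOfParallelSubtasks() {
        return parallelism;
    }

    @Override
    public int getIndexOfThisSubtask() {
        return subtaskIndex;
    }
}
```

A test would then construct `new MockSubtaskContext(2, 1)` once and pass it to the code under test. Unlike stubbed calls, the double is checked by the compiler when the interface changes and can validate its own inputs.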




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16575041#comment-16575041
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method 
to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-411806291
 
 
   cc @tillrohrmann 




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-07-31 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16563408#comment-16563408
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method 
to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-409163881
 
 
   cc @tillrohrmann and @zentol 




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554399#comment-16554399
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6367
  
cc @dawidwys and @pnowojski 




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-07-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16549354#comment-16549354
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

Github user tison1 commented on the issue:

https://github.com/apache/flink/pull/6367
  
@yanghua also +1

this is a net win.




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-07-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16549334#comment-16549334
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6367
  
@yanghua Thanks for your update. +1 to merge 




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-07-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16549001#comment-16549001
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

Github user tison1 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6367#discussion_r203644208
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ---
@@ -40,6 +40,8 @@
private boolean target;
private transient PrintStream stream;
private transient String prefix;
+   private String sinkIdentifier;
+   private String completedPrefix;
--- End diff --

If `prefix` is `transient`, why isn't `completedPrefix`?




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-07-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16548837#comment-16548837
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6367
  
@hequn8128 thanks, I have added some test cases.




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-07-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16548718#comment-16548718
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6367#discussion_r203588835
  
--- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 ---
@@ -959,6 +959,29 @@ class DataStream[T](stream: JavaStream[T]) {
   @PublicEvolving
   def printToErr() = stream.printToErr()
 
+  /**
+* Writes a DataStream to the standard output stream (stdout). For each
+* element of the DataStream the result of .toString is
--- End diff --

.toString => [[AnyRef.toString()]]




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-07-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16548004#comment-16548004
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6367
  
@tillrohrmann and @zentol I see that the Python DataStream API does not 
match the Java DataStream API (some methods are missing). Shall we add the 
missing methods to `PythonDataStream`? If so, I'd like to do this.




[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-07-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16547965#comment-16547965
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/6367

[FLINK-9850] Add a string to the print method to identify output for 
DataStream

## What is the purpose of the change

*This pull request adds a string to the print method to identify output for 
DataStream*


## Brief change log

  - *add print(string) / printToErr(string) to DataStream Java API*
  - *add print(string) / printToErr(string) to DataStream Scala API*
  - *add print(string) to DataStream Python API*

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / **docs** / 
**JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-9850

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6367.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6367


commit 80215cd12618392ab0909a431863939d3353ca16
Author: yanghua 
Date:   2018-07-18T15:20:11Z

[FLINK-9850] Add a string to the print method to identify output for 
DataStream



