[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16583552#comment-16583552 ]

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski closed pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java
new file mode 100644
index 000..de058b69f1f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/PrintSinkOutputWriter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.PrintStream;
+import java.io.Serializable;
+
+/**
+ * Print sink output writer for DataStream and DataSet print API.
+ */
+@Internal
+public class PrintSinkOutputWriter<IN> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final boolean STD_OUT = false;
+    private static final boolean STD_ERR = true;
+
+    private final boolean target;
+    private transient PrintStream stream;
+    private final String sinkIdentifier;
+    private transient String completedPrefix;
+
+    public PrintSinkOutputWriter() {
+        this("", STD_OUT);
+    }
+
+    public PrintSinkOutputWriter(final boolean stdErr) {
+        this("", stdErr);
+    }
+
+    public PrintSinkOutputWriter(final String sinkIdentifier, final boolean stdErr) {
+        this.target = stdErr;
+        this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier);
+    }
+
+    public void open(int subtaskIndex, int numParallelSubtasks) {
+        // get the target stream
+        stream = target == STD_OUT ? System.out : System.err;
+
+        completedPrefix = sinkIdentifier;
+
+        if (numParallelSubtasks > 1) {
+            if (!completedPrefix.isEmpty()) {
+                completedPrefix += ":";
+            }
+            completedPrefix += (subtaskIndex + 1);
+        }
+
+        if (!completedPrefix.isEmpty()) {
+            completedPrefix += "> ";
+        }
+    }
+
+    public void write(IN record) {
+        stream.println(completedPrefix + record.toString());
+    }
+
+    @Override
+    public String toString() {
+        return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
+    }
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
index 0ab1abb2efb..62eabd0b739 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
@@ -19,45 +19,46 @@
 package org.apache.flink.api.java.io;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.util.PrintSinkOutputWriter;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.configuration.Configuration;
 
-import java.io.PrintStream;
-
 /**
  * Output format that prints results into either stdout or stderr.
- * @param
+ *
+ *
+ * Four possible format options:
+ *    {@code sinkIdentifier}:taskId> output <- {@code sinkIdentifier} provided, parallelism > 1
+ *    {@code sinkIdentifier}> output <- {@code sinkIdentifier} provided, parallelism == 1
+ *    taskId> output
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16583550#comment-16583550 ]

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-413787139

Thanks for the contribution! @zentol told me his LGTM so merging now :)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API
> Reporter: Hequn Cheng
> Assignee: vinoyang
> Priority: Major
> Labels: pull-request-available
>
> The output of the print method of {{DataSet}} allows the user to supply a
> String to identify the output (see
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]), but
> {{DataStream}} does not support this yet. It would be valuable to add this
> feature to {{DataStream}}.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
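The prefix rules applied by `PrintSinkOutputWriter#open` in the merged diff can be tried out in isolation. The following is a minimal sketch, not the Flink implementation: the class name `PrefixSketch` and the helper `buildPrefix` are hypothetical names introduced for illustration, and only the branching mirrors the `open` method quoted in the diff. No Flink classes are involved.

```java
public class PrefixSketch {

    // Hypothetical helper (not part of Flink): mirrors the branching of
    // PrintSinkOutputWriter#open as quoted in the diff.
    public static String buildPrefix(String sinkIdentifier, int subtaskIndex, int numParallelSubtasks) {
        String prefix = (sinkIdentifier == null ? "" : sinkIdentifier);

        // The 1-based subtask number is only added when there is more than one subtask.
        if (numParallelSubtasks > 1) {
            if (!prefix.isEmpty()) {
                prefix += ":";
            }
            prefix += (subtaskIndex + 1);
        }

        // A non-empty prefix is terminated by "> ".
        if (!prefix.isEmpty()) {
            prefix += "> ";
        }
        return prefix;
    }

    public static void main(String[] args) {
        System.out.println(buildPrefix("mySink", 0, 4) + "record"); // mySink:1> record
        System.out.println(buildPrefix("mySink", 0, 1) + "record"); // mySink> record
        System.out.println(buildPrefix(null, 2, 4) + "record");     // 3> record
        System.out.println(buildPrefix("", 0, 1) + "record");       // record
    }
}
```

The four calls in `main` correspond to the four combinations of "identifier provided or not" and "parallelism greater than one or not".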
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582258#comment-16582258 ]

ASF GitHub Bot commented on FLINK-9850:
---

yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-413483130

@pnowojski Python-related changes have been removed; only the Java and Scala changes remain.
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582239#comment-16582239 ]

ASF GitHub Bot commented on FLINK-9850:
---

zentol commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-413476883

yes.
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582205#comment-16582205 ]

ASF GitHub Bot commented on FLINK-9850:
---

yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-413470612

@zentol Do you mean that we don't change the python API here and remove the tests for the Python API?
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582191#comment-16582191 ]

ASF GitHub Bot commented on FLINK-9850:
---

zentol commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-413468227

Personally I would prefer if we'd not change the python API in this PR, because we're now in the exact position we don't want to be in: where the merging of the PR is delayed due to another effectively unrelated problem.
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582184#comment-16582184 ]

ASF GitHub Bot commented on FLINK-9850:
---

zentol commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210515127

## File path: flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java ##
@@ -63,7 +63,7 @@ private static Path findUtilsModule() {

     @Test
     public void testProgram() throws Exception {
-        Path testEntryPoint = new Path(getBaseTestPythonDir(), "examples/word_count.py");
+        Path testEntryPoint = new Path(getBaseTestPythonDir(), "run_all_tests.py");

Review comment:
Basically this change actually enables most Python tests, which I accidentally disabled months ago. So yeah, to enable more tests we do need this.
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582186#comment-16582186 ]

ASF GitHub Bot commented on FLINK-9850:
---

zentol commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210514892

## File path: flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_output_with_sink_identifier.py ##
@@ -0,0 +1,58 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+from utils import constants
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        collector.collect((1, value))
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Main:
+    def run(self, flink):
+        elements = ["aa" if iii % 2 == 0 else "bbb" for iii in range(constants.NUM_ELEMENTS_IN_TEST)]
+
+        env = flink.get_execution_environment()
+        env.from_collection(elements) \
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(milliseconds(10)) \
+            .reduce(Sum()) \
+            .output("testOutputWithIdentifier")
+
+        env.execute()

Review comment:
Yes, this is a common issue with the tests, but fixing it isn't too high on my priority list, given that most methods are just straightforward wrappers around Java methods.
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582171#comment-16582171 ]

ASF GitHub Bot commented on FLINK-9850:
---

zentol commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210514030

## File path: flink-core/src/main/java/org/apache/flink/api/common/functions/util/TaskOutputWriter.java ##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+import java.io.PrintStream;
+import java.io.Serializable;
+
+/**
+ * Task output writer for DataStream and DataSet print API.
+ */
+public class TaskOutputWriter implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final boolean STD_OUT = false;
+    private static final boolean STD_ERR = true;
+
+    private final boolean target;
+    private transient PrintStream stream;
+    private final String sinkIdentifier;
+    private transient String completedPrefix;
+
+    public TaskOutputWriter() {
+        this("", STD_OUT);
+    }
+
+    public TaskOutputWriter(final boolean stdErr) {
+        this("", stdErr);
+    }
+
+    public TaskOutputWriter(final String sinkIdentifier, final boolean stdErr) {
+        this.target = stdErr;
+        this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier);
+    }
+
+    public void open(int taskNumber, int numTasks) {

Review comment:
Ignore the `OutputFormat#open` method.
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582169#comment-16582169 ]

ASF GitHub Bot commented on FLINK-9850:
---

yanghua commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210513770

## File path: flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java ##
@@ -63,7 +63,7 @@ private static Path findUtilsModule() {

     @Test
     public void testProgram() throws Exception {
-        Path testEntryPoint = new Path(getBaseTestPythonDir(), "examples/word_count.py");
+        Path testEntryPoint = new Path(getBaseTestPythonDir(), "run_all_tests.py");

Review comment:
For more detail about this change, see PR #6475. It is a separate issue that we can either merge in this PR or not, so I split it into its own commit.
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582158#comment-16582158 ]

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210511275

## File path: flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/test_output_with_sink_identifier.py ##
@@ -0,0 +1,58 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+from utils import constants
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        collector.collect((1, value))
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, val1 = input1
+        count2, val2 = input2
+        return (count1 + count2, val1)
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Main:
+    def run(self, flink):
+        elements = ["aa" if iii % 2 == 0 else "bbb" for iii in range(constants.NUM_ELEMENTS_IN_TEST)]
+
+        env = flink.get_execution_environment()
+        env.from_collection(elements) \
+            .flat_map(Tokenizer()) \
+            .key_by(Selector()) \
+            .time_window(milliseconds(10)) \
+            .reduce(Sum()) \
+            .output("testOutputWithIdentifier")
+
+        env.execute()

Review comment:
This is the first time I'm looking at our Python tests, so take my comment with a grain of salt. Isn't this test (and the other Python streaming tests) supposed to assert on something? Aren't they currently testing only that the Python API doesn't throw anything, so that they would still pass with empty methods? @zentol ?
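The reviewer's question, whether a print test should assert on something, can be illustrated outside Flink. One common way to assert on printed output in plain Java is to redirect `System.out` into a buffer for the duration of the call. The sketch below assumes nothing about Flink's test utilities: `CapturedOutputSketch`, `captureStdOut`, and `printWithIdentifier` are hypothetical names, and `printWithIdentifier` is only a stand-in for a sink that prefixes records.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class CapturedOutputSketch {

    // Hypothetical stand-in for a print sink: prefixes the record and prints it.
    public static void printWithIdentifier(PrintStream out, String sinkIdentifier, Object record) {
        out.println(sinkIdentifier + "> " + record);
    }

    // Runs the action while System.out is redirected and returns everything it printed.
    public static String captureStdOut(Runnable action) {
        PrintStream original = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream capture = new PrintStream(buffer, true);
        System.setOut(capture);
        try {
            action.run();
        } finally {
            // Always restore the real stdout, even if the action throws.
            System.setOut(original);
            capture.flush();
        }
        return buffer.toString();
    }

    public static void main(String[] args) {
        String printed = captureStdOut(
            () -> printWithIdentifier(System.out, "testOutputWithIdentifier", "(2, aa)"));

        // The assertion is what a test without checks would be missing.
        String expected = "testOutputWithIdentifier> (2, aa)" + System.lineSeparator();
        if (!printed.equals(expected)) {
            throw new AssertionError("unexpected output: " + printed);
        }
        System.out.println("captured: " + printed.trim());
    }
}
```

A test built this way fails if the sink prints nothing, which is the gap the review comment points at.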
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582159#comment-16582159 ]

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r210509687

## File path: flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java ##
@@ -63,7 +63,7 @@ private static Path findUtilsModule() {

     @Test
     public void testProgram() throws Exception {
-        Path testEntryPoint = new Path(getBaseTestPythonDir(), "examples/word_count.py");
+        Path testEntryPoint = new Path(getBaseTestPythonDir(), "run_all_tests.py");

Review comment:
Is this change relevant?
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581279#comment-16581279 ]

ASF GitHub Bot commented on FLINK-9850:
---

yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-413245077

cc @pnowojski
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581201#comment-16581201 ] ASF GitHub Bot commented on FLINK-9850: --- yanghua commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r210299902 ## File path: flink-core/src/main/java/org/apache/flink/api/common/functions/util/TaskOutputWriter.java ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions.util; + +import java.io.PrintStream; +import java.io.Serializable; + +/** + * Task output writer for DataStream and DataSet print API. 
+ */ +public class TaskOutputWriter implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final boolean STD_OUT = false; + private static final boolean STD_ERR = true; + + private final boolean target; + private transient PrintStream stream; + private final String sinkIdentifier; + private transient String completedPrefix; + + public TaskOutputWriter() { + this("", STD_OUT); + } + + public TaskOutputWriter(final boolean stdErr) { + this("", stdErr); + } + + public TaskOutputWriter(final String sinkIdentifier, final boolean stdErr) { + this.target = stdErr; + this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier); + } + + public void open(int taskNumber, int numTasks) { Review comment: This is a good suggestion. Should we also rename the arguments of the interface method `OutputFormat#open`, rename them in a separate issue, or leave them as they are?
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581031#comment-16581031 ] ASF GitHub Bot commented on FLINK-9850: --- zentol commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r210251656 ## File path: flink-core/src/main/java/org/apache/flink/api/common/functions/util/TaskOutputWriter.java ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions.util; + +import java.io.PrintStream; +import java.io.Serializable; + +/** + * Task output writer for DataStream and DataSet print API. 
+ */ +public class TaskOutputWriter implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final boolean STD_OUT = false; + private static final boolean STD_ERR = true; + + private final boolean target; + private transient PrintStream stream; + private final String sinkIdentifier; + private transient String completedPrefix; + + public TaskOutputWriter() { + this("", STD_OUT); + } + + public TaskOutputWriter(final boolean stdErr) { + this("", stdErr); + } + + public TaskOutputWriter(final String sinkIdentifier, final boolean stdErr) { Review comment: ~~This class could be simplified by simply passing in the `OutputStream`.~~ I guess this would only work if we'd create this object after deployment.
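A minimal sketch of the point in the comment above, assuming (the thread does not state this) that the writer is Java-serialized before it reaches the task. Under that assumption a `PrintStream` cannot travel with the object, so the field is `transient`, arrives as `null`, and has to be resolved again from the serialized `stdErr` flag. `TransientStreamSketch` and `roundTrip` are hypothetical names used only for illustration; this is not Flink code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.io.Serializable;

/** Hypothetical stand-in for the writer under review; not Flink code. */
class TransientStreamSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    // Only this flag travels with the serialized object.
    private final boolean stdErr;
    // PrintStream is not Serializable, so the field is transient.
    private transient PrintStream stream;

    TransientStreamSketch(boolean stdErr) {
        this.stdErr = stdErr;
    }

    /** Resolves the target stream on the JVM that calls this method. */
    void open() {
        stream = stdErr ? System.err : System.out;
    }

    boolean isOpen() {
        return stream != null;
    }

    boolean targetsStdErr() {
        return stdErr;
    }

    /** Serializes and deserializes the object in memory (assumed to mimic shipping it). */
    static TransientStreamSketch roundTrip(TransientStreamSketch in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream oin =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (TransientStreamSketch) oin.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        TransientStreamSketch original = new TransientStreamSketch(false);
        original.open();
        TransientStreamSketch copy = roundTrip(original);
        System.out.println("open after round trip: " + copy.isOpen());
        copy.open();
        System.out.println("open after open(): " + copy.isOpen());
    }
}
```

The stream set before serialization does not survive the round trip, which is consistent with resolving it in `open` rather than in the constructor.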
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581013#comment-16581013 ] ASF GitHub Bot commented on FLINK-9850: --- yanghua commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r210253268 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockRuntimeContext.java ## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; + +import java.util.Collections; + +/** + * Simple test runtime context for stream operators. 
+ */ +public class MockRuntimeContext extends StreamingRuntimeContext { Review comment: This is based on your previous suggestion. Before your PR was merged, I forgot to delete it. Sorry, it will be deleted.
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581007#comment-16581007 ] ASF GitHub Bot commented on FLINK-9850: --- zentol commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r210251882 ## File path: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/datastream/PythonDataStream.java ## @@ -144,6 +144,14 @@ public void output() { stream.print(); } + /** +* A thin wrapper layer over {@link DataStream#print(String)}. +*/ + @PublicEvolving + public void output(String sinkIdentifier) { + stream.print(sinkIdentifier); Review comment: this is not tested
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581008#comment-16581008 ] ASF GitHub Bot commented on FLINK-9850: --- zentol commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r210251440 ## File path: flink-core/src/main/java/org/apache/flink/api/common/functions/util/TaskOutputWriter.java ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions.util; + +import java.io.PrintStream; +import java.io.Serializable; + +/** + * Task output writer for DataStream and DataSet print API. 
+ */ +public class TaskOutputWriter implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final boolean STD_OUT = false; + private static final boolean STD_ERR = true; + + private final boolean target; + private transient PrintStream stream; + private final String sinkIdentifier; + private transient String completedPrefix; + + public TaskOutputWriter() { + this("", STD_OUT); + } + + public TaskOutputWriter(final boolean stdErr) { + this("", stdErr); + } + + public TaskOutputWriter(final String sinkIdentifier, final boolean stdErr) { + this.target = stdErr; + this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier); + } + + public void open(int taskNumber, int numTasks) { Review comment: rename arguments to `subtaskIndex` and `numParallelSubtasks`
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581009#comment-16581009 ] ASF GitHub Bot commented on FLINK-9850: --- zentol commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r210251656 ## File path: flink-core/src/main/java/org/apache/flink/api/common/functions/util/TaskOutputWriter.java ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions.util; + +import java.io.PrintStream; +import java.io.Serializable; + +/** + * Task output writer for DataStream and DataSet print API. 
+ */ +public class TaskOutputWriter implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final boolean STD_OUT = false; + private static final boolean STD_ERR = true; + + private final boolean target; + private transient PrintStream stream; + private final String sinkIdentifier; + private transient String completedPrefix; + + public TaskOutputWriter() { + this("", STD_OUT); + } + + public TaskOutputWriter(final boolean stdErr) { + this("", stdErr); + } + + public TaskOutputWriter(final String sinkIdentifier, final boolean stdErr) { Review comment: This class could be simplified by simply passing in the `OutputStream`.
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581010#comment-16581010 ] ASF GitHub Bot commented on FLINK-9850: --- zentol commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r210252035 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java ## @@ -18,81 +18,74 @@ package org.apache.flink.streaming.api.functions.sink; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.util.TaskOutputWriter; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import java.io.PrintStream; - /** * Implementation of the SinkFunction writing every tuple to the standard * output or standard error stream. * - * @param - *Input record type + * + * Four possible format options: + * {@code sinkIdentifier}:taskId> output <- {@code sinkIdentifier} provided, parallelism > 1 + * {@code sinkIdentifier}> output <- {@code sinkIdentifier} provided, parallelism == 1 + * taskId> output<- no {@code sinkIdentifier} provided, parallelism > 1 + * output<- no {@code sinkIdentifier} provided, parallelism == 1 + * + * + * @param Input record type */ @PublicEvolving public class PrintSinkFunction extends RichSinkFunction { - private static final long serialVersionUID = 1L; - private static final boolean STD_OUT = false; - private static final boolean STD_ERR = true; + private static final long serialVersionUID = 1L; - private boolean target; - private transient PrintStream stream; - private transient String prefix; + private final TaskOutputWriter writer; /** * Instantiates a print sink function that prints to standard out. 
*/ - public PrintSinkFunction() {} + public PrintSinkFunction() { + writer = new TaskOutputWriter<>(); Review comment: call the other constructor instead
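The four format options listed in the Javadoc above, together with the suggestion to delegate between constructors, can be sketched as follows. This is an illustration under stated assumptions, not the class from the pull request: `PrintPrefixSketch`, `prefix` and `isStdErr` are hypothetical names, and the rule that the printed number is the zero-based subtask index plus one is taken from the quoted test expectation `"2> hello world!"` after `open(1, 2)`.

```java
/** Hypothetical illustration of the prefix rules; not the class from the pull request. */
final class PrintPrefixSketch {

    private final String sinkIdentifier;
    private final boolean stdErr;

    PrintPrefixSketch() {
        // Delegate to the most specific constructor instead of repeating field setup.
        this("", false);
    }

    PrintPrefixSketch(boolean stdErr) {
        this("", stdErr);
    }

    PrintPrefixSketch(String sinkIdentifier, boolean stdErr) {
        this.sinkIdentifier = sinkIdentifier == null ? "" : sinkIdentifier;
        this.stdErr = stdErr;
    }

    boolean isStdErr() {
        return stdErr;
    }

    /** Builds the text placed in front of every record for the given subtask. */
    String prefix(int subtaskIndex, int numParallelSubtasks) {
        StringBuilder prefix = new StringBuilder(sinkIdentifier);
        if (numParallelSubtasks > 1) {
            if (prefix.length() > 0) {
                prefix.append(':');
            }
            // Subtask indices are zero-based; the printed number is one-based.
            prefix.append(subtaskIndex + 1);
        }
        if (prefix.length() > 0) {
            prefix.append("> ");
        }
        return prefix.toString();
    }

    public static void main(String[] args) {
        PrintPrefixSketch sketch = new PrintPrefixSketch("mySink", false);
        System.out.println(sketch.prefix(1, 2) + "hello world!");
    }
}
```

Keeping the field assignments in one constructor means the null handling for `sinkIdentifier` lives in a single place.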
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581011#comment-16581011 ] ASF GitHub Bot commented on FLINK-9850: --- zentol commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r210251434 ## File path: flink-core/src/main/java/org/apache/flink/api/common/functions/util/TaskOutputWriter.java ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions.util; + +import java.io.PrintStream; +import java.io.Serializable; + +/** + * Task output writer for DataStream and DataSet print API. + */ +public class TaskOutputWriter implements Serializable { Review comment: can we give this a different name, like `PrintSinkOutputWriter`? The current name is a bit too general imo. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581006#comment-16581006 ] ASF GitHub Bot commented on FLINK-9850: --- zentol commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r210252151 ## File path: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ## @@ -959,6 +959,29 @@ class DataStream[T](stream: JavaStream[T]) { @PublicEvolving def printToErr() = stream.printToErr() + /** +* Writes a DataStream to the standard output stream (stdout). For each +* element of the DataStream the result of [[AnyRef.toString()]] is +* written. +* +* @param sinkIdentifier The string to prefix the output with. +* @return The closed DataStream. +*/ + @PublicEvolving + def print(sinkIdentifier: String): DataStreamSink[T] = stream.print(sinkIdentifier) Review comment: these are not tested
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580959#comment-16580959 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r210240596 ## File path: flink-java/src/test/java/org/apache/flink/api/java/io/PrintingOutputFormatTest.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link PrintingOutputFormat}. 
+ */ +public class PrintingOutputFormatTest { + + private final PrintStream originalSystemOut = System.out; + private final PrintStream originalSystemErr = System.err; + + private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + private final ByteArrayOutputStream arrayErrorStream = new ByteArrayOutputStream(); + + private final String line = System.lineSeparator(); + + @Before + public void setUp() { + System.setOut(new PrintStream(arrayOutputStream)); + System.setErr(new PrintStream(arrayErrorStream)); + } + + @After + public void tearDown() { + if (System.out != originalSystemOut) { + System.out.close(); + } + if (System.err != originalSystemErr) { + System.err.close(); + } + System.setOut(originalSystemOut); + System.setErr(originalSystemErr); + } + + @Test + public void testPrintOutputFormatStdOut() throws Exception { + PrintingOutputFormat printSink = new PrintingOutputFormat<>(); + printSink.open(0, 1); + + printSink.writeRecord("hello world!"); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("hello world!" + line, arrayOutputStream.toString()); + printSink.close(); + } + + @Test + public void testPrintOutputFormatStdErr() throws Exception { + PrintingOutputFormat printSink = new PrintingOutputFormat<>(true); + printSink.open(0, 1); + + printSink.writeRecord("hello world!"); + + assertEquals("Print to System.err", printSink.toString()); + assertEquals("hello world!" + line, arrayErrorStream.toString()); + printSink.close(); + } + + @Test + public void testPrintOutputFormatWithPrefix() throws Exception { + PrintingOutputFormat printSink = new PrintingOutputFormat<>(); + printSink.open(1, 2); + + printSink.writeRecord("hello world!"); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("2> hello world!" 
+ line, arrayOutputStream.toString()); + printSink.close(); + } + + @Test + public void testPrintOutputFormatWithIdentifierAndPrefix() throws Exception { + PrintingOutputFormat printSink = new PrintingOutputFormat<>("mySink", false); + printSink.open(1, 2); + + printSink.writeRecord("hello world!"); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString()); + + printSink = new PrintingOutputFormat<>("mySink", true); + assertEquals("Print to System.err", printSink.toString()); + assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString()); Review comment: In those 3 lines: were you trying to test for stderr? If so, you haven't invoked anything on stderr instance of `printSink` and in this line, you
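One way the stderr half of such a test could be exercised is sketched below, assuming the intent was to write a record through the stderr instance and assert on the captured error stream. `StdErrCaptureSketch`, `print` and `captureStdErr` are hypothetical helpers for illustration; they stand in for `PrintingOutputFormat` and the JUnit setup in the quoted test and are not part of the pull request.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

/** Hypothetical helpers showing a stderr assertion; not the test from the pull request. */
final class StdErrCaptureSketch {

    private StdErrCaptureSketch() {
    }

    /** Minimal stand-in sink: prints "identifier:subtask> record" to stdout or stderr. */
    static void print(String sinkIdentifier, boolean stdErr, int subtaskIndex, String record) {
        PrintStream target = stdErr ? System.err : System.out;
        target.println(sinkIdentifier + ":" + (subtaskIndex + 1) + "> " + record);
    }

    /** Runs the action with System.err redirected and returns everything written to it. */
    static String captureStdErr(Runnable action) {
        PrintStream original = System.err;
        ByteArrayOutputStream captured = new ByteArrayOutputStream();
        PrintStream replacement = new PrintStream(captured, true);
        System.setErr(replacement);
        try {
            action.run();
        } finally {
            // Always restore the real stream, even if the action throws.
            System.setErr(original);
            replacement.close();
        }
        return captured.toString();
    }

    public static void main(String[] args) {
        String err = captureStdErr(() -> print("mySink", true, 1, "hello world!"));
        System.out.print("captured from stderr: " + err);
    }
}
```

The assertion then targets the stream that the stderr instance actually wrote to, rather than the stdout buffer filled by the earlier instance.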
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580961#comment-16580961 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r210239256 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java ## @@ -18,81 +18,74 @@ package org.apache.flink.streaming.api.functions.sink; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.util.TaskOutputWriter; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import java.io.PrintStream; - /** * Implementation of the SinkFunction writing every tuple to the standard * output or standard error stream. * - * @param - *Input record type + * + * Four possible format options: + * {@code sinkIdentifier}:taskId> output <- {@code sinkIdentifier} provided, parallelism > 1 + * {@code sinkIdentifier}> output <- {@code sinkIdentifier} provided, parallelism == 1 + * taskId> output<- no {@code sinkIdentifier} provided, parallelism > 1 + * output<- no {@code sinkIdentifier} provided, parallelism == 1 + * + * + * @param Input record type */ @PublicEvolving public class PrintSinkFunction extends RichSinkFunction { - private static final long serialVersionUID = 1L; - private static final boolean STD_OUT = false; - private static final boolean STD_ERR = true; + private static final long serialVersionUID = 1L; - private boolean target; - private transient PrintStream stream; - private transient String prefix; + private final TaskOutputWriter writer; /** * Instantiates a print sink function that prints to standard out. 
*/ - public PrintSinkFunction() {} + public PrintSinkFunction() { + writer = new TaskOutputWriter<>(); + } /** * Instantiates a print sink function that prints to standard out. * * @param stdErr True, if the format should print to standard error instead of standard out. */ - public PrintSinkFunction(boolean stdErr) { - target = stdErr; - } - - public void setTargetToStandardOut() { - target = STD_OUT; + public PrintSinkFunction(final boolean stdErr) { + writer = new TaskOutputWriter<>(stdErr); } - public void setTargetToStandardErr() { - target = STD_ERR; + /** +* Instantiates a print sink function that prints to standard out and gives a sink identifier. +* +* @param stdErr True, if the format should print to standard error instead of standard out. +* @param sinkIdentifier Message that identify sink and is prefixed to the output of the value +*/ + public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) { + writer = new TaskOutputWriter<>(sinkIdentifier, stdErr); } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); - // get the target stream - stream = target == STD_OUT ? System.out : System.err; + int taskNumber = context.getIndexOfThisSubtask(); Review comment: nit: inline those `taskNumber` and `numTasks` variables? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API > Reporter: Hequn Cheng > Assignee: vinoyang > Priority: Major > Labels: pull-request-available > > The output of the print method of {{DataSet}} allows the user to supply a > String to identify the output (see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But > {{DataStream}} doesn't support this yet. It is valuable to add this feature for > {{DataStream}}
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580949#comment-16580949 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r210239256 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java ## @@ -18,81 +18,74 @@ package org.apache.flink.streaming.api.functions.sink; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.util.TaskOutputWriter; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import java.io.PrintStream; - /** * Implementation of the SinkFunction writing every tuple to the standard * output or standard error stream. * - * @param - *Input record type + * + * Four possible format options: + * {@code sinkIdentifier}:taskId> output <- {@code sinkIdentifier} provided, parallelism > 1 + * {@code sinkIdentifier}> output <- {@code sinkIdentifier} provided, parallelism == 1 + * taskId> output<- no {@code sinkIdentifier} provided, parallelism > 1 + * output<- no {@code sinkIdentifier} provided, parallelism == 1 + * + * + * @param Input record type */ @PublicEvolving public class PrintSinkFunction extends RichSinkFunction { - private static final long serialVersionUID = 1L; - private static final boolean STD_OUT = false; - private static final boolean STD_ERR = true; + private static final long serialVersionUID = 1L; - private boolean target; - private transient PrintStream stream; - private transient String prefix; + private final TaskOutputWriter writer; /** * Instantiates a print sink function that prints to standard out. 
*/ - public PrintSinkFunction() {} + public PrintSinkFunction() { + writer = new TaskOutputWriter<>(); + } /** * Instantiates a print sink function that prints to standard out. * * @param stdErr True, if the format should print to standard error instead of standard out. */ - public PrintSinkFunction(boolean stdErr) { - target = stdErr; - } - - public void setTargetToStandardOut() { - target = STD_OUT; + public PrintSinkFunction(final boolean stdErr) { + writer = new TaskOutputWriter<>(stdErr); } - public void setTargetToStandardErr() { - target = STD_ERR; + /** +* Instantiates a print sink function that prints to standard out and gives a sink identifier. +* +* @param stdErr True, if the format should print to standard error instead of standard out. +* @param sinkIdentifier Message that identify sink and is prefixed to the output of the value +*/ + public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) { + writer = new TaskOutputWriter<>(sinkIdentifier, stdErr); } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); - // get the target stream - stream = target == STD_OUT ? System.out : System.err; + int taskNumber = context.getIndexOfThisSubtask(); Review comment: nit: inline those variables? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580958#comment-16580958 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r210241537 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockRuntimeContext.java ## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; + +import java.util.Collections; + +/** + * Simple test runtime context for stream operators. 
+ */ +public class MockRuntimeContext extends StreamingRuntimeContext { Review comment: Why did you add this class? I have already extracted the same thing on the master branch to `MockStreamingRuntimeContext`. I'm guessing some rebase error? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580957#comment-16580957 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r210242096 ## File path: flink-core/src/main/java/org/apache/flink/api/common/functions/util/TaskOutputWriter.java ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions.util; + +import java.io.PrintStream; +import java.io.Serializable; + +/** + * Task output writer for DataStream and DataSet print API. + */ +public class TaskOutputWriter implements Serializable { Review comment: add `@Internal`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580954#comment-16580954 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r210239166 ## File path: flink-java/src/test/java/org/apache/flink/api/java/io/PrintingOutputFormatTest.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link PrintingOutputFormat}. + */ +public class PrintingOutputFormatTest { Review comment: +1 for adding those tests! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580951#comment-16580951 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r210240642 ## File path: flink-java/src/test/java/org/apache/flink/api/java/io/PrintingOutputFormatTest.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link PrintingOutputFormat}. 
+ */ +public class PrintingOutputFormatTest { + + private final PrintStream originalSystemOut = System.out; + private final PrintStream originalSystemErr = System.err; + + private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + private final ByteArrayOutputStream arrayErrorStream = new ByteArrayOutputStream(); + + private final String line = System.lineSeparator(); + + @Before + public void setUp() { + System.setOut(new PrintStream(arrayOutputStream)); + System.setErr(new PrintStream(arrayErrorStream)); + } + + @After + public void tearDown() { + if (System.out != originalSystemOut) { + System.out.close(); + } + if (System.err != originalSystemErr) { + System.err.close(); + } + System.setOut(originalSystemOut); + System.setErr(originalSystemErr); + } + + @Test + public void testPrintOutputFormatStdOut() throws Exception { + PrintingOutputFormat printSink = new PrintingOutputFormat<>(); + printSink.open(0, 1); + + printSink.writeRecord("hello world!"); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("hello world!" + line, arrayOutputStream.toString()); + printSink.close(); + } + + @Test + public void testPrintOutputFormatStdErr() throws Exception { + PrintingOutputFormat printSink = new PrintingOutputFormat<>(true); + printSink.open(0, 1); + + printSink.writeRecord("hello world!"); + + assertEquals("Print to System.err", printSink.toString()); + assertEquals("hello world!" + line, arrayErrorStream.toString()); + printSink.close(); + } + + @Test + public void testPrintOutputFormatWithPrefix() throws Exception { + PrintingOutputFormat printSink = new PrintingOutputFormat<>(); + printSink.open(1, 2); + + printSink.writeRecord("hello world!"); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("2> hello world!" 
+ line, arrayOutputStream.toString()); + printSink.close(); + } + + @Test + public void testPrintOutputFormatWithIdentifierAndPrefix() throws Exception { + PrintingOutputFormat printSink = new PrintingOutputFormat<>("mySink", false); + printSink.open(1, 2); + + printSink.writeRecord("hello world!"); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString()); + + printSink = new PrintingOutputFormat<>("mySink", true); Review comment: nit: you haven't closed the previous sink. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580956#comment-16580956 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r210240596 ## File path: flink-java/src/test/java/org/apache/flink/api/java/io/PrintingOutputFormatTest.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link PrintingOutputFormat}. 
+ */ +public class PrintingOutputFormatTest { + + private final PrintStream originalSystemOut = System.out; + private final PrintStream originalSystemErr = System.err; + + private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + private final ByteArrayOutputStream arrayErrorStream = new ByteArrayOutputStream(); + + private final String line = System.lineSeparator(); + + @Before + public void setUp() { + System.setOut(new PrintStream(arrayOutputStream)); + System.setErr(new PrintStream(arrayErrorStream)); + } + + @After + public void tearDown() { + if (System.out != originalSystemOut) { + System.out.close(); + } + if (System.err != originalSystemErr) { + System.err.close(); + } + System.setOut(originalSystemOut); + System.setErr(originalSystemErr); + } + + @Test + public void testPrintOutputFormatStdOut() throws Exception { + PrintingOutputFormat printSink = new PrintingOutputFormat<>(); + printSink.open(0, 1); + + printSink.writeRecord("hello world!"); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("hello world!" + line, arrayOutputStream.toString()); + printSink.close(); + } + + @Test + public void testPrintOutputFormatStdErr() throws Exception { + PrintingOutputFormat printSink = new PrintingOutputFormat<>(true); + printSink.open(0, 1); + + printSink.writeRecord("hello world!"); + + assertEquals("Print to System.err", printSink.toString()); + assertEquals("hello world!" + line, arrayErrorStream.toString()); + printSink.close(); + } + + @Test + public void testPrintOutputFormatWithPrefix() throws Exception { + PrintingOutputFormat printSink = new PrintingOutputFormat<>(); + printSink.open(1, 2); + + printSink.writeRecord("hello world!"); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("2> hello world!" 
+ line, arrayOutputStream.toString()); + printSink.close(); + } + + @Test + public void testPrintOutputFormatWithIdentifierAndPrefix() throws Exception { + PrintingOutputFormat printSink = new PrintingOutputFormat<>("mySink", false); + printSink.open(1, 2); + + printSink.writeRecord("hello world!"); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString()); + + printSink = new PrintingOutputFormat<>("mySink", true); + assertEquals("Print to System.err", printSink.toString()); + assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString()); Review comment: Were you trying to test for stderr in those 3 lines? If so, you haven't invoked anything on the stderr instance of `printSink`, and in this line, you are
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580950#comment-16580950 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r210238813 ## File path: flink-core/src/main/java/org/apache/flink/api/common/functions/util/TaskOutputWriter.java ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.functions.util; + +import java.io.PrintStream; +import java.io.Serializable; + +/** + * Task output writer for DataStream and DataSet print API. 
+ */ +public class TaskOutputWriter implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final boolean STD_OUT = false; + private static final boolean STD_ERR = true; + + private final boolean target; + private transient PrintStream stream; + private final String sinkIdentifier; + private transient String completedPrefix; + + public TaskOutputWriter() { + this("", STD_OUT); + } + + public TaskOutputWriter(final boolean stdErr) { + this("", stdErr); + } + + public TaskOutputWriter(final String sinkIdentifier, final boolean stdErr) { + this.target = stdErr; + this.sinkIdentifier = (sinkIdentifier == null ? "" : sinkIdentifier); + } + + public void open(int taskNumber, int numTasks) { + // get the target stream + stream = target == STD_OUT ? System.out : System.err; + + completedPrefix = sinkIdentifier; + + if (numTasks > 1) { + if (!completedPrefix.isEmpty()) { + completedPrefix += ":"; + } + completedPrefix += (taskNumber + 1); + } + + if (!completedPrefix.isEmpty()) { + completedPrefix += "> "; + } + } + + public void write(IN record) { + stream.println(completedPrefix + record.toString()); + } + + public void close() { Review comment: Drop the close? It seems like a dead empty method. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
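For readers skimming the flattened diff, the prefix logic in `open(int taskNumber, int numTasks)` above can be restated as a small standalone sketch. `PrefixSketch` and `buildPrefix` are hypothetical names chosen for illustration and are not part of Flink; the sketch only mirrors the branches visible in the diff and assumes nothing else about the writer.

```java
// Illustrative sketch only: PrefixSketch and buildPrefix are hypothetical
// names, not Flink API. The branches mirror open() in the diff above.
final class PrefixSketch {

    /**
     * Builds the prefix that is prepended to every printed record.
     *
     * @param sinkIdentifier user-supplied identifier, may be null or empty
     * @param taskNumber zero-based index of the parallel subtask
     * @param numTasks total number of parallel subtasks
     */
    static String buildPrefix(String sinkIdentifier, int taskNumber, int numTasks) {
        StringBuilder prefix = new StringBuilder(sinkIdentifier == null ? "" : sinkIdentifier);

        // The subtask number is only shown when there is more than one subtask.
        if (numTasks > 1) {
            if (prefix.length() > 0) {
                prefix.append(':');
            }
            prefix.append(taskNumber + 1); // printed one-based
        }

        // The "> " separator is only added when there is something to separate.
        if (prefix.length() > 0) {
            prefix.append("> ");
        }
        return prefix.toString();
    }

    public static void main(String[] args) {
        // The four format options listed in the PrintSinkFunction Javadoc.
        System.out.println(buildPrefix("mySink", 1, 2) + "hello world!"); // mySink:2> hello world!
        System.out.println(buildPrefix("mySink", 0, 1) + "hello world!"); // mySink> hello world!
        System.out.println(buildPrefix(null, 1, 2) + "hello world!");     // 2> hello world!
        System.out.println(buildPrefix(null, 0, 1) + "hello world!");     // hello world!
    }
}
```

The one-based subtask number matches the expectations in the tests quoted in this thread, where `open(1, 2)` yields the prefix `2> `.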
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580952#comment-16580952 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r210240728 ## File path: flink-java/src/test/java/org/apache/flink/api/java/io/PrintingOutputFormatTest.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link PrintingOutputFormat}. 
+ */ +public class PrintingOutputFormatTest { + + private final PrintStream originalSystemOut = System.out; + private final PrintStream originalSystemErr = System.err; + + private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + private final ByteArrayOutputStream arrayErrorStream = new ByteArrayOutputStream(); + + private final String line = System.lineSeparator(); + + @Before + public void setUp() { + System.setOut(new PrintStream(arrayOutputStream)); + System.setErr(new PrintStream(arrayErrorStream)); + } + + @After + public void tearDown() { + if (System.out != originalSystemOut) { + System.out.close(); + } + if (System.err != originalSystemErr) { + System.err.close(); + } + System.setOut(originalSystemOut); + System.setErr(originalSystemErr); + } + + @Test + public void testPrintOutputFormatStdOut() throws Exception { + PrintingOutputFormat printSink = new PrintingOutputFormat<>(); + printSink.open(0, 1); + + printSink.writeRecord("hello world!"); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("hello world!" + line, arrayOutputStream.toString()); + printSink.close(); + } + + @Test + public void testPrintOutputFormatStdErr() throws Exception { + PrintingOutputFormat printSink = new PrintingOutputFormat<>(true); + printSink.open(0, 1); + + printSink.writeRecord("hello world!"); + + assertEquals("Print to System.err", printSink.toString()); + assertEquals("hello world!" + line, arrayErrorStream.toString()); + printSink.close(); + } + + @Test + public void testPrintOutputFormatWithPrefix() throws Exception { + PrintingOutputFormat printSink = new PrintingOutputFormat<>(); + printSink.open(1, 2); + + printSink.writeRecord("hello world!"); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("2> hello world!" 
+ line, arrayOutputStream.toString()); + printSink.close(); + } + + @Test + public void testPrintOutputFormatWithIdentifierAndPrefix() throws Exception { + PrintingOutputFormat printSink = new PrintingOutputFormat<>("mySink", false); + printSink.open(1, 2); + + printSink.writeRecord("hello world!"); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString()); + + printSink = new PrintingOutputFormat<>("mySink", true); + assertEquals("Print to System.err", printSink.toString()); + assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString()); + + printSink.close(); + } + + @Test + public void testPrintOutputFormatWithIdentifierButNoPrefix() throws Exception { +
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580953#comment-16580953 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r210240875 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ## @@ -103,4 +102,41 @@ public void testPrintSinkWithPrefix() throws Exception { assertEquals("2> hello world!" + line, arrayOutputStream.toString()); printSink.close(); } + + @Test + public void testPrintSinkWithIdentifierAndPrefix() throws Exception { + PrintSinkFunction printSink = new PrintSinkFunction<>("mySink", false); + printSink.setRuntimeContext(new MockStreamingRuntimeContext(false, 2, 1)); + printSink.open(new Configuration()); + + printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0)); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString()); + + printSink = new PrintSinkFunction<>("mySink", true); + assertEquals("Print to System.err", printSink.toString()); + assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString()); + + printSink.close(); + } + + @Test + public void testPrintSinkWithIdentifierButNoPrefix() throws Exception { + PrintSinkFunction printSink = new PrintSinkFunction<>("mySink", false); + printSink.setRuntimeContext(new MockRuntimeContext(false, 1, 0)); + printSink.open(new Configuration()); + + printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0)); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("mySink> hello world!" 
+ line, arrayOutputStream.toString()); + + printSink = new PrintSinkFunction<>("mySink", true); + assertEquals("Print to System.err", printSink.toString()); + assertEquals("mySink> hello world!" + line, arrayOutputStream.toString()); Review comment: ditto regarding testing stderr This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API > Reporter: Hequn Cheng > Assignee: vinoyang > Priority: Major > Labels: pull-request-available > > The print method of {{DataSet}} allows the user to supply a > String to identify the output (see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]), but > {{DataStream}} does not support this yet. It would be valuable to add this feature to > {{DataStream}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580955#comment-16580955 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r210240862 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ## @@ -103,4 +102,41 @@ public void testPrintSinkWithPrefix() throws Exception { assertEquals("2> hello world!" + line, arrayOutputStream.toString()); printSink.close(); } + + @Test + public void testPrintSinkWithIdentifierAndPrefix() throws Exception { + PrintSinkFunction printSink = new PrintSinkFunction<>("mySink", false); + printSink.setRuntimeContext(new MockStreamingRuntimeContext(false, 2, 1)); + printSink.open(new Configuration()); + + printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0)); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString()); + + printSink = new PrintSinkFunction<>("mySink", true); + assertEquals("Print to System.err", printSink.toString()); + assertEquals("mySink:2> hello world!" + line, arrayOutputStream.toString()); Review comment: ditto regarding testing stderr This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580001#comment-16580001 ] ASF GitHub Bot commented on FLINK-9850: --- yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#issuecomment-412926404 cc @pnowojski updated This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The output of the print method of {[DataSet}} allows the user to supply a > String to identify the output(see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But > {[DataStream}} doesn't support now. It is valuable to add this feature for > {{DataStream}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579783#comment-16579783 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski edited a comment on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#issuecomment-412868964 Yes exactly. More precisely as I wrote in one of the previous comments, this `TaskOutputWriter` (`TaskStandardOutputWriter`?) would deduplicate logic from those methods: - open - writeRecord/invoke - toString - close (those probably should be dropped because of NPE.) Because their logic is identical between `PrintSinkFunction` and `PrintingOutputFormat` while their implementation is already inconsistent. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The output of the print method of {[DataSet}} allows the user to supply a > String to identify the output(see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But > {[DataStream}} doesn't support now. It is valuable to add this feature for > {{DataStream}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
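The deduplication described in this comment could look roughly like the following. This is a hedged sketch, not Flink code: `SharedPrintWriter`, `SinkLike`, and `FormatLike` are hypothetical stand-ins for the proposed shared writer, `PrintSinkFunction`, and `PrintingOutputFormat`, and the target stream is passed in explicitly (an assumption) so the example is easy to run and check.

```java
import java.io.PrintStream;

/** Hypothetical shared writer; holds the logic both wrappers would otherwise duplicate. */
final class SharedPrintWriter<T> {
    private final String sinkIdentifier;
    private final PrintStream stream;
    private String completedPrefix = "";

    SharedPrintWriter(String sinkIdentifier, PrintStream stream) {
        // Empty string instead of null, so later code never needs a null check.
        this.sinkIdentifier = (sinkIdentifier == null) ? "" : sinkIdentifier;
        this.stream = stream;
    }

    /** Computes the prefix once; subtaskIndex is assumed to be zero-based. */
    void open(int subtaskIndex, int numParallelSubtasks) {
        String prefix = sinkIdentifier;
        if (numParallelSubtasks > 1) {
            prefix += (prefix.isEmpty() ? "" : ":") + (subtaskIndex + 1);
        }
        completedPrefix = prefix.isEmpty() ? "" : prefix + "> ";
    }

    void write(T record) {
        stream.println(completedPrefix + record);
    }
}

/** Hypothetical stand-in for a streaming sink: delegates everything to the writer. */
final class SinkLike<T> {
    private final SharedPrintWriter<T> writer;

    SinkLike(String sinkIdentifier, PrintStream stream) {
        this.writer = new SharedPrintWriter<>(sinkIdentifier, stream);
    }

    void open(int subtaskIndex, int numParallelSubtasks) {
        writer.open(subtaskIndex, numParallelSubtasks);
    }

    void invoke(T record) {
        writer.write(record);
    }
}

/** Hypothetical stand-in for a batch output format: same delegation, different method name. */
final class FormatLike<T> {
    private final SharedPrintWriter<T> writer;

    FormatLike(String sinkIdentifier, PrintStream stream) {
        this.writer = new SharedPrintWriter<>(sinkIdentifier, stream);
    }

    void open(int subtaskIndex, int numParallelSubtasks) {
        writer.open(subtaskIndex, numParallelSubtasks);
    }

    void writeRecord(T record) {
        writer.write(record);
    }
}
```

With this shape, the two wrappers differ only in the method names their respective interfaces require, so the prefix and printing behavior cannot drift apart.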
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579732#comment-16579732 ] ASF GitHub Bot commented on FLINK-9850: --- yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#issuecomment-412854540 @pnowojski I fixed some of the issues you just mentioned and will refactor `PrintSinkFunction` and `PrintingOutputFormat`. Do you mean I should create a new class named `TaskOutputWriter` and extract the shared logic into it?
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579662#comment-16579662 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r209911856 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java ## @@ -70,27 +84,30 @@ public void open(Configuration parameters) throws Exception { // get the target stream stream = target == STD_OUT ? System.out : System.err; - // set the prefix if we have a >1 parallelism - prefix = (context.getNumberOfParallelSubtasks() > 1) ? - ((context.getIndexOfThisSubtask() + 1) + "> ") : null; + completedPrefix = sinkIdentifier; + + if (context.getNumberOfParallelSubtasks() > 1) { + if (!completedPrefix.isEmpty()) { + completedPrefix += ":"; + } + completedPrefix += (context.getIndexOfThisSubtask() + 1); + } + + if (!completedPrefix.isEmpty()) { + completedPrefix += "> "; + } } @Override public void invoke(IN record) { - if (prefix != null) { - stream.println(prefix + record.toString()); + if (completedPrefix != null) { Review comment: This check is not valid anymore, this field is never null. It can either be dropped altogether or replaced by empty string check. Probably it's better to drop it. Functionally both will be the same and empty string check's performance benefit (if any) doesn't matter while printing to STDOUT/STDERR. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576573#comment-16576573 ] ASF GitHub Bot commented on FLINK-9850: --- yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#issuecomment-412144209 Hi @pnowojski, thanks for your suggestion. I have refactored this PR. We both changed `PrintSinkFunctionTest`, so I may not have finished all of the work. I also agree with the suggestion to deduplicate the logic of `PrintSinkFunction` and `PrintingOutputFormat`, and I'd like to start that work once the current issue is resolved.
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576310#comment-16576310 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r209260792 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ## @@ -117,6 +117,65 @@ public void testPrintSinkWithPrefix() throws Exception { stream.close(); } + @Test + public void testPrintSinkWithIdentifierAndPrefix() throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream stream = new PrintStream(baos); + System.setOut(stream); + + final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class); + Mockito.when(ctx.getNumberOfParallelSubtasks()).thenReturn(2); + Mockito.when(ctx.getIndexOfThisSubtask()).thenReturn(1); + + PrintSinkFunction printSink = new PrintSinkFunction<>(false, "mySink"); + printSink.setRuntimeContext(ctx); + try { + printSink.open(new Configuration()); + } catch (Exception e) { + Assert.fail(); + } + printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0)); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("mySink:2> hello world!" + line, baos.toString()); + + printSink.setTargetToStandardErr(); + assertEquals("Print to System.err", printSink.toString()); + assertEquals("mySink:2> hello world!" + line, baos.toString()); + + printSink.close(); + stream.close(); + } + + @Test + public void testPrintSinkWithIdentifierButNoPrefix() throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); Review comment: There is quite a lot of code duplication in those tests and they were unnecessarily using mockito instead of proper mock. Also there was even a bug in `testPrintSinkStdErr`. 
I have fixed those issues in a hotfix: https://github.com/apache/flink/pull/6538 I would like the hotfix to be merged before this PR; please adapt/rewrite your tests in a similar fashion to what I did in my hotfix.
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576314#comment-16576314 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r209234301 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java ## @@ -55,6 +57,17 @@ public PrintSinkFunction(boolean stdErr) { target = stdErr; } + /** +* Instantiates a print sink function that prints to standard out and gives a sink identifier. +* +* @param stdErr True, if the format should print to standard error instead of standard out. +* @param sinkIdentifier Message that identify sink and is prefixed to the output of the value +*/ + public PrintSinkFunction(boolean stdErr, String sinkIdentifier) { + this(stdErr); + this.sinkIdentifier = sinkIdentifier; Review comment: Usually less detailed constructor is calling the more specific ones, not the other way around. Here it will allow you to mark fields as `final` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The output of the print method of {[DataSet}} allows the user to supply a > String to identify the output(see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But > {[DataStream}} doesn't support now. 
It is valuable to add this feature for > {{DataStream}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
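The constructor ordering suggested in this review comment can be illustrated with a small standalone class. This is a sketch with a hypothetical class name (`ChainedSink`), not the actual `PrintSinkFunction`: the less specific constructors delegate to the most specific one, which is the only place fields are assigned, so both fields can be `final`.

```java
/** Hypothetical example class; illustrates constructor chaining toward the most specific constructor. */
final class ChainedSink {
    private final boolean stdErr;
    private final String sinkIdentifier;

    /** Least specific: defaults to standard out and no identifier. */
    ChainedSink() {
        this("", false);
    }

    /** Chooses the target stream only; delegates with an empty identifier. */
    ChainedSink(boolean stdErr) {
        this("", stdErr);
    }

    /** Most specific constructor: the single place where fields are assigned. */
    ChainedSink(String sinkIdentifier, boolean stdErr) {
        this.stdErr = stdErr;
        // Normalize null to an empty string so callers never see a null identifier.
        this.sinkIdentifier = (sinkIdentifier == null) ? "" : sinkIdentifier;
    }

    String getSinkIdentifier() {
        return sinkIdentifier;
    }

    @Override
    public String toString() {
        return "Print to " + (stdErr ? "System.err" : "System.out");
    }
}
```

Had the detailed constructor called `this(stdErr)` and then assigned `sinkIdentifier` afterwards, the simpler constructor would have to leave that field unassigned, which the compiler rejects for a `final` field.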
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576311#comment-16576311 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r209234901 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java ## @@ -70,15 +83,29 @@ public void open(Configuration parameters) throws Exception { // get the target stream stream = target == STD_OUT ? System.out : System.err; + /** Review comment: please move this to class's java doc. Also replace `sinkId`, `sink id` with `{@code sinkIdentifier} `. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The output of the print method of {[DataSet}} allows the user to supply a > String to identify the output(see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But > {[DataStream}} doesn't support now. It is valuable to add this feature for > {{DataStream}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576308#comment-16576308 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r209237840 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java ## @@ -70,15 +83,29 @@ public void open(Configuration parameters) throws Exception { // get the target stream stream = target == STD_OUT ? System.out : System.err; + /** +* Four possible format options: +* sinkId:taskId> output <- sink id provided, parallelism > 1 +* sinkId> output <- sink id provided, parallelism == 1 +* taskId> output <- no sink id provided, parallelism > 1 +* output <- no sink id provided, parallelism == 1 +*/ + // set the prefix if we have a >1 parallelism prefix = (context.getNumberOfParallelSubtasks() > 1) ? ((context.getIndexOfThisSubtask() + 1) + "> ") : null; + + if (prefix == null) {
Review comment: Don't use nulls here. In this case we can easily use an empty string for the same purpose, and it will be safer (no possible NPE). Btw, rephrasing this logic like this:

```
completedPrefix = sinkIdentifier;
if (context.getNumberOfParallelSubtasks() > 1) {
    if (!completedPrefix.isEmpty()) {
        completedPrefix += ":";
    }
    completedPrefix += (context.getIndexOfThisSubtask() + 1);
}
if (!completedPrefix.isEmpty()) {
    completedPrefix += "> ";
}
```

(optionally with a ternary operator instead of some of the `if` statements; that's only a matter of taste) simplifies the logic and deduplicates some of the code/constants.
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
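The four output formats listed in the diff under review (sink identifier present or absent, parallelism greater than 1 or not) can be checked in isolation. The following is a minimal standalone sketch, not the code merged in the PR: the class `PrefixSketch` and the method `buildPrefix` are hypothetical, and the subtask index is assumed to be zero-based, as with `getIndexOfThisSubtask()`.

```java
/** Hypothetical helper illustrating the prefix rules from the review; not part of Flink. */
final class PrefixSketch {

    private PrefixSketch() {
    }

    /**
     * Builds the prefix that is prepended to every printed record.
     *
     * @param sinkIdentifier user-supplied identifier; null is treated as empty
     * @param subtaskIndex zero-based index of the subtask (assumption)
     * @param numParallelSubtasks parallelism of the sink
     * @return the prefix, or an empty string when there is nothing to show
     */
    static String buildPrefix(String sinkIdentifier, int subtaskIndex, int numParallelSubtasks) {
        // Start from the identifier; an empty string stands in for "no identifier".
        String prefix = (sinkIdentifier == null) ? "" : sinkIdentifier;

        // The one-based subtask number is only shown when there is more than one subtask.
        if (numParallelSubtasks > 1) {
            if (!prefix.isEmpty()) {
                prefix += ":";
            }
            prefix += (subtaskIndex + 1);
        }

        // The "> " separator is appended only if something precedes it.
        if (!prefix.isEmpty()) {
            prefix += "> ";
        }
        return prefix;
    }
}
```

Because the result is never null, a caller can concatenate it unconditionally, for example `stream.println(PrefixSketch.buildPrefix(id, index, parallelism) + record)`, without a null or empty check.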
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576309#comment-16576309 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r209233947 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java ## @@ -40,6 +40,8 @@ private boolean target; private transient PrintStream stream; private transient String prefix; + private String sinkIdentifier; + private transient String completedPrefix; Review comment: Please drop `prefix` field, it's only a local variable now. Also please change `target` and `sinkIdentifier` into `final` fields. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The output of the print method of {[DataSet}} allows the user to supply a > String to identify the output(see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But > {[DataStream}} doesn't support now. It is valuable to add this feature for > {{DataStream}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576313#comment-16576313 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r209239877 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ## @@ -117,6 +117,65 @@ public void testPrintSinkWithPrefix() throws Exception { stream.close(); } + @Test + public void testPrintSinkWithIdentifierAndPrefix() throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream stream = new PrintStream(baos); + System.setOut(stream); + + final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class); + Mockito.when(ctx.getNumberOfParallelSubtasks()).thenReturn(2); + Mockito.when(ctx.getIndexOfThisSubtask()).thenReturn(1); + + PrintSinkFunction printSink = new PrintSinkFunction<>(false, "mySink"); + printSink.setRuntimeContext(ctx); + try { + printSink.open(new Configuration()); + } catch (Exception e) { + Assert.fail(); Review comment: Do not hide the original exception. From what I've heard there was some bug with old junit version and that's why this pattern is reoccurring in Flink tests. Regardless if that was the case, it's not the problem anymore, and hiding original exception makes test failures harder to read/understand. (ditto in rest of the file as well). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
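To illustrate why swallowing the exception makes failures harder to read, here is a small sketch in plain Java without JUnit; the class and method names are hypothetical. `hidingCause` mimics the `catch (Exception e) { Assert.fail(); }` pattern by throwing a bare `AssertionError`, while `propagating` simply declares `throws Exception`, as a test method can.

```java
/** Hypothetical demo contrasting a swallowed exception with a propagated one. */
final class ExceptionHidingSketch {

    private ExceptionHidingSketch() {
    }

    /** Stand-in for a call such as open() that fails during a test. */
    static void failingOpen() throws Exception {
        throw new IllegalStateException("runtime context not initialized");
    }

    /** Mimics try/catch plus fail(): the original exception is discarded. */
    static void hidingCause() {
        try {
            failingOpen();
        } catch (Exception e) {
            // No message and no cause: the failure report says nothing about what went wrong.
            throw new AssertionError();
        }
    }

    /** Lets the original exception surface with its type, message, and stack trace. */
    static void propagating() throws Exception {
        failingOpen();
    }
}
```

In the first variant the test report shows only an `AssertionError` raised from the catch block; in the second it shows the `IllegalStateException` and the line that actually failed.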
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576312#comment-16576312 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r209238315 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java ## @@ -89,6 +116,8 @@ public void invoke(IN record) { public void close() { this.stream = null; this.prefix = null; + this.sinkIdentifier = null; + this.completedPrefix = null; Review comment: Please drop the this `close` method. It doesn't do anything beside causing `NPE`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The output of the print method of {[DataSet}} allows the user to supply a > String to identify the output(see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But > {[DataStream}} doesn't support now. It is valuable to add this feature for > {{DataStream}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
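The concern about `close()` can be reproduced with a few lines of plain Java. This is a hypothetical sketch (`NullingCloseSketch` is not a Flink class): nulling the fields releases nothing that matters, since `System.out` is not owned by the sink, but any later call on the instance fails with a `NullPointerException`.

```java
import java.io.PrintStream;

/** Hypothetical class showing how nulling fields in close() only sets up a later NPE. */
final class NullingCloseSketch {
    private PrintStream stream = System.out;
    private String completedPrefix = "";

    void write(String record) {
        // Throws NullPointerException if close() was called before, because stream is null.
        stream.println(completedPrefix + record);
    }

    void close() {
        // Mirrors the pattern under review: no resource is released here.
        stream = null;
        completedPrefix = null;
    }
}
```

Dropping `close()` also allows the fields that are set at construction time to be `final`, which rules out this failure mode entirely.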
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576307#comment-16576307 ]

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209239178

## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java

@@ -117,6 +117,65 @@ public void testPrintSinkWithPrefix() throws Exception {
         stream.close();
     }

+    @Test
+    public void testPrintSinkWithIdentifierAndPrefix() throws Exception {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream stream = new PrintStream(baos);
+        System.setOut(stream);
+
+        final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class);

Review comment:
Do not use Mockito for such things; tests written this way are very difficult to maintain in the future. Instead, please move the `org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseTest.MockRuntimeContext` class to a utility package in the `flink-streaming-java` module and reuse it throughout `PrintSinkFunctionTest`.
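To illustrate the reviewer's suggestion, here is a rough sketch of testing with a hand-written stub rather than a mocking framework. Everything in it is hypothetical and written for this note: `SubtaskInfo` is a trimmed-down interface (only its two method names are borrowed from Flink's `RuntimeContext`), `FixedSubtaskInfo` is the stub, and `PrefixingSink` is a toy sink, not `PrintSinkFunction`. It is not the `MockRuntimeContext` class named in the review.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

final class StubContextSketch {

    /** Hypothetical, trimmed-down view of a runtime context; not Flink's interface. */
    interface SubtaskInfo {
        int getIndexOfThisSubtask();
        int getNumberOfParallelSubtasks();
    }

    /** Hand-written stub returning fixed values; no mocking framework involved. */
    static final class FixedSubtaskInfo implements SubtaskInfo {
        private final int index;
        private final int parallelism;

        FixedSubtaskInfo(int index, int parallelism) {
            this.index = index;
            this.parallelism = parallelism;
        }

        @Override
        public int getIndexOfThisSubtask() {
            return index;
        }

        @Override
        public int getNumberOfParallelSubtasks() {
            return parallelism;
        }
    }

    /** Toy sink under test: prefixes each record with an identifier and subtask number. */
    static final class PrefixingSink {
        private final String identifier;
        private final PrintStream out;
        private String prefix = "";

        PrefixingSink(String identifier, PrintStream out) {
            this.identifier = identifier;
            this.out = out;
        }

        void open(SubtaskInfo info) {
            prefix = info.getNumberOfParallelSubtasks() > 1
                    ? identifier + ":" + (info.getIndexOfThisSubtask() + 1) + "> "
                    : identifier + "> ";
        }

        void invoke(Object record) {
            out.println(prefix + record);
        }
    }

    /** Runs the toy sink against the stub and returns what it printed, without the line break. */
    static String printWithStub(String identifier, int subtaskIndex, int parallelism, String record) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream out = new PrintStream(buffer, true);
        PrefixingSink sink = new PrefixingSink(identifier, out);
        sink.open(new FixedSubtaskInfo(subtaskIndex, parallelism));
        sink.invoke(record);
        return buffer.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(printWithStub("id", 1, 2, "hello"));
    }
}
```

The stub is an ordinary class, so a change to the interface shows up as a compile error in one place instead of as silently stale mock expectations scattered across tests.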
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16575041#comment-16575041 ]

ASF GitHub Bot commented on FLINK-9850:
---

yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-411806291

cc @tillrohrmann
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16563408#comment-16563408 ]

ASF GitHub Bot commented on FLINK-9850:
---

yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-409163881

cc @tillrohrmann and @zentol
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554399#comment-16554399 ]

ASF GitHub Bot commented on FLINK-9850:
---

Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/6367

    cc @dawidwys and @pnowojski
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16549354#comment-16549354 ]

ASF GitHub Bot commented on FLINK-9850:
---

Github user tison1 commented on the issue:

    https://github.com/apache/flink/pull/6367

    @yanghua also +1, this is a net win.
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16549334#comment-16549334 ]

ASF GitHub Bot commented on FLINK-9850:
---

Github user hequn8128 commented on the issue:

    https://github.com/apache/flink/pull/6367

    @yanghua Thanks for your update. +1 to merge
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16549001#comment-16549001 ]

ASF GitHub Bot commented on FLINK-9850:
---

Github user tison1 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6367#discussion_r203644208

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java ---
    @@ -40,6 +40,8 @@
         private boolean target;
         private transient PrintStream stream;
         private transient String prefix;
    +    private String sinkIdentifier;
    +    private String completedPrefix;
    --- End diff --

    If `prefix` is `transient`, why isn't `completedPrefix` `transient` as well?
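For context on the `transient` question: a `transient` field is skipped by Java serialization and comes back as null, so it only makes sense for state that is rebuilt after deserialization (here, presumably in `open`). The sketch below is a minimal illustration under that assumption. `PrefixHolder` is a hypothetical class written for this note; its field names follow the diff, but its `open()` is simplified and takes no subtask information.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class TransientPrefixSketch {

    /** Hypothetical holder: the identifier is configuration, the prefix is derived state. */
    static final class PrefixHolder implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String sinkIdentifier;      // serialized with the object
        private transient String completedPrefix; // skipped by serialization, rebuilt in open()

        PrefixHolder(String sinkIdentifier) {
            this.sinkIdentifier = sinkIdentifier;
        }

        void open() {
            completedPrefix = sinkIdentifier.isEmpty() ? "" : sinkIdentifier + "> ";
        }

        String sinkIdentifier() {
            return sinkIdentifier;
        }

        String completedPrefix() {
            return completedPrefix;
        }
    }

    /** Serializes and deserializes the holder in memory, as shipping it to a worker would. */
    static PrefixHolder roundTrip(PrefixHolder holder) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(holder);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (PrefixHolder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("in-memory round trip failed", e);
        }
    }

    public static void main(String[] args) {
        PrefixHolder original = new PrefixHolder("id");
        original.open();
        PrefixHolder copy = roundTrip(original);
        System.out.println("before open: " + copy.completedPrefix()); // transient field is null
        copy.open();
        System.out.println("after open: " + copy.completedPrefix());
    }
}
```

Under this reading, `sinkIdentifier` has to stay non-transient because it is user-supplied configuration that must survive shipping, while `completedPrefix` can be `transient` because it is fully derived from the identifier and the subtask information available at open time.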
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16548837#comment-16548837 ]

ASF GitHub Bot commented on FLINK-9850:
---

Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/6367

    @hequn8128 Thanks, I have added some test cases.
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16548718#comment-16548718 ]

ASF GitHub Bot commented on FLINK-9850:
---

Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6367#discussion_r203588835

    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---
    @@ -959,6 +959,29 @@ class DataStream[T](stream: JavaStream[T]) {
       @PublicEvolving
       def printToErr() = stream.printToErr()

    +  /**
    +   * Writes a DataStream to the standard output stream (stdout). For each
    +   * element of the DataStream the result of .toString is
    --- End diff --

    Please replace `.toString` with `[[AnyRef.toString()]]` in this Scaladoc.
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16548004#comment-16548004 ]

ASF GitHub Bot commented on FLINK-9850:
---

Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/6367

    @tillrohrmann and @zentol I see that the Python DataStream API does not match the Java DataStream API: some methods are missing. Shall we add the missing methods to `PythonDataStream`? If yes, I'd like to do this.
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16547965#comment-16547965 ]

ASF GitHub Bot commented on FLINK-9850:
---

GitHub user yanghua opened a pull request:

    https://github.com/apache/flink/pull/6367

    [FLINK-9850] Add a string to the print method to identify output for DataStream

    ## What is the purpose of the change

    *This pull request adds a string to the print method to identify output for DataStream*

    ## Brief change log

      - *add print(string) / printToErr(string) to DataStream Java API*
      - *add print(string) / printToErr(string) to DataStream Scala API*
      - *add print(string) to DataStream Python API*

    ## Verifying this change

    This change is a trivial rework / code cleanup without any test coverage.

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)

    ## Documentation

      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (not applicable / **docs** / **JavaDocs** / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yanghua/flink FLINK-9850

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6367.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6367

commit 80215cd12618392ab0909a431863939d3353ca16
Author: yanghua
Date:   2018-07-18T15:20:11Z

    [FLINK-9850] Add a string to the print method to identify output for DataStream
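For readers who want to see what the identifier does to the printed output, the following sketch restates the prefix rule from the `PrintSinkOutputWriter.open` method in the merged diff quoted elsewhere in this thread. `PrefixSketch` and `buildPrefix` are names made up for this note. The merged code stores the result in a field and prepends it to every record; this version returns the prefix so it can be inspected directly.

```java
final class PrefixSketch {

    /**
     * Builds the output prefix from the sink identifier and the subtask position.
     * A null identifier is treated as empty, the 1-based subtask number is appended
     * only when the sink runs with parallelism greater than one, and "> " is added
     * only when the prefix is non-empty.
     */
    static String buildPrefix(String sinkIdentifier, int subtaskIndex, int numParallelSubtasks) {
        String prefix = (sinkIdentifier == null) ? "" : sinkIdentifier;

        if (numParallelSubtasks > 1) {
            if (!prefix.isEmpty()) {
                prefix += ":";
            }
            prefix += (subtaskIndex + 1);
        }

        if (!prefix.isEmpty()) {
            prefix += "> ";
        }
        return prefix;
    }

    public static void main(String[] args) {
        System.out.println(buildPrefix("id", 1, 4) + "record");
        System.out.println(buildPrefix("id", 0, 1) + "record");
        System.out.println(buildPrefix("", 0, 4) + "record");
    }
}
```

With this rule, a sink created with identifier `id` on the second of four subtasks prefixes each record with `id:2> `, while a non-parallel sink with no identifier prints records unchanged.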