This is an automated email from the ASF dual-hosted git repository.

hequn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 5dadbf3 [FLINK-18888][python] Support execute_async for StreamExecutionEnvironment. (#13126) 5dadbf3 is described below commit 5dadbf312bb103dd3e8e812ac42a909479e2ca8a Author: Shuiqiang Chen <acqua....@alibaba-inc.com> AuthorDate: Wed Aug 12 18:59:22 2020 +0800 [FLINK-18888][python] Support execute_async for StreamExecutionEnvironment. (#13126) --- .../datastream/stream_execution_environment.py | 15 ++++++++++ .../pyflink/datastream/tests/test_data_stream.py | 2 +- .../tests/test_stream_execution_environment.py | 35 ++++++++++++++-------- flink-python/pyflink/datastream/tests/test_util.py | 4 +-- .../python/util/DataStreamTestCollectSink.java | 6 +++- 5 files changed, 46 insertions(+), 16 deletions(-) diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py index a7b110b..602c810 100644 --- a/flink-python/pyflink/datastream/stream_execution_environment.py +++ b/flink-python/pyflink/datastream/stream_execution_environment.py @@ -21,6 +21,7 @@ import tempfile from typing import List, Any from pyflink.common.execution_config import ExecutionConfig +from pyflink.common.job_client import JobClient from pyflink.common.job_execution_result import JobExecutionResult from pyflink.common.restart_strategy import RestartStrategies from pyflink.common.typeinfo import PickledBytesTypeInfo, TypeInformation @@ -420,6 +421,20 @@ class StreamExecutionEnvironment(object): else: return JobExecutionResult(self._j_stream_execution_environment.execute(job_name)) + def execute_async(self, job_name: str = 'Flink Streaming Job') -> JobClient: + """ + Triggers the program asynchronously. The environment will execute all parts of the program + that have resulted in a "sink" operation. Sink operations are for example printing results + or forwarding them to a message queue. 
+ The program execution will be logged and displayed with a generated default name. + + :param job_name: Desired name of the job. + :return: A JobClient that can be used to communicate with the submitted job, completed on + submission succeeded. + """ + j_job_client = self._j_stream_execution_environment.executeAsync(job_name) + return JobClient(j_job_client=j_job_client) + def get_execution_plan(self): """ Creates the plan with which the system will execute the program, and returns it as diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py b/flink-python/pyflink/datastream/tests/test_data_stream.py index 901eff6..61027f6 100644 --- a/flink-python/pyflink/datastream/tests/test_data_stream.py +++ b/flink-python/pyflink/datastream/tests/test_data_stream.py @@ -323,7 +323,7 @@ class DataStreamTests(PyFlinkTestCase): keyed_stream.forward() def tearDown(self) -> None: - self.test_sink.get_results() + self.test_sink.clear() class MyMapFunction(MapFunction): diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py index 7417746..4a6c71b 100644 --- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py +++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py @@ -36,6 +36,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase): def setUp(self): self.env = StreamExecutionEnvironment.get_execution_environment() + self.test_sink = DataStreamTestSinkFunction() def test_get_config(self): execution_config = self.env.get_config() @@ -217,10 +218,9 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase): def test_from_collection_without_data_types(self): ds = self.env.from_collection([(1, 'Hi', 'Hello'), (2, 'Hello', 'Hi')]) - test_sink = DataStreamTestSinkFunction() - ds.add_sink(test_sink) + ds.add_sink(self.test_sink) self.env.execute("test from collection") - results = test_sink.get_results(True) + 
results = self.test_sink.get_results(True) # user does not specify data types for input data, the collected result should be in # in tuple format as inputs. expected = ["(1, 'Hi', 'Hello')", "(2, 'Hello', 'Hi')"] @@ -233,10 +233,9 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase): type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()])) - test_sink = DataStreamTestSinkFunction() - ds.add_sink(test_sink) + ds.add_sink(self.test_sink) self.env.execute("test from collection") - results = test_sink.get_results(False) + results = self.test_sink.get_results(False) # if user specifies data types of input data, the collected result should be in row format. expected = ['1,Hi,Hello', '2,Hello,Hi'] results.sort() @@ -246,10 +245,9 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase): def test_add_custom_source(self): custom_source = SourceFunction("org.apache.flink.python.util.MyCustomSourceFunction") ds = self.env.add_source(custom_source, type_info=Types.ROW([Types.INT(), Types.STRING()])) - test_sink = DataStreamTestSinkFunction() - ds.add_sink(test_sink) + ds.add_sink(self.test_sink) self.env.execute("test add custom source") - results = test_sink.get_results(False) + results = self.test_sink.get_results(False) expected = ['3,Mike', '1,Marry', '4,Ted', '5,Jack', '0,Bob', '2,Henry'] results.sort() expected.sort() @@ -264,10 +262,23 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase): f.write('\n') ds = self.env.read_text_file(text_file_path) - test_sink = DataStreamTestSinkFunction() - ds.add_sink(test_sink) + ds.add_sink(self.test_sink) self.env.execute("test read text file") - results = test_sink.get_results() + results = self.test_sink.get_results() results.sort() texts.sort() self.assertEqual(texts, results) + + def test_execute_async(self): + ds = self.env.from_collection([(1, 'Hi', 'Hello'), (2, 'Hello', 'Hi')], + type_info=Types.ROW( + [Types.INT(), Types.STRING(), Types.STRING()])) + ds.add_sink(self.test_sink) + job_client = 
self.env.execute_async("test execute async") + job_id = job_client.get_job_id() + self.assertIsNotNone(job_id) + execution_result = job_client.get_job_execution_result().result() + self.assertEqual(str(job_id), str(execution_result.get_job_id())) + + def tearDown(self) -> None: + self.test_sink.clear() diff --git a/flink-python/pyflink/datastream/tests/test_util.py b/flink-python/pyflink/datastream/tests/test_util.py index 5bc4044..ada06f5 100644 --- a/flink-python/pyflink/datastream/tests/test_util.py +++ b/flink-python/pyflink/datastream/tests/test_util.py @@ -45,6 +45,6 @@ class DataStreamTestSinkFunction(SinkFunction): return str_results def clear(self): - if self._j_data_stream_test_collect_sink is None: + if self.j_data_stream_collect_sink is None: return - self._j_data_stream_test_collect_sink.collectAndClear() + self.j_data_stream_collect_sink.clear() diff --git a/flink-python/src/test/java/org/apache/flink/python/util/DataStreamTestCollectSink.java b/flink-python/src/test/java/org/apache/flink/python/util/DataStreamTestCollectSink.java index 75f2ce7..fc1dbf4 100644 --- a/flink-python/src/test/java/org/apache/flink/python/util/DataStreamTestCollectSink.java +++ b/flink-python/src/test/java/org/apache/flink/python/util/DataStreamTestCollectSink.java @@ -52,7 +52,11 @@ public class DataStreamTestCollectSink<IN> implements SinkFunction<IN> { listToBeReturned.add(obj.toString()); } } - collectedResult.clear(); + clear(); return listToBeReturned; } + + public void clear() { + collectedResult.clear(); + } }