This is an automated email from the ASF dual-hosted git repository.

hequn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5dadbf3  [FLINK-18888][python] Support execute_async for StreamExecutionEnvironment. (#13126)
5dadbf3 is described below

commit 5dadbf312bb103dd3e8e812ac42a909479e2ca8a
Author: Shuiqiang Chen <acqua....@alibaba-inc.com>
AuthorDate: Wed Aug 12 18:59:22 2020 +0800

    [FLINK-18888][python] Support execute_async for StreamExecutionEnvironment. (#13126)
---
 .../datastream/stream_execution_environment.py     | 15 ++++++++++
 .../pyflink/datastream/tests/test_data_stream.py   |  2 +-
 .../tests/test_stream_execution_environment.py     | 35 ++++++++++++++--------
 flink-python/pyflink/datastream/tests/test_util.py |  4 +--
 .../python/util/DataStreamTestCollectSink.java     |  6 +++-
 5 files changed, 46 insertions(+), 16 deletions(-)

diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index a7b110b..602c810 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -21,6 +21,7 @@ import tempfile
 from typing import List, Any
 
 from pyflink.common.execution_config import ExecutionConfig
+from pyflink.common.job_client import JobClient
 from pyflink.common.job_execution_result import JobExecutionResult
 from pyflink.common.restart_strategy import RestartStrategies
 from pyflink.common.typeinfo import PickledBytesTypeInfo, TypeInformation
@@ -420,6 +421,20 @@ class StreamExecutionEnvironment(object):
         else:
             return JobExecutionResult(self._j_stream_execution_environment.execute(job_name))
 
+    def execute_async(self, job_name: str = 'Flink Streaming Job') -> JobClient:
+        """
+        Triggers the program asynchronously. The environment will execute all parts of the program
+        that have resulted in a "sink" operation. Sink operations are for example printing results
+        or forwarding them to a message queue.
+        The program execution will be logged and displayed with a generated default name.
+
+        :param job_name: Desired name of the job.
+        :return: A JobClient that can be used to communicate with the submitted job, completed on
+                 submission succeeded.
+        """
+        j_job_client = self._j_stream_execution_environment.executeAsync(job_name)
+        return JobClient(j_job_client=j_job_client)
+
     def get_execution_plan(self):
         """
         Creates the plan with which the system will execute the program, and returns it as
diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py b/flink-python/pyflink/datastream/tests/test_data_stream.py
index 901eff6..61027f6 100644
--- a/flink-python/pyflink/datastream/tests/test_data_stream.py
+++ b/flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -323,7 +323,7 @@ class DataStreamTests(PyFlinkTestCase):
             keyed_stream.forward()
 
     def tearDown(self) -> None:
-        self.test_sink.get_results()
+        self.test_sink.clear()
 
 
 class MyMapFunction(MapFunction):
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index 7417746..4a6c71b 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -36,6 +36,7 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
 
     def setUp(self):
         self.env = StreamExecutionEnvironment.get_execution_environment()
+        self.test_sink = DataStreamTestSinkFunction()
 
     def test_get_config(self):
         execution_config = self.env.get_config()
@@ -217,10 +218,9 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
 
     def test_from_collection_without_data_types(self):
         ds = self.env.from_collection([(1, 'Hi', 'Hello'), (2, 'Hello', 'Hi')])
-        test_sink = DataStreamTestSinkFunction()
-        ds.add_sink(test_sink)
+        ds.add_sink(self.test_sink)
         self.env.execute("test from collection")
-        results = test_sink.get_results(True)
+        results = self.test_sink.get_results(True)
         # user does not specify data types for input data, the collected result should be in
         # in tuple format as inputs.
         expected = ["(1, 'Hi', 'Hello')", "(2, 'Hello', 'Hi')"]
@@ -233,10 +233,9 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
                                       type_info=Types.ROW([Types.INT(),
                                                            Types.STRING(),
                                                            Types.STRING()]))
-        test_sink = DataStreamTestSinkFunction()
-        ds.add_sink(test_sink)
+        ds.add_sink(self.test_sink)
         self.env.execute("test from collection")
-        results = test_sink.get_results(False)
+        results = self.test_sink.get_results(False)
         # if user specifies data types of input data, the collected result should be in row format.
         expected = ['1,Hi,Hello', '2,Hello,Hi']
         results.sort()
@@ -246,10 +245,9 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
     def test_add_custom_source(self):
         custom_source = SourceFunction("org.apache.flink.python.util.MyCustomSourceFunction")
         ds = self.env.add_source(custom_source, type_info=Types.ROW([Types.INT(), Types.STRING()]))
-        test_sink = DataStreamTestSinkFunction()
-        ds.add_sink(test_sink)
+        ds.add_sink(self.test_sink)
         self.env.execute("test add custom source")
-        results = test_sink.get_results(False)
+        results = self.test_sink.get_results(False)
         expected = ['3,Mike', '1,Marry', '4,Ted', '5,Jack', '0,Bob', '2,Henry']
         results.sort()
         expected.sort()
@@ -264,10 +262,23 @@ class StreamExecutionEnvironmentTests(PyFlinkTestCase):
                 f.write('\n')
 
         ds = self.env.read_text_file(text_file_path)
-        test_sink = DataStreamTestSinkFunction()
-        ds.add_sink(test_sink)
+        ds.add_sink(self.test_sink)
         self.env.execute("test read text file")
-        results = test_sink.get_results()
+        results = self.test_sink.get_results()
         results.sort()
         texts.sort()
         self.assertEqual(texts, results)
+
+    def test_execute_async(self):
+        ds = self.env.from_collection([(1, 'Hi', 'Hello'), (2, 'Hello', 'Hi')],
+                                      type_info=Types.ROW(
+                                          [Types.INT(), Types.STRING(), Types.STRING()]))
+        ds.add_sink(self.test_sink)
+        job_client = self.env.execute_async("test execute async")
+        job_id = job_client.get_job_id()
+        self.assertIsNotNone(job_id)
+        execution_result = job_client.get_job_execution_result().result()
+        self.assertEqual(str(job_id), str(execution_result.get_job_id()))
+
+    def tearDown(self) -> None:
+        self.test_sink.clear()
diff --git a/flink-python/pyflink/datastream/tests/test_util.py b/flink-python/pyflink/datastream/tests/test_util.py
index 5bc4044..ada06f5 100644
--- a/flink-python/pyflink/datastream/tests/test_util.py
+++ b/flink-python/pyflink/datastream/tests/test_util.py
@@ -45,6 +45,6 @@ class DataStreamTestSinkFunction(SinkFunction):
             return str_results
 
     def clear(self):
-        if self._j_data_stream_test_collect_sink is None:
+        if self.j_data_stream_collect_sink is None:
             return
-        self._j_data_stream_test_collect_sink.collectAndClear()
+        self.j_data_stream_collect_sink.clear()
diff --git a/flink-python/src/test/java/org/apache/flink/python/util/DataStreamTestCollectSink.java b/flink-python/src/test/java/org/apache/flink/python/util/DataStreamTestCollectSink.java
index 75f2ce7..fc1dbf4 100644
--- a/flink-python/src/test/java/org/apache/flink/python/util/DataStreamTestCollectSink.java
+++ b/flink-python/src/test/java/org/apache/flink/python/util/DataStreamTestCollectSink.java
@@ -52,7 +52,11 @@ public class DataStreamTestCollectSink<IN> implements SinkFunction<IN> {
                                listToBeReturned.add(obj.toString());
                        }
                }
-               collectedResult.clear();
+               clear();
                return listToBeReturned;
        }
+
+       public void clear() {
+               collectedResult.clear();
+       }
 }

Reply via email to