This is an automated email from the ASF dual-hosted git repository.

hequn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c694954  [FLINK-16766][python] Support creating StreamTableEnvironment without passing StreamExecutionEnvironment (#11769)
c694954 is described below

commit c6949540c6c639695e1a0fb2684b467c6219024f
Author: SteNicholas <programg...@163.com>
AuthorDate: Thu Apr 23 09:39:21 2020 +0800

    [FLINK-16766][python] Support creating StreamTableEnvironment without passing StreamExecutionEnvironment (#11769)
---
 flink-python/pyflink/table/table_environment.py    | 41 ++++++++++++++++------
 .../table/tests/test_table_environment_api.py      | 18 ++++++++++
 2 files changed, 48 insertions(+), 11 deletions(-)

diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index eae18c9..fcce768 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -1138,7 +1138,10 @@ class StreamTableEnvironment(TableEnvironment):
         super(StreamTableEnvironment, self).__init__(j_tenv)
 
     def _get_j_env(self):
-        return self._j_tenv.execEnv()
+        if self._is_blink_planner:
+            return self._j_tenv.getPlanner().getExecEnv()
+        else:
+            return self._j_tenv.getPlanner().getExecutionEnvironment()
 
     def get_config(self):
         """
@@ -1185,26 +1188,27 @@ class StreamTableEnvironment(TableEnvironment):
             self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
 
     @staticmethod
-    def create(stream_execution_environment, table_config=None, environment_settings=None):
+    def create(stream_execution_environment=None, table_config=None, environment_settings=None):
         """
-        Creates a :class:`~pyflink.table.TableEnvironment` for a
-        :class:`~pyflink.datastream.StreamExecutionEnvironment`.
+        Creates a :class:`~pyflink.table.StreamTableEnvironment`.
 
         Example:
         ::
 
+            # create with StreamExecutionEnvironment.
             >>> env = StreamExecutionEnvironment.get_execution_environment()
-            # create without optional parameters.
             >>> table_env = StreamTableEnvironment.create(env)
-            # create with TableConfig
+            # create with StreamExecutionEnvironment and TableConfig.
             >>> table_config = TableConfig()
             >>> table_config.set_null_check(False)
             >>> table_env = StreamTableEnvironment.create(env, table_config)
-            # create with EnvrionmentSettings
+            # create with StreamExecutionEnvironment and EnvironmentSettings.
             >>> environment_settings = EnvironmentSettings.new_instance().use_blink_planner() \\
             ...     .build()
             >>> table_env = StreamTableEnvironment.create(
             ...     env, environment_settings=environment_settings)
+            # create with EnvironmentSettings.
+            >>> table_env = StreamTableEnvironment.create(environment_settings=environment_settings)
 
 
         :param stream_execution_environment: The
@@ -1221,7 +1225,18 @@ class StreamTableEnvironment(TableEnvironment):
                  configuration.
         :rtype: pyflink.table.StreamTableEnvironment
         """
-        if table_config is not None and environment_settings is not None:
+        if stream_execution_environment is None and \
+                table_config is None and \
+                environment_settings is None:
+            raise ValueError("No argument found, the param 'stream_execution_environment' "
+                             "or 'environment_settings' is required.")
+        elif stream_execution_environment is None and \
+                table_config is not None and \
+                environment_settings is None:
+            raise ValueError("Only the param 'table_config' is found, "
+                             "the param 'stream_execution_environment' is also required.")
+        if table_config is not None and \
+                environment_settings is not None:
             raise ValueError("The param 'table_config' and "
                              "'environment_settings' cannot be used at the same time")
 
@@ -1234,9 +1249,13 @@ class StreamTableEnvironment(TableEnvironment):
             if not environment_settings.is_streaming_mode():
                 raise ValueError("The environment settings for StreamTableEnvironment must be "
                                  "set to streaming mode.")
-            j_tenv = gateway.jvm.StreamTableEnvironment.create(
-                stream_execution_environment._j_stream_execution_environment,
-                environment_settings._j_environment_settings)
+            if stream_execution_environment is None:
+                j_tenv = gateway.jvm.TableEnvironment.create(
+                    environment_settings._j_environment_settings)
+            else:
+                j_tenv = gateway.jvm.StreamTableEnvironment.create(
+                    stream_execution_environment._j_stream_execution_environment,
+                    environment_settings._j_environment_settings)
         else:
             j_tenv = gateway.jvm.StreamTableEnvironment.create(
                 stream_execution_environment._j_stream_execution_environment)
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index efdf115..f640e87 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -301,6 +301,24 @@ class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkStreamTableTestCa
             planner.getClass().getName(),
             "org.apache.flink.table.planner.delegation.StreamPlanner")
 
+        t_env = StreamTableEnvironment.create(
+            environment_settings=EnvironmentSettings.new_instance().build())
+
+        planner = t_env._j_tenv.getPlanner()
+
+        self.assertEqual(
+            planner.getClass().getName(),
+            "org.apache.flink.table.planner.StreamPlanner")
+
+        t_env = StreamTableEnvironment.create(
+            environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
+
+        planner = t_env._j_tenv.getPlanner()
+
+        self.assertEqual(
+            planner.getClass().getName(),
+            "org.apache.flink.table.planner.delegation.StreamPlanner")
+
     def test_table_environment_with_blink_planner(self):
         self.env.set_parallelism(1)
         t_env = StreamTableEnvironment.create(

Reply via email to