This is an automated email from the ASF dual-hosted git repository.

hequn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c694954 [FLINK-16766][python] Support creating StreamTableEnvironment without passing StreamExecutionEnvironment (#11769) c694954 is described below commit c6949540c6c639695e1a0fb2684b467c6219024f Author: SteNicholas <programg...@163.com> AuthorDate: Thu Apr 23 09:39:21 2020 +0800 [FLINK-16766][python] Support creating StreamTableEnvironment without passing StreamExecutionEnvironment (#11769) --- flink-python/pyflink/table/table_environment.py | 41 ++++++++++++++++------ .../table/tests/test_table_environment_api.py | 18 ++++++++++ 2 files changed, 48 insertions(+), 11 deletions(-) diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index eae18c9..fcce768 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -1138,7 +1138,10 @@ class StreamTableEnvironment(TableEnvironment): super(StreamTableEnvironment, self).__init__(j_tenv) def _get_j_env(self): - return self._j_tenv.execEnv() + if self._is_blink_planner: + return self._j_tenv.getPlanner().getExecEnv() + else: + return self._j_tenv.getPlanner().getExecutionEnvironment() def get_config(self): """ @@ -1185,26 +1188,27 @@ class StreamTableEnvironment(TableEnvironment): self._j_tenv.connect(connector_descriptor._j_connector_descriptor)) @staticmethod - def create(stream_execution_environment, table_config=None, environment_settings=None): + def create(stream_execution_environment=None, table_config=None, environment_settings=None): """ - Creates a :class:`~pyflink.table.TableEnvironment` for a - :class:`~pyflink.datastream.StreamExecutionEnvironment`. + Creates a :class:`~pyflink.table.StreamTableEnvironment`. Example: :: + # create with StreamExecutionEnvironment. >>> env = StreamExecutionEnvironment.get_execution_environment() - # create without optional parameters. 
>>> table_env = StreamTableEnvironment.create(env) - # create with TableConfig + # create with StreamExecutionEnvironment and TableConfig. >>> table_config = TableConfig() >>> table_config.set_null_check(False) >>> table_env = StreamTableEnvironment.create(env, table_config) - # create with EnvrionmentSettings + # create with StreamExecutionEnvironment and EnvironmentSettings. >>> environment_settings = EnvironmentSettings.new_instance().use_blink_planner() \\ ... .build() >>> table_env = StreamTableEnvironment.create( ... env, environment_settings=environment_settings) + # create with EnvironmentSettings. + >>> table_env = StreamTableEnvironment.create(environment_settings=environment_settings) :param stream_execution_environment: The @@ -1221,7 +1225,18 @@ class StreamTableEnvironment(TableEnvironment): configuration. :rtype: pyflink.table.StreamTableEnvironment """ - if table_config is not None and environment_settings is not None: + if stream_execution_environment is None and \ + table_config is None and \ + environment_settings is None: + raise ValueError("No argument found, the param 'stream_execution_environment' " + "or 'environment_settings' is required.") + elif stream_execution_environment is None and \ + table_config is not None and \ + environment_settings is None: + raise ValueError("Only the param 'table_config' is found, " + "the param 'stream_execution_environment' is also required.") + if table_config is not None and \ + environment_settings is not None: raise ValueError("The param 'table_config' and " "'environment_settings' cannot be used at the same time") @@ -1234,9 +1249,13 @@ class StreamTableEnvironment(TableEnvironment): if not environment_settings.is_streaming_mode(): raise ValueError("The environment settings for StreamTableEnvironment must be " "set to streaming mode.") - j_tenv = gateway.jvm.StreamTableEnvironment.create( - stream_execution_environment._j_stream_execution_environment, - environment_settings._j_environment_settings) + 
if stream_execution_environment is None: + j_tenv = gateway.jvm.TableEnvironment.create( + environment_settings._j_environment_settings) + else: + j_tenv = gateway.jvm.StreamTableEnvironment.create( + stream_execution_environment._j_stream_execution_environment, + environment_settings._j_environment_settings) else: j_tenv = gateway.jvm.StreamTableEnvironment.create( stream_execution_environment._j_stream_execution_environment) diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py index efdf115..f640e87 100644 --- a/flink-python/pyflink/table/tests/test_table_environment_api.py +++ b/flink-python/pyflink/table/tests/test_table_environment_api.py @@ -301,6 +301,24 @@ class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkStreamTableTestCa planner.getClass().getName(), "org.apache.flink.table.planner.delegation.StreamPlanner") + t_env = StreamTableEnvironment.create( + environment_settings=EnvironmentSettings.new_instance().build()) + + planner = t_env._j_tenv.getPlanner() + + self.assertEqual( + planner.getClass().getName(), + "org.apache.flink.table.planner.StreamPlanner") + + t_env = StreamTableEnvironment.create( + environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()) + + planner = t_env._j_tenv.getPlanner() + + self.assertEqual( + planner.getClass().getName(), + "org.apache.flink.table.planner.delegation.StreamPlanner") + def test_table_environment_with_blink_planner(self): self.env.set_parallelism(1) t_env = StreamTableEnvironment.create(