------------------ Original Message ------------------
From: "user-zh" <hxbks...@gmail.com>;
Sent: 2020-08-12 (Wednesday) 4:11
To: "user-zh" <user-zh@flink.apache.org>;

Subject: Re: pyflink 1.11.1 execute_sql sql_update ..., ... Deprecated



Hi,
execute_sql already includes the execute step, so there is no need to call execute again after execute_sql; see the documentation [1]. If you need to wait for the job submitted by execute_sql to finish, you can do the following:
result = t_env.execute_sql("your sql")
result.get_job_client().get_job_execution_result().result()
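
A slightly fuller sketch of the two lines above, wrapped in a helper. The function name wait_for_insert and the None check are my own additions for illustration, not part of PyFlink; the check assumes get_job_client() can return None when the statement (for example a DDL) does not submit a job.

```python
def wait_for_insert(t_env, insert_sql):
    """Submit an INSERT via execute_sql and block until the job finishes."""
    # execute_sql submits the job by itself; no separate execute() call follows.
    table_result = t_env.execute_sql(insert_sql)

    # Assumption: no job client is available when no job was submitted
    # (for example for a DDL statement), so guard before waiting.
    job_client = table_result.get_job_client()
    if job_client is None:
        return None

    # result() blocks on the returned future until the job has finished.
    return job_client.get_job_execution_result().result()
```

With the t_env from the quoted script, a call such as wait_for_insert(t_env, "insert into print_sink select wd,count(wd) cnt from sc group by wd") would take the place of sql_update plus t_env.execute("soc").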

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#translate-and-execute-a-query

Best,
Xingbo

On Wed, Aug 12, 2020 at 4:00, <huazhe...@foxmail.com> wrote:

> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
>
> # pyflink 1.11.1
> environment_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> senv = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(senv, environment_settings=environment_settings)
> source_ddl = "create table sc(wd varchar,cnt int,c int,hu varchar)with('connector'='filesystem','path'='/home/xzh/mypython/bigdata/flink/stream/input/','format'='csv')"
> sink_ddl = "create table print_sink(wd varchar,cnt bigint)with('connector'='print')"
>
> # Variant 1: sql_update followed by t_env.execute
> t_env.execute_sql(source_ddl)
> t_env.execute_sql(sink_ddl)
> t_env.sql_update("insert into print_sink select wd,count(wd) cnt from sc group by wd")
> t_env.execute("soc")
>
> # Variant 2: execute_sql followed by t_env.execute
> t_env.execute_sql(source_ddl)
> t_env.execute_sql(sink_ddl)
> t_env.execute_sql("insert into print_sink select wd,count(wd) cnt from sc group by wd")
> t_env.execute("soc")
>
> # Variant 3: execute_sql followed by senv.execute
> t_env.execute_sql(source_ddl)
> t_env.execute_sql(sink_ddl)
> t_env.execute_sql("insert into print_sink select wd,count(wd) cnt from sc group by wd")
> senv.execute("soc")