????????????????????source????????????????????sink1????????????1??2????????????add_insert_sql??????????????source??????????????sink2????????????????1??2??
??????????????1??2??1??2
------------------ ???????? ------------------
??????:
"Dian Fu"
<[email protected]>;
????????: 2021??3??12??(??????) ????10:24
??????: "user-zh"<[email protected]>;"??????"<[email protected]>;
????: Re: ????statement????????????
????????????????????????????????????1??2??1??2????
??????????????????????????????????????????1????????????????sink1??????????sink2????????????2??1????????????????????????????2??2
On Mon, Mar 8, 2021 at 9:42 AM ?????? <[email protected]> wrote:
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
env_settings =
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view("simple_source", table)
table_env.execute_sql("""
CREATE TABLE first_sink_table (
id BIGINT,
data VARCHAR
) WITH (
'connector' = 'print'
)
""")
table_env.execute_sql("""
CREATE TABLE second_sink_table (
id BIGINT,
data VARCHAR
) WITH (
'connector' = 'print'
)
""")
# ????????statement????
statement_set = table_env.create_statement_set()
# ????TABLE API ??table??????first_sink_table??????
statement_set.add_insert("first_sink_table", table)
# ????SQL??table??????second_sink_table??????
statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM
simple_source")
# ????????
statement_set.execute().wait()????????????????????????????????????????????????????????????1??2??1??2????????1??1??2??2??4&gt;
+I(1,Hi)
4&gt; +I(1,Hi)
4&gt; +I(2,Hello)
4&gt; +I(2,Hello)