# (Original non-ASCII description was lost to mis-encoding; only the word "delete" survives.)
# The datastream import is parenthesised so it can span several lines; the
# original ended a line with a bare trailing comma, which is a SyntaxError.
from pyflink.datastream import (
    CheckpointingMode,
    StreamExecutionEnvironment,
    TimeCharacteristic,
)
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
# Shared DDL for the two JDBC-backed tables. Both have the same columns,
# primary key and connection options; only the Flink table name and the
# MySQL table it maps to differ.
# NOTE(review): the database credentials are hard-coded in this string.
_JDBC_TABLE_DDL = """
CREATE TABLE {flink_table} (
 trck_id VARCHAR,
 score  INT,
PRIMARY KEY (trck_id) NOT ENFORCED
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false', 
 'table-name' = '{mysql_table}',   
 'username' = 'root',
 'password' = '123456t',
 'sink.buffer-flush.interval' = '1s'
)
"""

# DDL for the table the job reads from (MySQL table `g`).
source = _JDBC_TABLE_DDL.format(flink_table="source_tab", mysql_table="g")

# DDL for the table the job writes to (MySQL table `g_copy`).
sink = _JDBC_TABLE_DDL.format(flink_table="sink_tab", mysql_table="g_copy")
# Create the DataStream execution environment, switch it to event time and
# pin the job to a single parallel task.
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)

# Table environment settings: Blink planner, streaming mode.
# The builder chain is parenthesised so it can span lines; the original left
# `env_settings =` with nothing on its own line, which is a SyntaxError.
env_settings = (
    EnvironmentSettings.new_instance()
    .use_blink_planner()
    .in_streaming_mode()
    .build()
)

# Table environment layered on top of the stream environment above.
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)


# Run both CREATE TABLE statements, source first, then sink.
for ddl in (source, sink):
    t_env.execute_sql(ddl)

# NOTE(review): per the Flink SQL docs, DELETE is only available in batch mode
# on connectors that support row-level deletes. This job runs in streaming
# mode against the JDBC connector, so this statement will likely be rejected.
# Confirm, and if so issue the DELETE directly against MySQL instead.
delete_stmt = '''delete from source_tab where trck_id='aew'  '''
t_env.execute_sql(delete_stmt)

# Copy every row of source_tab into sink_tab.
copy_stmt = 'insert into  sink_tab select * from \nsource_tab '
table_result1 = t_env.execute_sql(copy_stmt)

# Block until the submitted INSERT job reports its execution result.
job_client = table_result1.get_job_client()
job_client.get_job_execution_result().result()

# "Reply" - forum page residue captured with the paste; not part of the script.