????????????????????????flink??Kafka??????????mysql????mysql????????????????????????????????????????on
DUPLICATE??????????????????????????????????????????????????????????????linux????????python
*.py????????????????????????????????????????????
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic,
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source="""
CREATE TABLE kafka_source_tab (
trck_id VARCHAR,
score INT
) WITH (
'connector' = 'kafka',
'topic' = 'alarm_test_g',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = '10.2.2.73:2181',
'properties.bootstrap.servers' = '10.2.2.73:9092',
'format' = 'json'
)
"""
sink="""
CREATE TABLE g_source_tab (
trck_id VARCHAR,
score INT,
PRIMARY KEY (trck_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',
'table-name' = 'g',
'username' = 'root',
'password' = '123456t',
'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings =
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
t_env.execute_sql(source)
t_env.execute_sql(sink)
table_result1=t_env.execute_sql('''Insert into g_source_tab (`trck_id`,`score`)
VALUES (select
trck_id,score from kafka_source_tab ) ON DUPLICATE KEY UPDATE
score=score+1''')
table_result1.get_job_client().get_job_execution_result().result()