python flink_cep_example.py 过几秒就退出了,应该一直运行不退出的啊。
代码如下,使用了MATCH_RECOGNIZE:

    s_env = StreamExecutionEnvironment.get_execution_environment()
    b_s_settings =
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
    st_env = StreamTableEnvironment.create(s_env,
environment_settings=b_s_settings)
    configuration = st_env.get_config().get_configuration()
    configuration.set_string("taskmanager.memory.task.off-heap.size",
"500m")

    s_env.set_parallelism(1)

    kafka_source = """CREATE TABLE source (
         flow_name STRING,
         flow_id STRING,
         component STRING,
         filename STRING,
         event_time TIMESTAMP(3),
         WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
        ) WITH (
         'connector' = 'kafka',
         'topic' = 'cep',
         'properties.bootstrap.servers' = 'localhost:9092',
         'format' = 'json',
         'scan.startup.mode' = 'latest-offset'
        )"""



    postgres_sink = """
        CREATE TABLE cep_result (
        `filename`         STRING,
        `start_tstamp`          TIMESTAMP(3),
        `end_tstamp`           TIMESTAMP(3)
        ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:postgresql://127.0.0.1:5432/postgres',
        'connector.table' = 'cep_result',
        'connector.driver' = 'org.postgresql.Driver',
        'connector.username' = 'postgres',
        'connector.password' = 'my_password',
        'connector.write.flush.max-rows' = '1'
        )
        """

    st_env.sql_update(kafka_source)
    st_env.sql_update(postgres_sink)

    postgres_sink_sql = '''
        INSERT INTO cep_result
        SELECT *
        FROM source
            MATCH_RECOGNIZE (
                PARTITION BY filename
                ORDER BY event_time
                MEASURES
                    (A.event_time) AS start_tstamp,
                    (D.event_time) AS end_tstamp
                ONE ROW PER MATCH
                AFTER MATCH SKIP PAST LAST ROW
                PATTERN (A B C D)
                DEFINE
                    A AS component = 'XXX',
                    B AS component = 'YYY',
                    C AS component = 'ZZZ',
                    D AS component = 'WWW'
            ) MR
    '''

    sql_result = st_env.execute_sql(postgres_sink_sql)



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复