>
>
> 大家好:
>
> 我是一个pyflink初学者,遇到一个flinksql中聚合后无法sink的问题,但是我的代码完全按照官方文档进行,到最后依然无法解决,只能向各位求助。
> 我的 source语句为:
> CREATE TABLE random_source (
> amount int,
> course_code string,
> `event_time` TIMESTAMP(3) ,
> WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'ding_broadcast_vip_order_test',
> 'properties.bootstrap.servers' = '192.168.100.135:9092',
> 'properties.group.id' = 'testGroup',
> 'scan.startup.mode' = 'latest-offset',
> 'format' = 'json'
> )
> sink为:
> CREATE TABLE print_sink (
> amount BIGINT,
> course_code string,
> `event_time_start` TIMESTAMP(3),
> `event_time_end` TIMESTAMP(3)
> ) WITH (
> 'connector' = 'print'
> )
> 而我尝试过如下两种方式聚合,可是都遇到同样的问题:
> insert into print_sink select
> count(amount) as amount
> ,course_code
> ,tumble_start(event_time, interval '1' minute) as
> event_time_start
> ,tumble_end(event_time, interval '1' minute) as
> event_time_end
> from random_source
> group by tumble(event_time, interval '1'
> minute),course_code
> 以及:
> insert into print_sink select
> sum(amount),course_code,window_start,window_end
> from table(
> tumble(
> table random_source,
> descriptor(event_time),
> interval '1' minutes
> )
> ) group by window_start, window_end, course_code
> 我的kafka数据结构为
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01
> 17:44:40'}
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01
> 17:45:40'}
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01
> 17:46:40'}
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01
> 17:47:40'}
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01
> 17:48:40'}
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01
> 17:49:40'}
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01
> 17:50:40'}
> {'amount': 500, 'course_code': '97iscn4g8k','event_time':'2021-12-01
> 17:51:40'}
>
> 在我的预期中他应该是按照每分钟作为窗口,然后按照course_code进行聚合,可结果是sink没有任何输出,仿佛没有接收到任何数据。但是在仅仅是普通的select
> * from random_source时,sink又能够正常输出,
> 我实在无能为力,请各位不吝赐教:下面附上我的出问题的代码。
> 非常感谢各位
>
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment,
> TableEnvironment
>
>
> def hello_world():
> """
> 从随机Source读取数据,然后直接利用PrintSink输出。
> """
> settings = EnvironmentSettings.in_streaming_mode()
> # env = StreamExecutionEnvironment.get_execution_environment()
> # t_env = StreamTableEnvironment.create(s_env)
> t_env = TableEnvironment.create(settings)
> t_env.get_config().get_configuration().set_string("pipeline.jars",
>
> "file:///Users/duanzebing/Downloads/flink-sql-connector-kafka_2.11-1.14.0.jar")
>
> source_ddl = """
> CREATE TABLE random_source (
> amount int,
> course_code string,
> `event_time` TIMESTAMP(3) ,
> WATERMARK FOR event_time AS event_time - INTERVAL
> '5' SECOND
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'ding_broadcast_vip_order_test',
> 'properties.bootstrap.servers' =
> '192.168.100.135:9092',
> 'properties.group.id' = 'testGroup',
> 'scan.startup.mode' = 'latest-offset',
> 'format' = 'json'
> )
> """
> # f_sequence_ as PROCTIME() # METADATA FROM
> 'timestamp'
> # 注册source
> t_env.execute_sql(source_ddl)
> # 数据提取
>
> sink_ddl = """
> CREATE TABLE print_sink (
> amount BIGINT,
> course_code string,
> `event_time_start` TIMESTAMP(3),
> `event_time_end` TIMESTAMP(3)
> ) WITH (
> 'connector' = 'print'
> )
> """
> # 注册sink
> t_env.execute_sql(sink_ddl)
>
> t_env.execute_sql('''insert into print_sink select
> count(amount) as amount
> ,course_code
> ,tumble_start(event_time, interval '1' minute) as
> event_time_start
> ,tumble_end(event_time, interval '1' minute) as
> event_time_end
> from random_source
> group by tumble(event_time, interval '1'
> minute),course_code
> ''').wait()
>
> # tab.insert_into('print_sink')
> # tab.execute_insert('print_sink', overwrite=False).wait()
> # 执行作业
> # t_env.execute("Flink Hello World")
>
>
> if __name__ == '__main__':
> hello_world()
>
>