When you send data in a loop, make sure the timestamps are monotonically increasing; otherwise the records may be dropped as late data.
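A minimal sketch of such a producer loop (the `make_records` helper is illustrative, not from this thread): it keeps `event_time` strictly increasing, and uses `json.dumps` so the payload gets the double quotes that Flink's `'json'` format requires, rather than the single quotes a Python `str(dict)` would produce.

```python
import json
from datetime import datetime, timedelta

def make_records(start, n, step_seconds=60):
    """Generate JSON payloads whose event_time strictly increases."""
    records = []
    ts = start
    for _ in range(n):
        # json.dumps emits double-quoted JSON, which Flink's 'json' format
        # can parse; str(dict) would emit single quotes and fail to parse.
        records.append(json.dumps({
            "amount": 500,
            "course_code": "97iscn4g8k",
            "event_time": ts.strftime("%Y-%m-%d %H:%M:%S"),
        }))
        ts += timedelta(seconds=step_seconds)
    return records

records = make_records(datetime(2021, 12, 1, 17, 44, 40), 3)
```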
On Mon, Dec 6, 2021 at 2:11 PM su wenwen <[email protected]> wrote:

> hi, zebing!
>
> You can go to localhost:8081 and see if it works. Also, data written to
> Kafka should be in double quotes.
> example:
> {"amount": 500, "course_code": "97iscn4g8k", "event_time": "2021-12-01 17:54:41"}
>
> Window aggregation needs to pay attention to the progress of the watermark.
> ________________________________
> From: duanzebing <[email protected]>
> Sent: December 4, 2021, 18:14
> To: [email protected] <[email protected]>
> Subject: apache-flink - aggregate function produces no output in PyFlink 1.14
>
> Hi all:
>
> I am a PyFlink beginner, and I have run into a problem where an aggregation in Flink SQL produces no sink output. My code follows the official documentation exactly, but I still cannot solve it, so I can only turn to you for help.
> My source statement is:
>
> CREATE TABLE random_source (
>     amount int,
>     course_code string,
>     `event_time` TIMESTAMP(3),
>     WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'ding_broadcast_vip_order_test',
>     'properties.bootstrap.servers' = '192.168.100.135:9092',
>     'properties.group.id' = 'testGroup',
>     'scan.startup.mode' = 'latest-offset',
>     'format' = 'json'
> )
>
> The sink is:
>
> CREATE TABLE print_sink (
>     amount BIGINT,
>     course_code string,
>     `event_time_start` TIMESTAMP(3),
>     `event_time_end` TIMESTAMP(3)
> ) WITH (
>     'connector' = 'print'
> )
>
> I have tried the following two ways of aggregating, but both run into the same problem:
>
> insert into print_sink select
>     count(amount) as amount
>     ,course_code
>     ,tumble_start(event_time, interval '1' minute) as event_time_start
>     ,tumble_end(event_time, interval '1' minute) as event_time_end
> from random_source
> group by tumble(event_time, interval '1' minute), course_code
>
> as well as:
>
> insert into print_sink select
>     sum(amount), course_code, window_start, window_end
> from table(
>     tumble(
>         table random_source,
>         descriptor(event_time),
>         interval '1' minutes
>     )
> ) group by window_start, window_end, course_code
>
> My Kafka data looks like this:
>
> {'amount': 500, 'course_code': '97iscn4g8k', 'event_time': '2021-12-01 17:44:40'}
> {'amount': 500, 'course_code': '97iscn4g8k', 'event_time': '2021-12-01 17:45:40'}
> {'amount': 500, 'course_code': '97iscn4g8k', 'event_time': '2021-12-01 17:46:40'}
> {'amount': 500, 'course_code': '97iscn4g8k', 'event_time': '2021-12-01 17:47:40'}
> {'amount': 500, 'course_code': '97iscn4g8k', 'event_time': '2021-12-01 17:48:40'}
> {'amount': 500, 'course_code': '97iscn4g8k', 'event_time': '2021-12-01 17:49:40'}
> {'amount': 500, 'course_code': '97iscn4g8k', 'event_time': '2021-12-01 17:50:40'}
> {'amount': 500, 'course_code': '97iscn4g8k', 'event_time': '2021-12-01 17:51:40'}
>
> My expectation was that it would aggregate by course_code in one-minute windows, but the sink produces no output at all, as if it had received no data. Yet with nothing more than a plain select * from random_source, the sink prints normally.
> I am at a loss; please don't hesitate to advise. My problematic code is attached below.
> Many thanks to all.
>
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment, TableEnvironment
>
>
> def hello_world():
>     """
>     Read data from the random source, then output it directly with the print sink.
>     """
>     settings = EnvironmentSettings.in_streaming_mode()
>     # env = StreamExecutionEnvironment.get_execution_environment()
>     # t_env = StreamTableEnvironment.create(s_env)
>     t_env = TableEnvironment.create(settings)
>     t_env.get_config().get_configuration().set_string(
>         "pipeline.jars",
>         "file:///Users/duanzebing/Downloads/flink-sql-connector-kafka_2.11-1.14.0.jar")
>
>     source_ddl = """
>         CREATE TABLE random_source (
>             amount int,
>             course_code string,
>             `event_time` TIMESTAMP(3),
>             WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
>         ) WITH (
>             'connector' = 'kafka',
>             'topic' = 'ding_broadcast_vip_order_test',
>             'properties.bootstrap.servers' = '192.168.100.135:9092',
>             'properties.group.id' = 'testGroup',
>             'scan.startup.mode' = 'latest-offset',
>             'format' = 'json'
>         )
>         """
>     # f_sequence_ as PROCTIME()  # METADATA FROM 'timestamp'
>     # register the source
>     t_env.execute_sql(source_ddl)
>     # data extraction
>
>     sink_ddl = """
>         CREATE TABLE print_sink (
>             amount BIGINT,
>             course_code string,
>             `event_time_start` TIMESTAMP(3),
>             `event_time_end` TIMESTAMP(3)
>         ) WITH (
>             'connector' = 'print'
>         )
>         """
>     # register the sink
>     t_env.execute_sql(sink_ddl)
>     t_env.execute_sql('''insert into print_sink select
>             count(amount) as amount
>             ,course_code
>             ,tumble_start(event_time, interval '1' minute) as event_time_start
>             ,tumble_end(event_time, interval '1' minute) as event_time_end
>         from random_source
>         group by tumble(event_time, interval '1' minute), course_code
>         ''').wait()
>
>     # tab.insert_into('print_sink')
>     # tab.execute_insert('print_sink', overwrite=False).wait()
>     # run the job
>     # t_env.execute("Flink Hello World")
>
>
> if __name__ == '__main__':
>     hello_world()
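For what it's worth, the silence duanzebing describes is also what you would see if the watermark never passes the end of a window: with `WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND`, the one-minute window [17:44:00, 17:45:00) fires only after some record with event_time >= 17:45:05 has been seen. A plain-Python sketch of that rule (just the arithmetic, not the PyFlink API):

```python
from datetime import datetime, timedelta

WATERMARK_DELAY = timedelta(seconds=5)   # matches the DDL's INTERVAL '5' SECOND
WINDOW_SIZE = timedelta(minutes=1)       # matches tumble(..., interval '1' minute)

def window_end(ts):
    """End of the one-minute tumbling window containing ts."""
    return ts.replace(second=0, microsecond=0) + WINDOW_SIZE

def fired_windows(event_times):
    """Window ends that have fired after processing the events in order."""
    watermark = datetime.min
    fired, pending = set(), set()
    for ts in event_times:
        pending.add(window_end(ts))
        # watermark trails the max seen event time by the configured delay
        watermark = max(watermark, ts - WATERMARK_DELAY)
        for end in list(pending):
            if watermark >= end:      # window fires once the watermark passes its end
                fired.add(end)
                pending.discard(end)
    return fired

# the eight records from the thread: 17:44:40 through 17:51:40
events = [datetime(2021, 12, 1, 17, m, 40) for m in range(44, 52)]
# After the last record the watermark is 17:51:35, so every window ending at
# or before 17:51:00 fires, but the window containing 17:51:40 stays pending.
```

So even with valid JSON, the last window only emits once a later record pushes the watermark past its end, which is why steadily increasing timestamps matter.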
