Hi!

It seems that what you want is, for *each* row, to compute the sum over the five most recent rows (including the current row). This is the use case for over aggregation, not for a sliding window. I don't know whether the Table API supports this, but you can see [1] for over aggregation in Flink SQL.
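To make the difference concrete, here is a minimal plain-Python sketch (no Flink involved) of what an over aggregation with a frame like ROWS BETWEEN 4 PRECEDING AND CURRENT ROW would compute for a stream of bits. The function name over_aggregate and the sample input are made up for illustration; WINDOW_SIZE and THRESHOLD are the values from your program. It only models the semantics, namely one output per input row, and is not how Flink implements it.

```python
from collections import deque

WINDOW_SIZE = 5   # current row plus the 4 preceding rows
THRESHOLD = 3     # emit 1 when at least this many bits are set


def over_aggregate(bits, size=WINDOW_SIZE, threshold=THRESHOLD):
    """Return one (running_sum, flag) pair per input row.

    Hypothetical helper: it mimics a row-based over aggregation,
    where every row sees itself and up to `size - 1` earlier rows.
    """
    frame = deque(maxlen=size)  # oldest row drops out automatically
    results = []
    for bit in bits:
        frame.append(bit)
        total = sum(frame)
        results.append((total, 1 if total >= threshold else 0))
    return results


if __name__ == "__main__":
    for total, flag in over_aggregate([1, 0, 1, 1, 0, 0, 0]):
        print(total, flag)
```

Note that the first rows are aggregated over fewer than five rows, because fewer preceding rows exist at that point.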
But if you insist on using a sliding window, you can use the last_value aggregate function to get the last row in each window [2]. However, it seems that the Table API still doesn't support this, so I would recommend checking out Flink SQL instead.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/over-agg/
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/#aggregate-functions

Long Nguyễn <longnguyen25111...@gmail.com> wrote on Tue, Nov 2, 2021 at 12:47 PM:

> Hi. Thank you for the clarification.
> I updated my code as below and got the desired result.
>
> result = table.window(
>         Slide.over(row_interval(WINDOW_SIZE))
>              .every(row_interval(WINDOW_SLIDE))
>              .on(col('proctime'))
>              .alias("w")) \
>     .group_by(col('w')) \
>     .select(call(read_raw_data, col('val')).sum)
>
> However, I can only compute some aggregation over those bits and cannot
> select the val column individually. If I remove the .sum like this:
>
>     .select(call(read_raw_data, col('val')))
>
> I get this error:
>
> py4j.protocol.Py4JJavaError: An error occurred while calling o90.select.
> : org.apache.flink.table.api.ValidationException: Cannot resolve field
> [val], input field list:[].
>
> Do you know why?
>
> On Tue, Nov 2, 2021 at 9:25 AM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi!
>>
>> You're not only grouping by the over window but also grouping by the
>> value, thus only the records with the same value will be in the same group.
>> I guess this is not intended.
>>
>> Long Nguyễn <longnguyen25111...@gmail.com> wrote on Tue, Nov 2, 2021 at 3:05 AM:
>>
>>> I have set up a program that takes bits 0 and 1 from a Kafka topic and
>>> then uses Flink to create a sliding count window of size 5. In that window,
>>> I'd like to output 1 if there are 3 or more of the bit 1, otherwise, output
>>> 0.
>>> Currently, I follow the way of calculating the sum of bits in the window.
>>>
>>> import os
>>> from urllib.parse import quote
>>>
>>> from pyflink.common import Row
>>> from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
>>> from pyflink.table.expressions import col, lit, row_interval
>>> from pyflink.table.udf import udf
>>> from pyflink.table.window import Slide
>>>
>>>
>>> settings = EnvironmentSettings.new_instance(
>>> ).in_streaming_mode().use_blink_planner().build()
>>> table_env = TableEnvironment.create(settings)
>>>
>>> kafka_sql_connector_jar_path = quote(
>>>     os.path.join(os.path.abspath(os.path.dirname(__file__)),
>>>                  'flink-sql-connector-kafka_2.12-1.13.2.jar'))
>>>
>>> table_env.get_config() \
>>>     .get_configuration() \
>>>     .set_string("pipeline.jars",
>>>                 "file://{}".format(kafka_sql_connector_jar_path))
>>>
>>> WINDOW_SIZE = 5
>>> WINDOW_SLIDE = 1
>>> THRESHOLD = 3
>>>
>>>
>>> @udf(result_type=DataTypes.INT())
>>> def read_raw_data(data):
>>>     return int(data, base=0)
>>>
>>>
>>> def sliding_window_demo():
>>>     source_ddl = """
>>>         CREATE TABLE input(
>>>             val BINARY,
>>>             proctime AS PROCTIME()
>>>         ) WITH (
>>>             'connector' = 'kafka',
>>>             'topic' = 'flink-demo-input',
>>>             'properties.bootstrap.servers' = 'localhost:9092',
>>>             'properties.group.id' = 'flink-demo',
>>>             'scan.startup.mode' = 'earliest-offset',
>>>             'format' = 'raw'
>>>         )
>>>     """
>>>
>>>     temp_ddl = """
>>>         CREATE TABLE temp(
>>>             res INT
>>>         ) WITH (
>>>             'connector' = 'print'
>>>         )
>>>     """
>>>
>>>     table_env.execute_sql(source_ddl)
>>>     table_env.execute_sql(temp_ddl)
>>>
>>>     table = table_env.from_path('input')
>>>
>>>     result = table.window(
>>>             Slide.over(row_interval(WINDOW_SIZE))
>>>                  .every(row_interval(WINDOW_SLIDE))
>>>                  .on(col('proctime'))
>>>                  .alias("w")) \
>>>         .group_by(col('w'), col('val')) \
>>>         .select(read_raw_data(col('val')).sum)
>>>
>>>     result.execute_insert('temp').wait()
>>>
>>>
>>> However, when I call the sum expression on those bits in the window,
>>> every time I add a 0 to the stream, the result is always 0, and when I add
>>> a 1, it always returns 5 (which is the window size).
>>> Can you tell me what I'm doing wrong? Thank you so much.
>>>
>>> --
>>> --------------------------------------------------------------
>>> Nguyen Dich Long,
>>> School of Information and Communication Technology (SoICT -
>>> https://www.soict.hust.edu.vn)
>>> Hanoi University of Science and Technology (https://www.hust.edu.vn)
>>> 601, B1 Building - No 1, Dai Co Viet Street, Hai Ba Trung District, Ha
>>> Noi, Vietnam
>>> Tel: +84 (0)3.54.41.76.76
>>> Email: long.nd162...@sis.hust.edu.vn; longnguyen25111...@gmail.com