Hi!

It seems that what you want is to compute, for *each* row, the sum of the
five most recent rows (the current row and the four before it). This is the
use case for over aggregation, not a sliding window. I don't know if the
Table API supports this, but you can see [1] for over aggregation in Flink SQL.
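
To make the over-aggregation semantics concrete, here is a minimal plain-Python
sketch (no Flink required) of "sum of the current row and the 4 rows before it",
followed by the threshold check from your original question. The SQL string is
only an assumed equivalent and is not executed here; the view name `bits` and
its INT column `bit` are hypothetical, so adapt them to your own table.

```python
from collections import deque

WINDOW_SIZE = 5  # rows per aggregate: the current row plus the 4 before it
THRESHOLD = 3    # emit 1 when at least this many 1-bits are in range

# Assumed Flink SQL equivalent (not run here). `bits` is a hypothetical view
# exposing the parsed value as an INT column `bit` plus the `proctime` attribute.
OVER_AGG_SQL = """
SELECT SUM(bit) OVER (
    ORDER BY proctime
    ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
) AS ones
FROM bits
"""


def over_sum(bits, size=WINDOW_SIZE):
    """Yield, for each row, the sum of that row and up to size-1 preceding rows."""
    recent = deque(maxlen=size)  # old rows fall out automatically
    for bit in bits:
        recent.append(bit)
        yield sum(recent)


def majority(bits, size=WINDOW_SIZE, threshold=THRESHOLD):
    """Return 1 per row if the running sum reaches the threshold, else 0."""
    return [1 if s >= threshold else 0 for s in over_sum(bits, size)]


if __name__ == "__main__":
    stream = [1, 0, 1, 1, 0, 0, 0, 1]
    print(list(over_sum(stream)))
    print(majority(stream))
```

Note that one result is produced per input row, which is the difference from a
group window, where one result is produced per window.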

If you insist on using a sliding window, you can use the last_value
aggregate function to get the last row in each window [2]. However, it seems
that the Table API still doesn't support this, so I would recommend checking
out Flink SQL instead.
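
As a rough illustration of why an aggregate such as last_value is needed: once
rows are grouped by a window, each window yields a single output row, so a
plain column like val has to be reduced to one value. The plain-Python sketch
below models this under the assumption that only full windows are emitted;
Flink's actual firing behavior for count windows may differ, and the helper
name `sliding_windows` is made up for this example.

```python
def sliding_windows(values, size=5, slide=1):
    """Return one (sum, last_value) pair per full window of `size` rows."""
    results = []
    # `end` is the exclusive end index of each window; it advances by `slide`.
    for end in range(size, len(values) + 1, slide):
        window = values[end - size:end]
        results.append((sum(window), window[-1]))
    return results


if __name__ == "__main__":
    print(sliding_windows([1, 0, 1, 1, 0, 0, 1]))
```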

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/over-agg/
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/#aggregate-functions

Long Nguyễn <longnguyen25111...@gmail.com> wrote on Tue, Nov 2, 2021 at 12:47 PM:

> Hi. Thank you for the clarification.
> I updated my code as below and got the desired result.
>
> result = table.window(
>         Slide.over(row_interval(WINDOW_SIZE))
>              .every(row_interval(WINDOW_SLIDE))
>              .on(col('proctime'))
>              .alias("w")) \
>     .group_by(col('w')) \
>     .select(call(read_raw_data, col('val')).sum)
>
>
> However, I can only compute some aggregation over those bits and cannot
> select the val column individually. If I remove the .sum like this:
>
> .select(call(read_raw_data, col('val')))
>
> I get this error:
>
> py4j.protocol.Py4JJavaError: An error occurred while calling o90.select.
> : org.apache.flink.table.api.ValidationException: Cannot resolve field
> [val], input field list:[].
>
> Do you know why?
>
> On Tue, Nov 2, 2021 at 9:25 AM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi!
>>
>> You're not only grouping by the window but also grouping by the
>> value, so only records with the same value end up in the same group.
>> I guess this is not intended.
>>
>> Long Nguyễn <longnguyen25111...@gmail.com> wrote on Tue, Nov 2, 2021 at 3:05 AM:
>>
>>> I have set up a program that takes bits 0 and 1 from a Kafka topic and
>>> then uses Flink to create a sliding count window of size 5. In that window,
>>> I'd like to output 1 if there are 3 or more of the bit 1, otherwise, output
>>> 0.
>>> My current approach is to calculate the sum of the bits in the window.
>>>
>>> import os
>>> from urllib.parse import quote
>>>
>>> from pyflink.common import Row
>>> from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
>>> from pyflink.table.expressions import col, lit, row_interval
>>> from pyflink.table.udf import udf
>>> from pyflink.table.window import Slide
>>>
>>>
>>> settings = EnvironmentSettings.new_instance(
>>> ).in_streaming_mode().use_blink_planner().build()
>>> table_env = TableEnvironment.create(settings)
>>>
>>> kafka_sql_connector_jar_path = quote(os.path.join(
>>>     os.path.abspath(os.path.dirname(__file__)),
>>>     'flink-sql-connector-kafka_2.12-1.13.2.jar'))
>>>
>>> table_env.get_config() \
>>>     .get_configuration() \
>>>     .set_string("pipeline.jars",
>>>                 "file://{}".format(kafka_sql_connector_jar_path))
>>>
>>> WINDOW_SIZE = 5
>>> WINDOW_SLIDE = 1
>>> THRESHOLD = 3
>>>
>>>
>>> @udf(result_type=DataTypes.INT())
>>> def read_raw_data(data):
>>>     return int(data, base=0)
>>>
>>>
>>> def sliding_window_demo():
>>>     source_ddl = """
>>>             CREATE TABLE input(
>>>                 val BINARY,
>>>                 proctime AS PROCTIME()
>>>             ) WITH (
>>>               'connector' = 'kafka',
>>>               'topic' = 'flink-demo-input',
>>>               'properties.bootstrap.servers' = 'localhost:9092',
>>>               'properties.group.id' = 'flink-demo',
>>>               'scan.startup.mode' = 'earliest-offset',
>>>               'format' = 'raw'
>>>             )
>>>             """
>>>
>>>     temp_ddl = """
>>>             CREATE TABLE temp(
>>>                 res INT
>>>             ) WITH (
>>>               'connector' = 'print'
>>>             )
>>>             """
>>>
>>>     table_env.execute_sql(source_ddl)
>>>     table_env.execute_sql(temp_ddl)
>>>
>>>     table = table_env.from_path('input')
>>>
>>>     result = table.window(
>>>             Slide.over(row_interval(WINDOW_SIZE))
>>>                  .every(row_interval(WINDOW_SLIDE))
>>>                  .on(col('proctime'))
>>>                  .alias("w")) \
>>>         .group_by(col('w'), col('val')) \
>>>         .select(read_raw_data(col('val')).sum)
>>>
>>>     result.execute_insert('temp').wait()
>>>
>>>
>>> However, when I call the sum expression on those bits in the window,
>>> every time I add a 0 to the stream, the result is always 0, and when I add
>>> a 1, it always returns 5 (which is the window size).
>>> Can you tell me what I'm doing wrong? Thank you so much.
>>>
>>> --
>>> ------------------------------------------------------------
>>> --------------------------------------------------
>>> Nguyen Dich Long,
>>> School of Information and Communication Technology (SoICT -
>>> https://www.soict.hust.edu.vn)
>>> Hanoi University of Science and Technology (https://www.hust.edu.vn)
>>> 601, B1 Building - No 1, Dai Co Viet Street, Hai Ba Trung District, Ha
>>> Noi, Vietnam
>>> Tel: +84 (0)3.54.41.76.76
>>> Email: long.nd162...@sis.hust.edu.vn; longnguyen25111...@gmail.com
>>>
>>
>
