Thank you! Looking through that helped me figure out what I needed to do.
At first I was nervous, because the docs you sent use the SQL approach and I
wanted to keep the Python code I had, but then I noticed that the example SQL
was passing rowtime from one window to the next. I determined that I could use
the col("tumble_window").rowtime attribute on the window column and alias it
so I could use it in the next window.
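Roughly, the cascading part ended up looking like this (a simplified sketch of
my code, not the exact version: "window_time" is just the alias I picked, "ts"
stands for the event-time column with the watermark I describe below, and
first_process / second_process are the UDAFs from my earlier mail):

from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble, Slide

# formatted_source is the preformat query from my earlier mail, now carrying
# the event-time column `ts` instead of `proc_time`
# first window: tumble over the event-time attribute and re-expose its rowtime
first_stage = (
    formatted_source.window(
        Tumble.over(lit(2).seconds).on(col("ts")).alias("tumble_window")
    )
    .group_by(col("a"), col("b"), col("tumble_window"))
    .select(
        col("a"),
        col("b"),
        # alias the window's rowtime so the next window can use it in ".on()"
        col("tumble_window").rowtime.alias("window_time"),
        first_process(
            col("a"), col("b"), col("x"), col("y"),
            col("z"), col("c"), col("time"), col("v"),
        ).alias("first_out"),
    )
    .where(col("first_out").is_not_null)
)

# second window: slide over the aliased rowtime coming out of the first window
second_stage = (
    first_stage.window(
        Slide.over(lit(1).hours)
        .every(lit(2).seconds)
        .on(col("window_time"))
        .alias("slide_window")
    )
    .group_by(col("a"), col("b"), col("slide_window"))
    .select(
        col("a"),
        col("b"),
        second_process(col("a"), col("b"), col("first_out")).alias("second_out"),
    )
)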
Secondly, I was originally using proctime for my first window, and I needed to
convert over to using a timestamp column with a watermark so that I could call
.rowtime (I was getting an error when trying to call .rowtime on a proctime
window).
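I don't have the exact table definition in front of me, but conceptually the
source change was along these lines (assuming the table is declared with SQL
DDL; the column name, the 5 second watermark delay, and the connector options
are just placeholders):

# define an event-time column plus a watermark instead of a PROCTIME() column,
# so the tumble window produces a rowtime attribute that can be aliased onward
t_env.execute_sql("""
    CREATE TABLE source (
        `data` STRING,
        `ts` AS TO_TIMESTAMP_LTZ(
            CAST(JSON_VALUE(`data`, '$.request.tracking.time') AS BIGINT), 3),
        WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kinesis'
        -- remaining stream/format options unchanged from the original table
    )
""")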
Once I switched those two things I was able to cascade the windows using the
Python code (I didn't need to use SQL syntax for this).
Thank you
Nick
On Sun, Mar 17, 2024 at 9:32 PM Xuyang <xyzhong...@163.com> wrote:
> Hi, Nick.
> Can you try the `cascading window aggregation` described here [1] and see if it meets your needs?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#cascading-window-aggregation
>
>
> --
> Best!
> Xuyang
>
>
> At 2024-03-16 06:21:52, "Nick Hecht" <nick.he...@zainartech.com> wrote:
>
> Hello,
>
> I'm working on a pyflink application (1.15, deployed to AWS). I have made a
> few applications and I feel like I'm making good progress, but now I have a
> problem that I think requires multiple stages, each with its own window.
> I'm not sure how I can properly pass a time attribute into my second stage.
> I'll share some pseudo code that shows what I'm trying to do.
>
>
> # First I have a preformat stage that does some manipulation of the kinesis stream
> formatted_source = t_env.sql_query(
>     """SELECT
>            JSON_VALUE(`data`, '$.a') AS `a`,
>            JSON_VALUE(`data`, '$.b') AS `b`,
>            JSON_VALUE(`data`, '$.response.filtered[0]' RETURNING DOUBLE) AS `x`,
>            JSON_VALUE(`data`, '$.response.filtered[1]' RETURNING DOUBLE) AS `y`,
>            JSON_VALUE(`data`, '$.response.filtered[2]' RETURNING DOUBLE) AS `z`,
>            JSON_VALUE(`data`, '$.response.c' RETURNING INT) AS `c`,
>            TO_TIMESTAMP_LTZ(CAST(JSON_VALUE(`data`, '$.request.tracking.time') AS BIGINT), 3) AS `time`,
>            CAST(JSON_VALUE(`data`, '$.response.tracking.time') AS BIGINT) AS `v`,
>            `proc_time`
>        FROM source"""
> )
>
> # Next I have a stage that combines records by (a, b and time)
> first_stage = (
>     formatted_source.window(
>         Tumble.over(lit(2).seconds).on(col("proc_time")).alias("tumble_window")
>     )
>     .group_by(col("a"), col("b"), col("time"), col("tumble_window"))
>     .select(
>         col("a"),
>         col("b"),
>         col("time"),
>         first_process(
>             col("a"),
>             col("b"),
>             col("x"),
>             col("y"),
>             col("z"),
>             col("c"),
>             col("time"),
>             col("v"),
>         ).alias("first_out"),
>     )
>     .where(col("first_out").is_not_null)
> )
>
> # I want to create a second stage here, but I have no idea what to pass into
> # the window ".on()"
> second_stage = (
>     first_stage.window(
>         Slide.over(lit(1).hour).every(lit(2).seconds).on(col("proc_time")).alias("slide_window")
>     )
>     .group_by(col("a"), col("b"), col("slide_window"))
>     .select(
>         col("a"),
>         col("b"),
>         second_process(
>             col("a"),
>             col("b"),
>             col("first_out"),
>         ).alias("second_out"),
>     )
> )
>
> second_stage.execute().print()
>
> My first stage and the SQL parts are working fine. I'm just not sure how to
> pass a valid time value into my second stage, because I can't add the time
> attribute to my first stage's group_by.
> Any advice would be appreciated!
>
> Thank you!
>
> Nick Hecht
>
>