Hi Jing Zhang,
Thanks for the reply! My current implementation is like this:
tableEnv.executeSql(
  "CREATE TABLE ItemDesc (item_id STRING, channel_id STRING, window_end BIGINT, num_select BIGINT) " +
  "WITH ('connector' = 'kafka', 'scan.startup.mode' = 'latest-offset')"
)
tableEnv.executeSql("""
  |INSERT INTO ItemDesc
  |SELECT
  |  item_id,
  |  channel_id,
  |  CAST(HOP_END(proctime, INTERVAL '15' SECOND, INTERVAL '60' SECOND) AS BIGINT) AS window_end,
  |  COUNT(*) AS num_select
  |FROM mytable
  |GROUP BY item_id, channel_id, HOP(proctime, INTERVAL '15' SECOND, INTERVAL '60' SECOND)
  |""".stripMargin)
val result = tableEnv.sqlQuery("""
  |SELECT item_id, window_end, channel_id, num_select, row_num
  |FROM (
  |  SELECT *,
  |    ROW_NUMBER() OVER (PARTITION BY channel_id ORDER BY num_select DESC) AS row_num
  |  FROM ItemDesc)
  |WHERE row_num <= 20
  |""".stripMargin)
But I got the error:
org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.ItemDesc'.
The table ItemDesc is only an intermediate table. Putting everything into a
single query doesn't work either. If I create the table without a connector, like this:
tableEnv.executeSql(
  "CREATE TABLE ItemDesc (item_id STRING, channel_id STRING, window_end BIGINT, num_select BIGINT)"
)
This also doesn't work.
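One pattern that might sidestep the connector problem entirely is to register the aggregation as a temporary view rather than a connector-backed table, so no sink is involved until the final query. A sketch (untested, reusing the column names above and assuming `mytable` and its `proctime` attribute are already registered):

```scala
// Register the windowed aggregation as a purely logical view;
// no connector, sink, or extra Kafka topic is needed for it.
tableEnv.createTemporaryView("ItemDesc", tableEnv.sqlQuery("""
  |SELECT
  |  item_id,
  |  channel_id,
  |  CAST(HOP_END(proctime, INTERVAL '15' SECOND, INTERVAL '60' SECOND) AS BIGINT) AS window_end,
  |  COUNT(*) AS num_select
  |FROM mytable
  |GROUP BY item_id, channel_id, HOP(proctime, INTERVAL '15' SECOND, INTERVAL '60' SECOND)
  |""".stripMargin))

// The Top-N query then reads from the view directly.
val result = tableEnv.sqlQuery("""
  |SELECT item_id, window_end, channel_id, num_select, row_num
  |FROM (
  |  SELECT *,
  |    ROW_NUMBER() OVER (PARTITION BY channel_id ORDER BY num_select DESC) AS row_num
  |  FROM ItemDesc)
  |WHERE row_num <= 20
  |""".stripMargin)
```

This keeps ItemDesc entirely in-planner, so the "Unable to create a sink" validation never applies to it.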
Thanks,
Jing
On Thu, Dec 23, 2021 at 1:20 AM Jing Zhang <[email protected]> wrote:
> Hi Jing,
> In fact, I agree with you on using TopN [2] instead of Window TopN [1]:
> normalize the time into 5-minute buckets and add the bucket as one of the
> partition keys.
> Please note two points when using TopN:
> 1. The result is an update stream instead of an append stream, which means
> a result that was sent might be retracted later.
> 2. You need to take care of state cleanup.
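> A sketch of that idea (hedged: the bucket expression is illustrative, and
> the exact time functions available depend on the Flink version):
>
> ```sql
> -- Top 20 per channel within each 5-minute bucket, without window TopN.
> SELECT item_id, channel_id, time_bucket, num_select
> FROM (
>   SELECT *,
>     ROW_NUMBER() OVER (PARTITION BY channel_id, time_bucket
>                        ORDER BY num_select DESC) AS row_num
>   FROM (
>     SELECT
>       item_id,
>       channel_id,
>       -- Normalize processing time to a 5-minute bucket
>       -- (epoch seconds / 300; illustrative only).
>       FLOOR(UNIX_TIMESTAMP(DATE_FORMAT(proctime, 'yyyy-MM-dd HH:mm:ss')) / 300) AS time_bucket,
>       COUNT(*) AS num_select
>     FROM mytable
>     GROUP BY item_id, channel_id,
>       FLOOR(UNIX_TIMESTAMP(DATE_FORMAT(proctime, 'yyyy-MM-dd HH:mm:ss')) / 300)
>   )
> )
> WHERE row_num <= 20
> ```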
>
> However, you said you met a problem when using TopN. I didn't
> understand your question there. Would you please explain a little more?
> > I saw that one possibility is to create a table and insert the
> aggregated data into the table, then do Top N like [1]. However, I cannot
> make this approach work because I need to specify the connector for this
> table and I may also need to create another Kafka topic for it.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-topn/
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/topn/
>
> Jing Zhang <[email protected]> 于2021年12月23日周四 17:04写道:
>
>> Hi Jing,
>> I'm afraid it is not possible to do Window TopN in SQL on the 1.12 version,
>> because Window TopN was only introduced in 1.13.
>>
>> > I saw that one possibility is to create a table and insert the
>> aggregated data into the table, then do Top N like [1]. However, I cannot
>> make this approach work because I need to specify the connector for this
>> table and I may also need to create another Kafka topic for it.
>> I didn't understand you here.
>> Do you mean you need a sink to store the output data of TopN? Note that you
>> would still need a sink to store the output even if you used Window TopN.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-topn/
>>
>> Best,
>> Jing Zhang
>>
>>
>> Jing <[email protected]> 于2021年12月23日周四 16:12写道:
>>
>>> Hi, Flink community,
>>>
>>> Is there any existing code I can use to get the window Top N with Flink
>>> 1.12? I saw that one possibility is to create a table, insert the
>>> aggregated data into the table, and then do Top N like [1]. However, I
>>> cannot make this approach work because I need to specify the connector for
>>> this table and I may also need to create another Kafka topic for it. Is
>>> there any existing way to do Window Top N with Flink 1.12?
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n
>>>
>>>
>>> Thanks,
>>> Jing
>>>
>>