Thanks Guowei and Caizhi.
As Guowei noted, I am using the Table API, and it seems that it does not
support custom triggers at the moment. Is there a plan to support custom
triggers in the Table API/SQL too?
Also, if I follow Guowei's suggestion, should I use the DataStream API for
the other parts of the aggregate computation too, or is there a way to
create a GroupWindowedTable from the DataStream?

Thanks,

On Thu, Sep 2, 2021 at 9:24 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, John
>
> I agree with Caizhi that you might need a custom window trigger. One small
> addition: you need to convert the Table to a DataStream first, and then you
> can customize the window's trigger, because as far as I know the Table API
> does not support customizing windows yet. For details on how to convert,
> you can refer to [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#datastream-api-integration
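A minimal sketch (not from the thread) of the conversion Guowei describes, assuming Flink 1.13 (`StreamTableEnvironment#toDataStream`), that `sourceTable` has a STRING column `id` and an event-time attribute `timestamp` as in John's query. The names `EarlyCountSketch`, `countPerIdAndWindow`, `CountAgg` and `EmitWithWindow` are hypothetical. It only shows where `.trigger(...)` plugs in: `EventTimeTrigger.create()` is the default trigger for event-time tumbling windows and would be replaced by a custom one.

```java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

class EarlyCountSketch {

    /** Hypothetical helper: emits (windowStart, id, count) per id and 10-minute window. */
    static DataStream<Tuple3<Long, String, Long>> countPerIdAndWindow(
            StreamTableEnvironment tableEnv, Table sourceTable) {
        // Keep the rowtime column so toDataStream propagates event-time timestamps and watermarks.
        DataStream<Row> rows = tableEnv.toDataStream(sourceTable.select($("id"), $("timestamp")));

        return rows
                // Field 0 is "id" after the select above (assumed to be a STRING column).
                .keyBy(row -> (String) row.getField(0), Types.STRING)
                .window(TumblingEventTimeWindows.of(Time.minutes(10)))
                // Default trigger for event-time windows; a custom trigger would go here.
                .trigger(EventTimeTrigger.create())
                .aggregate(new CountAgg(), new EmitWithWindow());
    }

    /** Incrementally counts the rows assigned to a window. */
    public static class CountAgg implements AggregateFunction<Row, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Row value, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }

    /** Attaches the window start and the key to the pre-aggregated count. */
    public static class EmitWithWindow
            extends ProcessWindowFunction<Long, Tuple3<Long, String, Long>, String, TimeWindow> {
        @Override
        public void process(String id, Context ctx, Iterable<Long> counts,
                            Collector<Tuple3<Long, String, Long>> out) {
            // With an AggregateFunction in front, the iterable holds exactly one count.
            long count = counts.iterator().next();
            out.collect(Tuple3.of(ctx.window().getStart(), id, count));
        }
    }
}
```

Filtering on `count > 5` (for example with `.filter(t -> t.f2 > 5)`) would then happen on the returned stream before it is written to the sink.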
> Best,
> Guowei
>
>
> On Fri, Sep 3, 2021 at 10:28 AM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi!
>>
>> You might want to use a custom trigger to achieve this.
>>
>> Tumbling windows use EventTimeTrigger by default. Flink has another
>> built-in trigger called CountTrigger, but it only fires every X records,
>> ignoring event time completely. You might want to create your own
>> trigger that combines the two, or more specifically, combines
>> EventTimeTrigger#onEventTime and CountTrigger#onElement.
>>
>> For more about custom triggers see
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/windows/#triggers
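A minimal sketch (not from the thread) of the combined trigger Caizhi describes, assuming the Flink 1.13 DataStream API and event-time `TimeWindow`s. `CountOrEventTimeTrigger` and its `threshold` parameter are hypothetical names. It keeps a per-key, per-window counter in trigger state (a `ValueState`, rather than the `ReducingState` that `CountTrigger` uses), fires once when the count first exceeds the threshold, and fires again when the watermark passes the end of the window, like `EventTimeTrigger#onEventTime`. It never purges, so the end-of-window firing reports the full window count.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/** Hypothetical trigger: early firing once count > threshold, plus the usual end-of-window firing. */
class CountOrEventTimeTrigger extends Trigger<Object, TimeWindow> {

    private final long threshold;

    // Element counter scoped to the current key and window by Flink's trigger state.
    private final ValueStateDescriptor<Long> countDesc =
            new ValueStateDescriptor<>("element-count", Long.class);

    CountOrEventTimeTrigger(long threshold) {
        this.threshold = threshold;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        // As in EventTimeTrigger: make sure a timer exists for the end of the window.
        ctx.registerEventTimeTimer(window.maxTimestamp());

        // As in CountTrigger: count elements in trigger state.
        ValueState<Long> countState = ctx.getPartitionedState(countDesc);
        Long previous = countState.value();
        long count = (previous == null ? 0L : previous) + 1;
        countState.update(count);

        // Fire exactly once, on the element that pushes the count above the threshold.
        return count == threshold + 1 ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // As in EventTimeTrigger#onEventTime: fire when the window's end timer goes off.
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        // Processing time plays no role in this trigger.
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        // Remove the timer and the counter when the window is discarded.
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(countDesc).clear();
    }
}
```

With a threshold of 5 it would be passed as `.trigger(new CountOrEventTimeTrigger(5))` on a keyed, 10-minute tumbling event-time window. An id that crosses the threshold produces two results for the same window (an early one with count 6 and a final one with the full count), so the sink logic would still need to filter on `count > 5` and tolerate the second, updated result.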
>>
>> On Fri, Sep 3, 2021 at 2:00 AM John Smith <mylearningemail2...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Sorry if this has been answered previously, but I couldn't find an
>>> answer to this question and would appreciate any help.
>>> Context:
>>> Let's say I have a log stream in Kafka where message values have an *id*
>>> field along with a few other fields. I want to count the number of messages
>>> for each id in a tumbling window of *ten minutes*, and if the count for
>>> any id in a given window is higher than 5, I want to write the message
>>> to the sink topic. However, I don't want to wait until the end of the
>>> 10-minute window to emit the result; I want to emit it immediately once
>>> the count for an id in the window exceeds 5. For example, if I see 6
>>> messages for an id in the first minute, I want to write the count of 6
>>> to the sink topic immediately rather than waiting the whole 10 minutes.
>>> The following code does the aggregation but emits the results at the end
>>> of the window. How can I emit the result earlier?
>>>
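>>> import static org.apache.flink.table.api.Expressions.$;
>>> import static org.apache.flink.table.api.Expressions.lit;
>>>
>>> import org.apache.flink.table.api.Table;
>>> import org.apache.flink.table.api.Tumble;
>>>
>>> // Count messages per id in 10-minute tumbling windows on the
>>> // event-time attribute "timestamp".
>>> final Table resultTable = sourceTable
>>>         .window(Tumble.over(lit(10).minutes()).on($("timestamp")).as("w"))
>>>         .groupBy($("w"), $("id"))
>>>         .select(
>>>                 $("w").start().as("WindowStart"),
>>>                 $("id"),
>>>                 $("key").count().as("count"));
>>>
>>> resultTable.execute().print();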
>>>
>>>
>>> Thanks in advance!
>>>
>>>
