Hey! I have a use case in which I'm grouping a stream by session ID. So far pretty standard, but note that I need to do it through SQL and not through the Table API. In my use case I have two trigger conditions, though: one is time (session inactivity), and the other is a specific event marked as a "last" event. AFAIK SQL does not support custom triggers, so what I end up doing is running the GROUP BY in SQL, converting the result to a stream along with a boolean field that marks whether at least one of the events was the end event, and then adding my custom trigger on top of it. It looks something like this:

    Table result = tableEnv.sqlQuery(
        "select atLeastOneTrue(lastEvent), sessionId, count(*) FROM source Group By sessionId");

    tableEnv.toRetractStream(result, Row.class, streamQueryConfig)
        .filter(tuple -> tuple.f0)
        .map(...)
        .returns(...)
        .keyBy("sessionId")
        .window(EventTimeSessionWindows.withGap(Time.hours(4)))
        .trigger(new SessionEndedByTimeOrEndTrigger())
        .process(...take last element from the group by result..)

This seems like a weird workaround, doesn't it? My window is basically over the SQL result rather than over the source stream. Ideally I would keyBy the sessionId before running the SQL, but then:

a) would I need to register a table per key?
b) would I be able to use the custom trigger per window?

Basically, I want to group by session ID and have a window for every session that supports both time and a custom trigger. Assuming I need to use SQL (the reason is that the query is dynamically loaded), is there a better solution?
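
To make the intended firing behaviour concrete, here is a minimal plain-Java sketch of the two conditions the trigger is supposed to combine. It is only a model with no Flink dependencies, not the actual SessionEndedByTimeOrEndTrigger: the class name SessionTriggerSketch, the method shouldFire and its parameters are all hypothetical. The inactivity check assumes the session window ends at "last event timestamp + gap" and fires once the watermark reaches the last millisecond inside the window, which is how I understand event-time session windows to behave.

```java
final class SessionTriggerSketch {

    // Same 4-hour inactivity gap as in the snippet above.
    static final long GAP_MILLIS = 4L * 60 * 60 * 1000;

    /**
     * Hypothetical decision function: should the session be emitted now?
     *
     * @param sawLastEvent       true if any event in the session was marked "last"
     * @param lastEventTimestamp event time (ms) of the most recent event in the session
     * @param watermark          current event-time watermark (ms)
     * @param gapMillis          session inactivity gap (ms)
     */
    static boolean shouldFire(boolean sawLastEvent,
                              long lastEventTimestamp,
                              long watermark,
                              long gapMillis) {
        // Condition 1: an explicit "last" event ends the session immediately.
        if (sawLastEvent) {
            return true;
        }
        // Condition 2: inactivity. Assumed model: the window is
        // [start, lastEventTimestamp + gap) and fires when the watermark
        // reaches the last timestamp inside it (window end minus 1 ms).
        long windowEnd = lastEventTimestamp + gapMillis;
        long maxTimestamp = windowEnd - 1;
        return watermark >= maxTimestamp;
    }

    public static void main(String[] args) {
        long lastSeen = 1_000L;
        // "Last" event seen: fires regardless of the watermark.
        System.out.println(shouldFire(true, lastSeen, lastSeen, GAP_MILLIS));
        // No "last" event and the gap has not elapsed: does not fire.
        System.out.println(shouldFire(false, lastSeen, lastSeen, GAP_MILLIS));
        // No "last" event but the watermark has passed the gap: fires.
        System.out.println(shouldFire(false, lastSeen, lastSeen + GAP_MILLIS, GAP_MILLIS));
    }
}
```

In a real trigger, the first condition would be checked as each element arrives and the second when the event-time timer fires. The sketch only pins down which outcome I expect in each case.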