Actually, this use case led me to start thinking about one question: if watermarks are enabled, could we also support GROUP BY event_time, instead of forcing the user to define a window based on the event_time?
GROUP BY a standalone event_time can also be treated as a special window, one whose start_time and end_time both equal the event_time. And when the watermark surpasses the event_time, we can still get the complete data of such a group, do the required aggregation, and then emit insert-only results. That would ease the user's burden by not requiring them to define a window when they already have an event time and a watermark defined.

Best,
Kurt

On Fri, Mar 6, 2020 at 10:26 AM Jark Wu <imj...@gmail.com> wrote:

> Hi Gyula,
>
> Does tumbling 5 seconds for aggregation meet your need? For example:
>
> INSERT INTO QueryResult
> SELECT q.queryId, t.itemId, TUMBLE_START(q.event_time, INTERVAL '5' SECOND), sum(t.quantity) AS quantity
> FROM
>   ItemTransactions AS t,
>   Queries AS q
> WHERE
>   t.itemId = q.itemId AND
>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
> GROUP BY
>   t.itemId, q.queryId, TUMBLE(q.event_time, INTERVAL '5' SECOND);
>
> Best,
> Jark
>
> On Thu, 5 Mar 2020 at 23:05, Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> I see, maybe I just don't understand how to properly express what I am trying to compute.
>>
>> Basically I want to aggregate the quantities of the transactions that happened in the 5 seconds before the query.
>> Every query.id belongs to a single query (event_time, itemid) but still I have to group :/
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 3:45 PM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> I think the issue is not caused by the event time interval join, but by the aggregation after the join:
>>> GROUP BY t.itemId, q.event_time, q.queryId;
>>>
>>> In this case, there is still no chance for Flink to determine whether the groups like (itemId, eventtime, queryId) have complete data or not.
>>> As a comparison, if you change the grouping key to a window which is based only on q.event_time, then the query would emit insert-only results.
>>>
>>> Best,
>>> Kurt
>>>
>>> On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>
>>>> That's exactly the kind of behaviour I am looking for, Kurt ("ignore all delete messages").
>>>>
>>>> As for the data completion, in my above example it is basically an event time interval join.
>>>> With watermarks defined, Flink should be able to compute results once, in exactly the same way as for the tumbling window.
>>>>
>>>> Gyula
>>>>
>>>> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young <ykt...@gmail.com> wrote:
>>>>
>>>>> Back to this case, I assume you are expecting something like an "ignore all delete messages" flag? With this flag turned on, Flink will only send the insert messages, which correspond to the current correct results, to Kafka, and drop all retractions and deletes on the fly.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young <ykt...@gmail.com> wrote:
>>>>>
>>>>>> > I also don't completely understand at this point why I can write the result of a group, tumble window aggregate to Kafka and not this window join / aggregate.
>>>>>>
>>>>>> If you are doing a tumble window aggregate with a watermark enabled, Flink will only fire a final result for each window once; no modifications or retractions will happen after a window is calculated and fired.
>>>>>> But with some other arbitrary aggregations, there is not enough information for Flink to determine whether the data is complete or not, so the framework will keep calculating results when receiving new records, and will retract earlier results by firing retraction/deletion messages.
>>>>>>
>>>>>> Best,
>>>>>> Kurt
>>>>>>
>>>>>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Benoît!
>>>>>>>
>>>>>>> I can see now how I can implement this myself through the provided sink interfaces, but I was trying to avoid having to write code for this :D
>>>>>>> My initial motivation was to see whether we are able to write out any kind of table to Kafka as a simple stream of "upserts".
>>>>>>>
>>>>>>> I also don't completely understand at this point why I can write the result of a group, tumble window aggregate to Kafka and not this window join / aggregate.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Gyula
>>>>>>>
>>>>>>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <benoit.pa...@centraliens-lille.org> wrote:
>>>>>>>
>>>>>>>> Hi Gyula,
>>>>>>>>
>>>>>>>> I'm afraid the conversion to see the retractions vs inserts can't be done in pure SQL (though I'd love that feature).
>>>>>>>>
>>>>>>>> You might want to go lower level and implement a RetractStreamTableSink [1][2] that you would wrap around a KafkaTableSink [3]. This will give you an emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream), in which the Boolean flag will give you an 'accumulate' or 'retract' signal.
>>>>>>>> You can then filter the DataStream accordingly before passing it to the KafkaTableSink.
>>>>>>>>
>>>>>>>> Hope this helps.
>>>>>>>>
>>>>>>>> Best regards
>>>>>>>> Benoît
>>>>>>>>
>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>>>>>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Roman,
>>>>>>>>>
>>>>>>>>> This is the core logic:
>>>>>>>>>
>>>>>>>>> CREATE TABLE QueryResult (
>>>>>>>>>   queryId BIGINT,
>>>>>>>>>   itemId STRING,
>>>>>>>>>   quantity INT
>>>>>>>>> ) WITH (
>>>>>>>>>   'connector.type' = 'kafka',
>>>>>>>>>   'connector.version' = 'universal',
>>>>>>>>>   'connector.topic' = 'query.output.log.1',
>>>>>>>>>   'connector.properties.bootstrap.servers' = '<broker>',
>>>>>>>>>   'format.type' = 'json'
>>>>>>>>> );
>>>>>>>>>
>>>>>>>>> INSERT INTO QueryResult
>>>>>>>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>>>>>>>> FROM
>>>>>>>>>   ItemTransactions AS t,
>>>>>>>>>   Queries AS q
>>>>>>>>> WHERE
>>>>>>>>>   t.itemId = q.itemId AND
>>>>>>>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
>>>>>>>>> GROUP BY
>>>>>>>>>   t.itemId, q.event_time, q.queryId;
>>>>>>>>>
>>>>>>>>> And the error I get is:
>>>>>>>>>
>>>>>>>>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update statement.
>>>>>>>>>   at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>>>>>>>>   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>>>>>>>>   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>>>>>>>>   at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>>>>>>>>   at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>>>>>>>>   at java.util.Optional.ifPresent(Optional.java:159)
>>>>>>>>>   at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>>>>>>>>   at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>>>>>>>>   at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>>>>>>>>   at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>>>>>>>> Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
>>>>>>>>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>>>>>>>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>>>   at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>
>>>>>>>>> I am wondering what I could do to simply pump the result updates to Kafka here.
>>>>>>>>>
>>>>>>>>> Gyula
>>>>>>>>>
>>>>>>>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Gyula,
>>>>>>>>>>
>>>>>>>>>> Could you provide the code of your Flink program, the error with the stacktrace, and the Flink version?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Roman
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi All!
>>>>>>>>>>>
>>>>>>>>>>> Excuse my stupid question, I am pretty new to the Table/SQL API and I am trying to play around with it, implementing and running a few use-cases.
>>>>>>>>>>>
>>>>>>>>>>> I have a simple window join + aggregation, grouped on some id, that I want to write to Kafka, but I am hitting the following error:
>>>>>>>>>>>
>>>>>>>>>>> "AppendStreamTableSink requires that Table has only insert changes."
>>>>>>>>>>>
>>>>>>>>>>> If I understand correctly, the problem here is that since updates are possible within a single group, we have a retract stream and the Kafka sink cannot handle that. I tried to search for a solution but I haven't found any satisfying answers.
>>>>>>>>>>>
>>>>>>>>>>> How can I simply tell the INSERT logic to ignore previous values and just always keep sending the latest (like you would see it on the CLI output)?
>>>>>>>>>>>
>>>>>>>>>>> Thank you!
>>>>>>>>>>> Gyula
>>>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Benoît Paris
>>>>>>>> Ingénieur Machine Learning Explicable
>>>>>>>> Tél : +33 6 60 74 23 00
>>>>>>>> http://benoit.paris
>>>>>>>> http://explicable.ml
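
For anyone who wants to go down the route Benoît describes earlier in the thread, here is a rough, untested sketch of such a wrapper, assuming the Flink 1.10 interfaces linked above ([1][2][3]). The class name InsertOnlyKafkaSink is made up for illustration, the wrapped sink is assumed to be an already-configured append-only sink such as a KafkaTableSink, and consumeDataStream is used instead of the deprecated emitDataStream mentioned above; treat this as a starting point rather than a finished implementation:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

/**
 * Rough sketch (untested): a RetractStreamTableSink that drops the retract/delete
 * messages produced by the planner and forwards only the insert messages to a
 * wrapped append-only sink, e.g. a KafkaTableSink.
 */
public class InsertOnlyKafkaSink implements RetractStreamTableSink<Row> {

    // Any append-only sink works here; a configured KafkaTableSink is one example.
    private final AppendStreamTableSink<Row> wrapped;

    public InsertOnlyKafkaSink(AppendStreamTableSink<Row> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return wrapped.getTableSchema().toRowType();
    }

    @Override
    public TableSchema getTableSchema() {
        return wrapped.getTableSchema();
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        // Simplified: a complete implementation would also reconfigure the wrapped sink.
        return this;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        // The Boolean flag is the 'accumulate' vs 'retract' signal: keep only the
        // inserts (flag == true) and drop all retractions/deletes on the fly.
        DataStream<Row> insertsOnly = dataStream
                .filter(change -> change.f0)
                .map(change -> change.f1)
                .returns(getRecordType());
        return wrapped.consumeDataStream(insertsOnly);
    }
}

The wrapper would then be registered in place of the plain Kafka sink, for example via something like tableEnv.registerTableSink("QueryResult", new InsertOnlyKafkaSink(kafkaSink)), so that the INSERT INTO statement targets it. Whether silently dropping the retractions gives acceptable semantics for the downstream consumers is, as discussed above, up to the use case.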