I think the issue is not caused by the event time interval join, but by the
aggregation after the join:

GROUP BY t.itemId, q.event_time, q.queryId;

In this case, there is still no way for Flink to determine whether groups
like (itemId, event_time, queryId) have complete data or not. As a
comparison, if you change the grouping key to a window based only on
q.event_time, then the query would emit insert-only results.

Best,
Kurt
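[For illustration, a windowed variant of the query from the thread below
might look like the following. This is a sketch only: it assumes the Flink
1.10 TUMBLE group-window syntax, that q.event_time is still recognized as a
rowtime attribute after the interval join, and an illustrative 5-second
window size.]

INSERT INTO QueryResult
SELECT q.queryId, t.itemId, SUM(t.quantity) AS quantity
FROM
  ItemTransactions AS t,
  Queries AS q
WHERE
  t.itemId = q.itemId AND
  t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
GROUP BY
  TUMBLE(q.event_time, INTERVAL '5' SECOND),
  t.itemId,
  q.queryId;

[A group window like this fires each (window, itemId, queryId) result once
when the watermark passes the window end, so the output is append-only and
an AppendStreamTableSink such as the Kafka sink accepts it.]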
On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

> That's exactly the kind of behaviour I am looking for, Kurt ("ignore all
> delete messages").
>
> As for the data completion, in my above example it is basically an event
> time interval join.
> With watermarks defined, Flink should be able to compute results once,
> exactly the same way as for the tumbling window.
>
> Gyula
>
> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young <ykt...@gmail.com> wrote:
>
>> Back to this case, I assume you are expecting something like an "ignore
>> all delete messages" flag? With this flag turned on, Flink would only
>> send insert messages corresponding to the current correct results to
>> Kafka, and drop all retractions and deletes on the fly.
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> > I also don't completely understand at this point why I can write the
>>> result of a grouped, tumbling window aggregate to Kafka and not this
>>> window join / aggregate.
>>>
>>> If you are doing a tumbling window aggregate with watermarks enabled,
>>> Flink will fire a final result for each window exactly once; no
>>> modifications or retractions will happen after a window is calculated
>>> and fired. But with other arbitrary aggregations, there is not enough
>>> information for Flink to determine whether the data is complete or not,
>>> so the framework will keep recalculating results as new records arrive
>>> and retract earlier results by firing retraction/deletion messages.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>
>>>> Thanks Benoît!
>>>>
>>>> I can see now how I can implement this myself through the provided
>>>> sink interfaces, but I was trying to avoid having to write code for
>>>> this :D
>>>> My initial motivation was to see whether we are able to write out any
>>>> kind of table to Kafka as a simple stream of "upserts".
>>>>
>>>> I also don't completely understand at this point why I can write the
>>>> result of a grouped, tumbling window aggregate to Kafka and not this
>>>> window join / aggregate.
>>>>
>>>> Cheers,
>>>> Gyula
>>>>
>>>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>>>> benoit.pa...@centraliens-lille.org> wrote:
>>>>
>>>>> Hi Gyula,
>>>>>
>>>>> I'm afraid separating the retractions from the inserts can't be done
>>>>> in pure SQL (though I'd love that feature).
>>>>>
>>>>> You might want to go lower level and implement a
>>>>> RetractStreamTableSink [1][2] that you would wrap around a
>>>>> KafkaTableSink [3]. This will give you an
>>>>> emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream), in which
>>>>> the Boolean flag will give you an 'accumulate' or 'retract' signal.
>>>>> You can then filter the DataStream accordingly before passing it to
>>>>> the KafkaTableSink.
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>> Best regards
>>>>> Benoît
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>>>> [3]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
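[A rough sketch of the wrapper Benoît describes above, against the Flink
1.10 sink interfaces. The class name InsertOnlySinkWrapper is made up, it
wraps any AppendStreamTableSink<Row> such as the KafkaTableSink, and the
configure() handling is simplified; treat it as an illustration, not a
tested implementation.]

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

/**
 * Accepts a retract stream, drops the retraction/delete messages, and
 * forwards the remaining insert messages to a wrapped append-only sink.
 */
public class InsertOnlySinkWrapper implements RetractStreamTableSink<Row> {

    private final AppendStreamTableSink<Row> inner; // e.g. a KafkaTableSink

    public InsertOnlySinkWrapper(AppendStreamTableSink<Row> inner) {
        this.inner = inner;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return inner.getTableSchema().toRowType();
    }

    @Override
    public TableSchema getTableSchema() {
        return inner.getTableSchema();
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(
            String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        return this; // schema is fixed by the wrapped sink in this sketch
    }

    @Override
    public DataStreamSink<?> consumeDataStream(
            DataStream<Tuple2<Boolean, Row>> dataStream) {
        // f0 == true  -> accumulate message (keep)
        // f0 == false -> retract/delete message (drop)
        DataStream<Row> insertsOnly = dataStream
                .filter(t -> t.f0)
                .map(t -> t.f1)
                .returns(getRecordType());
        return inner.consumeDataStream(insertsOnly);
    }
}

[Note that dropped retractions mean stale values remain in the topic, so
downstream consumers would have to treat the stream as upserts keyed by
(queryId, itemId).]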
>>>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>
>>>>>> Hi Roman,
>>>>>>
>>>>>> This is the core logic:
>>>>>>
>>>>>> CREATE TABLE QueryResult (
>>>>>>   queryId BIGINT,
>>>>>>   itemId STRING,
>>>>>>   quantity INT
>>>>>> ) WITH (
>>>>>>   'connector.type' = 'kafka',
>>>>>>   'connector.version' = 'universal',
>>>>>>   'connector.topic' = 'query.output.log.1',
>>>>>>   'connector.properties.bootstrap.servers' = '<broker>',
>>>>>>   'format.type' = 'json'
>>>>>> );
>>>>>>
>>>>>> INSERT INTO QueryResult
>>>>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>>>>> FROM
>>>>>>   ItemTransactions AS t,
>>>>>>   Queries AS q
>>>>>> WHERE
>>>>>>   t.itemId = q.itemId AND
>>>>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
>>>>>> GROUP BY
>>>>>>   t.itemId, q.event_time, q.queryId;
>>>>>>
>>>>>> And the error I get is:
>>>>>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update statement.
>>>>>>   at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>>>>>   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>>>>>   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>>>>>   at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>>>>>   at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>>>>>   at java.util.Optional.ifPresent(Optional.java:159)
>>>>>>   at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>>>>>   at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>>>>>   at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>>>>>   at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>>>>> Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
>>>>>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>>>>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>   at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>
>>>>>> I am wondering what I could do to simply pump the result updates to
>>>>>> Kafka here.
>>>>>>
>>>>>> Gyula
>>>>>>
>>>>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>>>>>> khachatryan.ro...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Gyula,
>>>>>>>
>>>>>>> Could you provide the code of your Flink program, the error with
>>>>>>> the stacktrace, and the Flink version?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Roman
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi All!
>>>>>>>>
>>>>>>>> Excuse my stupid question, I am pretty new to the Table/SQL API
>>>>>>>> and I am trying to play around with it by implementing and running
>>>>>>>> a few use cases.
>>>>>>>>
>>>>>>>> I have a simple window join + aggregation, grouped on some id,
>>>>>>>> that I want to write to Kafka, but I am hitting the following
>>>>>>>> error:
>>>>>>>>
>>>>>>>> "AppendStreamTableSink requires that Table has only insert changes."
>>>>>>>>
>>>>>>>> If I understand correctly, the problem here is that since updates
>>>>>>>> are possible within a single group, we have a retract stream, and
>>>>>>>> the Kafka sink cannot handle that. I tried to search for the
>>>>>>>> solution but I haven't found any satisfying answers.
>>>>>>>>
>>>>>>>> How can I simply tell the INSERT logic to ignore previous values
>>>>>>>> and always keep sending the latest (like you would see it on the
>>>>>>>> CLI output)?
>>>>>>>>
>>>>>>>> Thank you!
>>>>>>>> Gyula
>>>>>
>>>>> --
>>>>> Benoît Paris
>>>>> Explainable Machine Learning Engineer
>>>>> Tel: +33 6 60 74 23 00
>>>>> http://benoit.paris
>>>>> http://explicable.ml
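[For completeness: the retraction filtering discussed in this thread can
also be done with the Table API from a Java program instead of the SQL
client. A sketch assuming Flink 1.10; the class name UpsertToKafka and the
tableEnv/resultTable parameters are hypothetical stand-ins for the
environment and the join/aggregate result, and the row type matches the
QueryResult schema above.]

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class UpsertToKafka {

    /** Converts an updating table to an insert-only stream of latest results. */
    static DataStream<Row> insertsOnly(StreamTableEnvironment tableEnv, Table resultTable) {
        // Each element is (flag, row): flag == true marks an accumulate
        // message, flag == false a retraction.
        DataStream<Tuple2<Boolean, Row>> retractStream =
                tableEnv.toRetractStream(resultTable, Row.class);
        return retractStream
                .filter(t -> t.f0)  // drop retractions
                .map(t -> t.f1)     // strip the flag
                .returns(Types.ROW(Types.LONG, Types.STRING, Types.INT)); // queryId, itemId, quantity
    }
}

[The resulting stream can then be handed to any append-only sink, e.g. a
plain Kafka producer.]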