I see, maybe I just don't understand how to properly express what I am trying to compute.
Basically I want to aggregate the quantities of the transactions that happened in the 5 seconds before the query. Every query.id belongs to a single query (event_time, itemId), but I still have to group :/

Gyula

On Thu, Mar 5, 2020 at 3:45 PM Kurt Young <ykt...@gmail.com> wrote:

> I think the issue is not caused by the event time interval join, but by
> the aggregation after the join:
>
> GROUP BY t.itemId, q.event_time, q.queryId;
>
> In this case, there is still no chance for Flink to determine whether
> groups like (itemId, event_time, queryId) have complete data or not.
> As a comparison, if you change the grouping key to a window based only
> on q.event_time, then the query would emit insert-only results.
>
> Best,
> Kurt
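A concrete sketch of what Kurt suggests here, applied to the query quoted further down the thread (the TUMBLE group window and the 5-second size are illustrative choices, not taken from the thread): since every queryId maps to a single event_time anyway, grouping by a window over q.event_time instead of by the raw timestamp keeps the per-query grouping while giving Flink a completeness signal.

INSERT INTO QueryResult
SELECT q.queryId, t.itemId, SUM(t.quantity) AS quantity
FROM
  ItemTransactions AS t,
  Queries AS q
WHERE
  t.itemId = q.itemId AND
  t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
GROUP BY
  TUMBLE(q.event_time, INTERVAL '5' SECOND),
  t.itemId,
  q.queryId;

Each window on the rowtime attribute fires exactly once when the watermark passes its end, so the sink sees only insert changes.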
> On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> That's exactly the kind of behaviour I am looking for, Kurt ("ignore all
>> delete messages").
>>
>> As for data completeness, my example above is basically an event time
>> interval join.
>> With watermarks defined, Flink should be able to compute the results
>> once, in exactly the same way as for the tumbling window.
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> Back to this case, I assume you are expecting something like an "ignore
>>> all delete messages" flag? With this flag turned on, Flink would only
>>> send insert messages, corresponding to the current correct results, to
>>> Kafka, and drop all retractions and deletes on the fly.
>>>
>>> Best,
>>> Kurt
>>>
>>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young <ykt...@gmail.com> wrote:
>>>
>>>> > I also don't completely understand at this point why I can write
>>>> > the result of a grouped, tumbling window aggregate to Kafka but not
>>>> > this window join / aggregate.
>>>>
>>>> If you are doing a tumbling window aggregate with watermarks enabled,
>>>> Flink fires only a single final result for each window; no
>>>> modifications or retractions happen after a window is calculated and
>>>> fired.
>>>> But with other, arbitrary aggregations, there is not enough
>>>> information for Flink to determine whether the data is complete or
>>>> not, so the framework keeps recalculating results as new records
>>>> arrive and retracts earlier results by firing retraction/deletion
>>>> messages.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>
>>>>> Thanks Benoît!
>>>>>
>>>>> I can see now how I can implement this myself through the provided
>>>>> sink interfaces, but I was trying to avoid having to write code for
>>>>> this :D
>>>>> My initial motivation was to see whether we are able to write out any
>>>>> kind of table to Kafka as a simple stream of "upserts".
>>>>>
>>>>> I also don't completely understand at this point why I can write
>>>>> the result of a grouped, tumbling window aggregate to Kafka but not
>>>>> this window join / aggregate.
>>>>>
>>>>> Cheers,
>>>>> Gyula
>>>>>
>>>>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>>>>> benoit.pa...@centraliens-lille.org> wrote:
>>>>>
>>>>>> Hi Gyula,
>>>>>>
>>>>>> I'm afraid the conversion that separates retractions from inserts
>>>>>> can't be done in pure SQL (though I'd love that feature).
>>>>>>
>>>>>> You might want to go lower level and implement a
>>>>>> RetractStreamTableSink [1][2] that you would wrap around a
>>>>>> KafkaTableSink [3].
>>>>>> This will give you an emitDataStream(DataStream<Tuple2<Boolean, T>>
>>>>>> dataStream);, in which the Boolean flag will give you an
>>>>>> 'accumulate' or 'retract' signal.
>>>>>> You can then filter the DataStream accordingly before passing it to
>>>>>> the KafkaTableSink.
>>>>>>
>>>>>> Hope this helps.
>>>>>>
>>>>>> Best regards
>>>>>> Benoît
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>>>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
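A minimal sketch of the wrapper Benoît describes, written against the Flink 1.10 interfaces linked above; the class name is invented, the wrapped sink is assumed to be an already-configured KafkaTableSink, and this is an outline rather than tested code:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

// Forwards accumulate messages to the wrapped KafkaTableSink and
// silently drops retraction messages.
public class InsertOnlyKafkaSink implements RetractStreamTableSink<Row> {

    private final KafkaTableSink kafkaSink;

    public InsertOnlyKafkaSink(KafkaTableSink kafkaSink) {
        this.kafkaSink = kafkaSink;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return kafkaSink.getOutputType();
    }

    @Override
    public TableSchema getTableSchema() {
        return kafkaSink.getTableSchema();
    }

    @Override
    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        // The Boolean flag is the accumulate/retract signal:
        // true = insert, false = retraction. Keep only the inserts.
        DataStream<Row> insertsOnly = dataStream
                .filter(msg -> msg.f0)
                .map(msg -> msg.f1)
                .returns(getRecordType());
        kafkaSink.consumeDataStream(insertsOnly);
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        return this; // sketch only: assumes the schema is fixed at construction time
    }
}

The wrapper would then be registered on the TableEnvironment and targeted by the INSERT INTO statement, as described in [2].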
>>>>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Roman,
>>>>>>>
>>>>>>> This is the core logic:
>>>>>>>
>>>>>>> CREATE TABLE QueryResult (
>>>>>>>   queryId BIGINT,
>>>>>>>   itemId STRING,
>>>>>>>   quantity INT
>>>>>>> ) WITH (
>>>>>>>   'connector.type' = 'kafka',
>>>>>>>   'connector.version' = 'universal',
>>>>>>>   'connector.topic' = 'query.output.log.1',
>>>>>>>   'connector.properties.bootstrap.servers' = '<broker>',
>>>>>>>   'format.type' = 'json'
>>>>>>> );
>>>>>>>
>>>>>>> INSERT INTO QueryResult
>>>>>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>>>>>> FROM
>>>>>>>   ItemTransactions AS t,
>>>>>>>   Queries AS q
>>>>>>> WHERE
>>>>>>>   t.itemId = q.itemId AND
>>>>>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
>>>>>>> GROUP BY
>>>>>>>   t.itemId, q.event_time, q.queryId;
>>>>>>>
>>>>>>> And the error I get is:
>>>>>>>
>>>>>>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update statement.
>>>>>>>   at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>>>>>>   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>>>>>>   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>>>>>>   at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>>>>>>   at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>>>>>>   at java.util.Optional.ifPresent(Optional.java:159)
>>>>>>>   at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>>>>>>   at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>>>>>>   at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>>>>>>   at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>>>>>> Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
>>>>>>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>>>>>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>   at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>
>>>>>>> I am wondering what I could do to simply pump the result updates to
>>>>>>> Kafka here.
>>>>>>>
>>>>>>> Gyula
>>>>>>>
>>>>>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>>>>>>> khachatryan.ro...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Gyula,
>>>>>>>>
>>>>>>>> Could you provide the code of your Flink program, the error with
>>>>>>>> the stacktrace, and the Flink version?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Roman
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi All!
>>>>>>>>>
>>>>>>>>> Excuse my stupid question, I am pretty new to the Table/SQL API
>>>>>>>>> and I am trying to play around with it, implementing and running
>>>>>>>>> a few use cases.
>>>>>>>>>
>>>>>>>>> I have a simple window join + aggregation, grouped on some id,
>>>>>>>>> that I want to write to Kafka, but I am hitting the following
>>>>>>>>> error:
>>>>>>>>>
>>>>>>>>> "AppendStreamTableSink requires that Table has only insert
>>>>>>>>> changes."
>>>>>>>>>
>>>>>>>>> If I understand correctly, the problem here is that since updates
>>>>>>>>> are possible within a single group, we have a retract stream, and
>>>>>>>>> the Kafka sink cannot handle that. I tried to search for a
>>>>>>>>> solution but haven't found any satisfying answers.
>>>>>>>>>
>>>>>>>>> How can I simply tell the INSERT logic to ignore previous values
>>>>>>>>> and just always keep sending the latest (like you would see it on
>>>>>>>>> the CLI output)?
>>>>>>>>>
>>>>>>>>> Thank you!
>>>>>>>>> Gyula
>>>>>>
>>>>>> --
>>>>>> Benoît Paris
>>>>>> Explainable Machine Learning Engineer
>>>>>> Tel: +33 6 60 74 23 00
>>>>>> http://benoit.paris
>>>>>> http://explicable.ml
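For completeness: the "ignore previous values and just always keep sending the latest" behaviour asked for above can also be had with a few lines of DataStream glue instead of a full custom sink, by converting the result Table to a retract stream and dropping the retraction messages before writing to Kafka. A rough sketch against the Flink 1.10 Table API (the helper class below is illustrative, not part of Flink):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public final class UpsertStreamUtil {

    // Turns an updating Table into an insert-only DataStream by dropping
    // retraction messages; the remaining rows can then go to any
    // append-only sink, e.g. a FlinkKafkaProducer.
    public static DataStream<Row> insertsOnly(StreamTableEnvironment tEnv, Table result) {
        return tEnv.toRetractStream(result, Row.class) // Tuple2<Boolean, Row>
                .filter(msg -> msg.f0)                 // true = accumulate, false = retract
                .map(msg -> msg.f1)                    // unwrap the Row payload
                .returns(Row.class);                   // type hint for the lambda
    }
}

Note that downstream consumers then see every intermediate value of a group with no explicit supersede signal; keying the Kafka records by (queryId, itemId) and reading the topic as a stream of upserts is one way to interpret such output.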