Hi Gyula,

Does a 5-second tumbling window for the aggregation meet your need? For example:
INSERT INTO QueryResult
SELECT
  q.queryId,
  t.itemId,
  TUMBLE_START(q.event_time, INTERVAL '5' SECOND),
  sum(t.quantity) AS quantity
FROM
  ItemTransactions AS t,
  Queries AS q
WHERE
  t.itemId = q.itemId AND
  t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
GROUP BY
  t.itemId, q.queryId, TUMBLE(q.event_time, INTERVAL '5' SECOND);

Best,
Jark

On Thu, 5 Mar 2020 at 23:05, Gyula Fóra <gyula.f...@gmail.com> wrote:

> I see, maybe I just don't understand how to properly express what I am
> trying to compute.
>
> Basically I want to aggregate the quantities of the transactions that
> happened in the 5 seconds before the query.
> Every queryId belongs to a single query (event_time, itemId), but I still
> have to group :/
>
> Gyula
>
> On Thu, Mar 5, 2020 at 3:45 PM Kurt Young <ykt...@gmail.com> wrote:
>
>> I think the issue is not caused by the event time interval join, but by
>> the aggregation after the join:
>> GROUP BY t.itemId, q.event_time, q.queryId;
>>
>> In this case, there is still no chance for Flink to determine whether
>> groups like (itemId, event_time, queryId) have complete data or not.
>> As a comparison, if you change the grouping key to a window based only
>> on q.event_time, then the query would emit insert-only results.
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>>> That's exactly the kind of behaviour I am looking for, Kurt ("ignore all
>>> delete messages").
>>>
>>> As for data completeness, my example above is basically an event time
>>> interval join.
>>> With watermarks defined, Flink should be able to compute the results
>>> once, in exactly the same way as for the tumbling window.
>>>
>>> Gyula
>>>
>>> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young <ykt...@gmail.com> wrote:
>>>
>>>> Back to this case, I assume you are expecting something like an "ignore
>>>> all delete messages" flag? With this flag turned on, Flink would only
>>>> send insert messages, which correspond to the current correct results,
>>>> to Kafka, and drop all retractions and deletes on the fly.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young <ykt...@gmail.com> wrote:
>>>>
>>>>> > I also don't completely understand at this point why I can write the
>>>>> result of a group, tumble window aggregate to Kafka and not this window
>>>>> join / aggregate.
>>>>>
>>>>> If you are doing a tumbling window aggregate with watermarks enabled,
>>>>> Flink will fire a final result for each window exactly once; no
>>>>> modifications or retractions will happen after a window is calculated
>>>>> and fired. But with other, arbitrary aggregations, there is not enough
>>>>> information for Flink to determine whether the data is complete or
>>>>> not, so the framework will keep recalculating results as new records
>>>>> arrive and retract earlier results by firing retraction/deletion
>>>>> messages.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks Benoît!
>>>>>>
>>>>>> I can see now how I can implement this myself through the provided
>>>>>> sink interfaces, but I was trying to avoid having to write code for
>>>>>> this :D
>>>>>> My initial motivation was to see whether we are able to write out any
>>>>>> kind of table to Kafka as a simple stream of "upserts".
>>>>>>
>>>>>> I also don't completely understand at this point why I can write the
>>>>>> result of a group, tumble window aggregate to Kafka and not this
>>>>>> window join / aggregate.
>>>>>>
>>>>>> Cheers,
>>>>>> Gyula
>>>>>>
>>>>>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>>>>>> benoit.pa...@centraliens-lille.org> wrote:
>>>>>>
>>>>>>> Hi Gyula,
>>>>>>>
>>>>>>> I'm afraid the conversion that lets you see the retractions vs.
>>>>>>> inserts can't be done in pure SQL (though I'd love that feature).
>>>>>>>
>>>>>>> You might want to go lower level and implement a
>>>>>>> RetractStreamTableSink [1][2] that you would wrap around a
>>>>>>> KafkaTableSink [3]. This will give you an
>>>>>>> emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream) method, in
>>>>>>> which the Boolean flag will give you an 'accumulate' or 'retract'
>>>>>>> signal.
>>>>>>> You can then filter the DataStream accordingly before passing it to
>>>>>>> the KafkaTableSink.
>>>>>>>
>>>>>>> Hope this helps.
>>>>>>>
>>>>>>> Best regards
>>>>>>> Benoît
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>>>>>> [2]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>>>>>> [3]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
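For reference, a minimal sketch of the wrapper Benoît describes might look like the following. This is an illustration, not code from the thread: it assumes Flink 1.10's (since-deprecated) TableSink interfaces, the class name InsertsOnlyKafkaSink is made up, and the configure() handling is simplified.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

/**
 * Drops retract messages and forwards the remaining inserts to an
 * append-only sink such as a KafkaTableSink.
 */
public class InsertsOnlyKafkaSink implements RetractStreamTableSink<Row> {

    private final AppendStreamTableSink<Row> inner;

    public InsertsOnlyKafkaSink(AppendStreamTableSink<Row> inner) {
        this.inner = inner;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return inner.getTableSchema().toRowType();
    }

    @Override
    public TableSchema getTableSchema() {
        return inner.getTableSchema();
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames,
                                                     TypeInformation<?>[] fieldTypes) {
        // Simplification for the sketch: assume the wrapped sink is
        // already configured with the right schema.
        return this;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        DataStream<Row> insertsOnly = dataStream
                .filter(change -> change.f0)   // true = accumulate, false = retract
                .map(change -> change.f1)
                .returns(getRecordType());
        return inner.consumeDataStream(insertsOnly);
    }

    @Override
    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        consumeDataStream(dataStream);
    }
}

A configured KafkaTableSink implements AppendStreamTableSink<Row>, so it can be passed in directly as the wrapped sink; the filter on the Boolean flag is exactly the "ignore all delete messages" behaviour discussed above.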
>>>>>>>
>>>>>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Roman,
>>>>>>>>
>>>>>>>> This is the core logic:
>>>>>>>>
>>>>>>>> CREATE TABLE QueryResult (
>>>>>>>>   queryId BIGINT,
>>>>>>>>   itemId STRING,
>>>>>>>>   quantity INT
>>>>>>>> ) WITH (
>>>>>>>>   'connector.type' = 'kafka',
>>>>>>>>   'connector.version' = 'universal',
>>>>>>>>   'connector.topic' = 'query.output.log.1',
>>>>>>>>   'connector.properties.bootstrap.servers' = '<broker>',
>>>>>>>>   'format.type' = 'json'
>>>>>>>> );
>>>>>>>>
>>>>>>>> INSERT INTO QueryResult
>>>>>>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>>>>>>> FROM
>>>>>>>>   ItemTransactions AS t,
>>>>>>>>   Queries AS q
>>>>>>>> WHERE
>>>>>>>>   t.itemId = q.itemId AND
>>>>>>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
>>>>>>>> GROUP BY
>>>>>>>>   t.itemId, q.event_time, q.queryId;
>>>>>>>>
>>>>>>>> And the error I get is:
>>>>>>>>
>>>>>>>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update statement.
>>>>>>>> at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>>>>>>> at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>>>>>>> at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>>>>>>> at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>>>>>>> at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>>>>>>> at java.util.Optional.ifPresent(Optional.java:159)
>>>>>>>> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>>>>>>> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>>>>>>> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>>>>>>> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>>>>>>> Caused by: org.apache.flink.table.api.TableException:
>>>>>>>> AppendStreamTableSink requires that Table has only insert changes.
>>>>>>>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>>>>>>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>
>>>>>>>> I am wondering what I could do to simply pump the result updates to
>>>>>>>> Kafka here.
>>>>>>>>
>>>>>>>> Gyula
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>>>>>>>> khachatryan.ro...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Gyula,
>>>>>>>>>
>>>>>>>>> Could you provide the code of your Flink program, the error with
>>>>>>>>> the stacktrace, and the Flink version?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Roman
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All!
>>>>>>>>>>
>>>>>>>>>> Excuse my stupid question, I am pretty new to the Table/SQL API
>>>>>>>>>> and I am trying to play around with it, implementing and running
>>>>>>>>>> a few use-cases.
>>>>>>>>>>
>>>>>>>>>> I have a simple window join + aggregation, grouped on some id,
>>>>>>>>>> that I want to write to Kafka, but I am hitting the following
>>>>>>>>>> error:
>>>>>>>>>>
>>>>>>>>>> "AppendStreamTableSink requires that Table has only insert
>>>>>>>>>> changes."
>>>>>>>>>>
>>>>>>>>>> If I understand correctly, the problem here is that since updates
>>>>>>>>>> are possible within a single group, we have a retract stream, and
>>>>>>>>>> the Kafka sink cannot handle that. I tried to search for a
>>>>>>>>>> solution but haven't found any satisfying answers.
>>>>>>>>>>
>>>>>>>>>> How can I simply tell the INSERT logic to ignore previous values
>>>>>>>>>> and just always keep sending the latest (like you would see it on
>>>>>>>>>> the CLI output)?
>>>>>>>>>>
>>>>>>>>>> Thank you!
>>>>>>>>>> Gyula
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Benoît Paris
>>>>>>> Ingénieur Machine Learning Explicable
>>>>>>> Tél : +33 6 60 74 23 00
>>>>>>> http://benoit.paris
>>>>>>> http://explicable.ml
>>>>>>
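As an alternative to implementing the sink interfaces, the same filtering can be done by converting the result table to a retract stream in the DataStream API and dropping the delete messages before a plain Kafka producer. This route is not mentioned in the thread; the sketch below assumes Flink 1.10, assumes the ItemTransactions and Queries tables from Gyula's example are already registered, and uses Row.toString() as a stand-in for proper JSON serialization.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class UpsertsToKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // The aggregation from the thread; its result is a retract stream.
        Table result = tEnv.sqlQuery(
                "SELECT q.queryId, t.itemId, SUM(t.quantity) AS quantity " +
                "FROM ItemTransactions AS t, Queries AS q " +
                "WHERE t.itemId = q.itemId " +
                "  AND t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time " +
                "GROUP BY t.itemId, q.event_time, q.queryId");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "<broker>");

        // Keep only accumulate messages (flag = true), drop retractions,
        // and write the latest values to the topic from the thread.
        tEnv.toRetractStream(result, Row.class)
            .filter(change -> change.f0)
            .map(change -> change.f1.toString())
            .returns(Types.STRING)
            .addSink(new FlinkKafkaProducer<>("query.output.log.1",
                    new SimpleStringSchema(), props));

        env.execute("upserts-to-kafka");
    }
}

The result is the "stream of upserts" Gyula asked about: every new value for a group simply overwrites the previous one downstream, and consumers are expected to keep the latest record per key.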