Actually this use case lead me to start thinking about one question:
If watermark is enabled, could we also support GROUP BY event_time instead
of forcing
user defining a window based on the event_time.

GROUP BY a standalone event_time can also be treated as a special window,
which has
both start_time and end_time equals to event_time. And when watermark
surpass the event_time,
we can still get the complete data of such group and do required
aggregation and then emit
insert only results.

That would ease user's burden for not having to define a window when they
already have event
time and watermark defined.

Best,
Kurt


On Fri, Mar 6, 2020 at 10:26 AM Jark Wu <imj...@gmail.com> wrote:

> Hi Gyula,
>
> Does tumbling 5 seconds for aggregation meet your need? For example:
>
> INSERT INTO QueryResult
> SELECT q.queryId, t.itemId, TUMBLE_START(q.event_time, INTERVAL '5'
> SECOND), sum(t.quantity) AS quantity
> FROM
>   ItemTransactions AS t,
>   Queries AS q
> WHERE
>   t.itemId = q.itemId AND
>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
> GROUP BY
>   t.itemId, q.queryId, TUMBLE(q.event_time, INTERVAL '5' SECOND);
>
> Best,
> Jark
>
> On Thu, 5 Mar 2020 at 23:05, Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> I see, maybe I just dont understand how to properly express what I am
>> trying to compute.
>>
>> Basically I want to aggregate the quantities of the transactions that
>> happened in the 5 seconds before the query.
>> Every query.id belongs to a single query (event_time, itemid) but still
>> I have to group :/
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 3:45 PM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> I think the issue is not caused by event time interval join, but the
>>> aggregation after the join:
>>>     GROUP BY t.itemId, q.event_time, q.queryId;
>>>
>>> In this case, there is still no chance for Flink to determine whether
>>> the groups like (itemId, eventtime, queryId) have complete data or not.
>>> As a comparison, if you change the grouping key to a window which based
>>> only on q.event_time, then the query would emit insert only results.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>
>>>> That's exactly the kind of behaviour I am looking for Kurt ("ignore all
>>>> delete messages").
>>>>
>>>> As for the data completion, in my above example it is basically an
>>>> event time interval join.
>>>> With watermarks defined Flink should be able to compute results once in
>>>> exactly the same way as for the tumbling window.
>>>>
>>>> Gyula
>>>>
>>>> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young <ykt...@gmail.com> wrote:
>>>>
>>>>> Back to this case, I assume you are expecting something like "ignore
>>>>> all delete messages" flag? With this
>>>>> flag turned on, Flink will only send insert messages which
>>>>> corresponding current correct results to kafka and
>>>>> drop all retractions and deletes on the fly.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young <ykt...@gmail.com> wrote:
>>>>>
>>>>>> > I also don't completely understand at this point why I can write
>>>>>> the result of a group, tumble window aggregate to Kafka and not this 
>>>>>> window
>>>>>> join / aggregate.
>>>>>>
>>>>>> If you are doing a tumble window aggregate with watermark enabled,
>>>>>> Flink will only fire a final result for
>>>>>> each window at once, no modification or retractions will happen after
>>>>>> a window is calculated and fired.
>>>>>> But with some other arbitrary aggregations, there is not enough
>>>>>> information for Flink to determine whether
>>>>>> the data is complete or not, so the framework will keep calculating
>>>>>> results when receiving new records and
>>>>>> retract earlier results by firing retraction/deletion messages.
>>>>>>
>>>>>> Best,
>>>>>> Kurt
>>>>>>
>>>>>>
>>>>>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Benoît!
>>>>>>>
>>>>>>> I can see now how I can implement this myself through the provided
>>>>>>> sink interfaces but I was trying to avoid having to write code for this 
>>>>>>> :D
>>>>>>> My initial motivation was to see whether we are able to write out
>>>>>>> any kind of table to Kafka as a simple stream of "upserts".
>>>>>>>
>>>>>>> I also don't completely understand at this point why I can write the
>>>>>>> result of a group, tumble window aggregate to Kafka and not this window
>>>>>>> join / aggregate.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Gyula
>>>>>>>
>>>>>>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>>>>>>> benoit.pa...@centraliens-lille.org> wrote:
>>>>>>>
>>>>>>>> Hi Gyula,
>>>>>>>>
>>>>>>>> I'm afraid conversion to see the retractions vs inserts can't be
>>>>>>>> done in pure SQL (though I'd love that feature).
>>>>>>>>
>>>>>>>> You might want to go lower level and implement a
>>>>>>>> RetractStreamTableSink [1][2] that you would wrap around a 
>>>>>>>> KafkaTableSink
>>>>>>>> [3]. This will give you a emitDataStream(DataStream<Tuple2<Boolean, T>>
>>>>>>>> dataStream);, in which the Boolean flag will give you an 'accumulate' 
>>>>>>>> or
>>>>>>>> 'retract' signal.
>>>>>>>> You can then filter the DataStream accordingly before passing to
>>>>>>>> the KafkaTableSink.
>>>>>>>>
>>>>>>>> Hope this helps.
>>>>>>>>
>>>>>>>> Best regards
>>>>>>>> Benoît
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>>>>>>> [2]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>>>>>>> [3]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Roman,
>>>>>>>>>
>>>>>>>>> This is the core logic:
>>>>>>>>>
>>>>>>>>> CREATE TABLE QueryResult (
>>>>>>>>> queryId    BIGINT,
>>>>>>>>>   itemId    STRING,
>>>>>>>>>   quantity INT
>>>>>>>>> ) WITH (
>>>>>>>>> 'connector.type'     = 'kafka',
>>>>>>>>> 'connector.version' = 'universal',
>>>>>>>>> 'connector.topic'   = 'query.output.log.1',
>>>>>>>>> 'connector.properties.bootstrap.servers' = '<broker>',
>>>>>>>>> 'format.type' = 'json'
>>>>>>>>> );
>>>>>>>>>
>>>>>>>>> INSERT INTO QueryResult
>>>>>>>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>>>>>>>> FROM
>>>>>>>>>   ItemTransactions AS t,
>>>>>>>>>   Queries AS q
>>>>>>>>> WHERE
>>>>>>>>>   t.itemId = q.itemId AND
>>>>>>>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND
>>>>>>>>> q.event_time
>>>>>>>>> GROUP BY
>>>>>>>>>   t.itemId, q.event_time, q.queryId;
>>>>>>>>>
>>>>>>>>> And the error I get is:
>>>>>>>>> org.apache.flink.table.client.gateway.SqlExecutionException:
>>>>>>>>> Invalid SQL update statement.
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>>>>>>>> at java.util.Optional.ifPresent(Optional.java:159)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>>>>>>>> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>>>>>>>> Caused by: org.apache.flink.table.api.TableException:
>>>>>>>>> AppendStreamTableSink requires that Table has only insert changes.
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>>
>>>>>>>>> I am wondering what could I do to just simply pump the result
>>>>>>>>> updates to Kafka here.
>>>>>>>>>
>>>>>>>>> Gyula
>>>>>>>>>
>>>>>>>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>>>>>>>>> khachatryan.ro...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Gyula,
>>>>>>>>>>
>>>>>>>>>> Could you provide the code of your Flink program, the error with
>>>>>>>>>> stacktrace and the Flink version?
>>>>>>>>>>
>>>>>>>>>> Thanks.,
>>>>>>>>>> Roman
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi All!
>>>>>>>>>>>
>>>>>>>>>>> Excuse my stupid question, I am pretty new to the Table/SQL API
>>>>>>>>>>> and I am trying to play around with it implementing and running a 
>>>>>>>>>>> few
>>>>>>>>>>> use-cases.
>>>>>>>>>>>
>>>>>>>>>>> I have a simple window join + aggregation, grouped on some id
>>>>>>>>>>> that I want to write to Kafka but I am hitting the following error:
>>>>>>>>>>>
>>>>>>>>>>> "AppendStreamTableSink requires that Table has only insert
>>>>>>>>>>> changes."
>>>>>>>>>>>
>>>>>>>>>>> If I understand correctly the problem here is that since updates
>>>>>>>>>>> are possible within a single group, we have a retract stream and 
>>>>>>>>>>> the Kafka
>>>>>>>>>>> Sink cannot handle that. I tried to search for the solution but I 
>>>>>>>>>>> haven't
>>>>>>>>>>> found any satisfying answers.
>>>>>>>>>>>
>>>>>>>>>>> How can I simply tell the INSERT logic to ignore previous values
>>>>>>>>>>> and just always keep sending the latest (like you would see it on 
>>>>>>>>>>> the CLI
>>>>>>>>>>> output).
>>>>>>>>>>>
>>>>>>>>>>> Thank you!
>>>>>>>>>>> Gyula
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Benoît Paris
>>>>>>>> Ingénieur Machine Learning Explicable
>>>>>>>> Tél : +33 6 60 74 23 00
>>>>>>>> http://benoit.paris
>>>>>>>> http://explicable.ml
>>>>>>>>
>>>>>>>

Reply via email to