I think the issue is not caused by event time interval join, but the
aggregation after the join:
    GROUP BY t.itemId, q.event_time, q.queryId;

In this case, there is still no chance for Flink to determine whether the
groups like (itemId, eventtime, queryId) have complete data or not.
As a comparison, if you change the grouping key to a window which based
only on q.event_time, then the query would emit insert only results.

Best,
Kurt


On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

> That's exactly the kind of behaviour I am looking for Kurt ("ignore all
> delete messages").
>
> As for the data completion, in my above example it is basically an event
> time interval join.
> With watermarks defined Flink should be able to compute results once in
> exactly the same way as for the tumbling window.
>
> Gyula
>
> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young <ykt...@gmail.com> wrote:
>
>> Back to this case, I assume you are expecting something like "ignore all
>> delete messages" flag? With this
>> flag turned on, Flink will only send insert messages which corresponding
>> current correct results to kafka and
>> drop all retractions and deletes on the fly.
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> > I also don't completely understand at this point why I can write the
>>> result of a group, tumble window aggregate to Kafka and not this window
>>> join / aggregate.
>>>
>>> If you are doing a tumble window aggregate with watermark enabled, Flink
>>> will only fire a final result for
>>> each window at once, no modification or retractions will happen after a
>>> window is calculated and fired.
>>> But with some other arbitrary aggregations, there is not enough
>>> information for Flink to determine whether
>>> the data is complete or not, so the framework will keep calculating
>>> results when receiving new records and
>>> retract earlier results by firing retraction/deletion messages.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>
>>>> Thanks Benoît!
>>>>
>>>> I can see now how I can implement this myself through the provided sink
>>>> interfaces but I was trying to avoid having to write code for this :D
>>>> My initial motivation was to see whether we are able to write out any
>>>> kind of table to Kafka as a simple stream of "upserts".
>>>>
>>>> I also don't completely understand at this point why I can write the
>>>> result of a group, tumble window aggregate to Kafka and not this window
>>>> join / aggregate.
>>>>
>>>> Cheers,
>>>> Gyula
>>>>
>>>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>>>> benoit.pa...@centraliens-lille.org> wrote:
>>>>
>>>>> Hi Gyula,
>>>>>
>>>>> I'm afraid conversion to see the retractions vs inserts can't be done
>>>>> in pure SQL (though I'd love that feature).
>>>>>
>>>>> You might want to go lower level and implement a
>>>>> RetractStreamTableSink [1][2] that you would wrap around a KafkaTableSink
>>>>> [3]. This will give you a emitDataStream(DataStream<Tuple2<Boolean, T>>
>>>>> dataStream);, in which the Boolean flag will give you an 'accumulate' or
>>>>> 'retract' signal.
>>>>> You can then filter the DataStream accordingly before passing to the
>>>>> KafkaTableSink.
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>> Best regards
>>>>> Benoît
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>>>> [3]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>>>>
>>>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Roman,
>>>>>>
>>>>>> This is the core logic:
>>>>>>
>>>>>> CREATE TABLE QueryResult (
>>>>>> queryId    BIGINT,
>>>>>>   itemId    STRING,
>>>>>>   quantity INT
>>>>>> ) WITH (
>>>>>> 'connector.type'     = 'kafka',
>>>>>> 'connector.version' = 'universal',
>>>>>> 'connector.topic'   = 'query.output.log.1',
>>>>>> 'connector.properties.bootstrap.servers' = '<broker>',
>>>>>> 'format.type' = 'json'
>>>>>> );
>>>>>>
>>>>>> INSERT INTO QueryResult
>>>>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>>>>> FROM
>>>>>>   ItemTransactions AS t,
>>>>>>   Queries AS q
>>>>>> WHERE
>>>>>>   t.itemId = q.itemId AND
>>>>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND
>>>>>> q.event_time
>>>>>> GROUP BY
>>>>>>   t.itemId, q.event_time, q.queryId;
>>>>>>
>>>>>> And the error I get is:
>>>>>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid
>>>>>> SQL update statement.
>>>>>> at
>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>>>>> at
>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>>>>> at
>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>>>>> at
>>>>>> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>>>>> at
>>>>>> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>>>>> at java.util.Optional.ifPresent(Optional.java:159)
>>>>>> at
>>>>>> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>>>>> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>>>>> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>>>>> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>>>>> Caused by: org.apache.flink.table.api.TableException:
>>>>>> AppendStreamTableSink requires that Table has only insert changes.
>>>>>> at
>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>>>>> at
>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>> at
>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>
>>>>>> I am wondering what could I do to just simply pump the result updates
>>>>>> to Kafka here.
>>>>>>
>>>>>> Gyula
>>>>>>
>>>>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>>>>>> khachatryan.ro...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Gyula,
>>>>>>>
>>>>>>> Could you provide the code of your Flink program, the error with
>>>>>>> stacktrace and the Flink version?
>>>>>>>
>>>>>>> Thanks.,
>>>>>>> Roman
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi All!
>>>>>>>>
>>>>>>>> Excuse my stupid question, I am pretty new to the Table/SQL API and
>>>>>>>> I am trying to play around with it implementing and running a few 
>>>>>>>> use-cases.
>>>>>>>>
>>>>>>>> I have a simple window join + aggregation, grouped on some id that
>>>>>>>> I want to write to Kafka but I am hitting the following error:
>>>>>>>>
>>>>>>>> "AppendStreamTableSink requires that Table has only insert changes."
>>>>>>>>
>>>>>>>> If I understand correctly the problem here is that since updates
>>>>>>>> are possible within a single group, we have a retract stream and the 
>>>>>>>> Kafka
>>>>>>>> Sink cannot handle that. I tried to search for the solution but I 
>>>>>>>> haven't
>>>>>>>> found any satisfying answers.
>>>>>>>>
>>>>>>>> How can I simply tell the INSERT logic to ignore previous values
>>>>>>>> and just always keep sending the latest (like you would see it on the 
>>>>>>>> CLI
>>>>>>>> output).
>>>>>>>>
>>>>>>>> Thank you!
>>>>>>>> Gyula
>>>>>>>>
>>>>>>>
>>>>>
>>>>> --
>>>>> Benoît Paris
>>>>> Ingénieur Machine Learning Explicable
>>>>> Tél : +33 6 60 74 23 00
>>>>> http://benoit.paris
>>>>> http://explicable.ml
>>>>>
>>>>

Reply via email to