I see, maybe I just dont understand how to properly express what I am
trying to compute.

Basically I want to aggregate the quantities of the transactions that
happened in the 5 seconds before the query.
Every query.id belongs to a single query (event_time, itemid) but still I
have to group :/

Gyula

On Thu, Mar 5, 2020 at 3:45 PM Kurt Young <ykt...@gmail.com> wrote:

> I think the issue is not caused by event time interval join, but the
> aggregation after the join:
>     GROUP BY t.itemId, q.event_time, q.queryId;
>
> In this case, there is still no chance for Flink to determine whether the
> groups like (itemId, eventtime, queryId) have complete data or not.
> As a comparison, if you change the grouping key to a window which based
> only on q.event_time, then the query would emit insert only results.
>
> Best,
> Kurt
>
>
> On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> That's exactly the kind of behaviour I am looking for Kurt ("ignore all
>> delete messages").
>>
>> As for the data completion, in my above example it is basically an event
>> time interval join.
>> With watermarks defined Flink should be able to compute results once in
>> exactly the same way as for the tumbling window.
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young <ykt...@gmail.com> wrote:
>>
>>> Back to this case, I assume you are expecting something like "ignore all
>>> delete messages" flag? With this
>>> flag turned on, Flink will only send insert messages which corresponding
>>> current correct results to kafka and
>>> drop all retractions and deletes on the fly.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young <ykt...@gmail.com> wrote:
>>>
>>>> > I also don't completely understand at this point why I can write the
>>>> result of a group, tumble window aggregate to Kafka and not this window
>>>> join / aggregate.
>>>>
>>>> If you are doing a tumble window aggregate with watermark enabled,
>>>> Flink will only fire a final result for
>>>> each window at once, no modification or retractions will happen after a
>>>> window is calculated and fired.
>>>> But with some other arbitrary aggregations, there is not enough
>>>> information for Flink to determine whether
>>>> the data is complete or not, so the framework will keep calculating
>>>> results when receiving new records and
>>>> retract earlier results by firing retraction/deletion messages.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gyula.f...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Benoît!
>>>>>
>>>>> I can see now how I can implement this myself through the provided
>>>>> sink interfaces but I was trying to avoid having to write code for this :D
>>>>> My initial motivation was to see whether we are able to write out any
>>>>> kind of table to Kafka as a simple stream of "upserts".
>>>>>
>>>>> I also don't completely understand at this point why I can write the
>>>>> result of a group, tumble window aggregate to Kafka and not this window
>>>>> join / aggregate.
>>>>>
>>>>> Cheers,
>>>>> Gyula
>>>>>
>>>>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>>>>> benoit.pa...@centraliens-lille.org> wrote:
>>>>>
>>>>>> Hi Gyula,
>>>>>>
>>>>>> I'm afraid conversion to see the retractions vs inserts can't be done
>>>>>> in pure SQL (though I'd love that feature).
>>>>>>
>>>>>> You might want to go lower level and implement a
>>>>>> RetractStreamTableSink [1][2] that you would wrap around a KafkaTableSink
>>>>>> [3]. This will give you a emitDataStream(DataStream<Tuple2<Boolean, T>>
>>>>>> dataStream);, in which the Boolean flag will give you an 'accumulate' or
>>>>>> 'retract' signal.
>>>>>> You can then filter the DataStream accordingly before passing to the
>>>>>> KafkaTableSink.
>>>>>>
>>>>>> Hope this helps.
>>>>>>
>>>>>> Best regards
>>>>>> Benoît
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>>>>> [2]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>>>>> [3]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>>>>>
>>>>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Roman,
>>>>>>>
>>>>>>> This is the core logic:
>>>>>>>
>>>>>>> CREATE TABLE QueryResult (
>>>>>>> queryId    BIGINT,
>>>>>>>   itemId    STRING,
>>>>>>>   quantity INT
>>>>>>> ) WITH (
>>>>>>> 'connector.type'     = 'kafka',
>>>>>>> 'connector.version' = 'universal',
>>>>>>> 'connector.topic'   = 'query.output.log.1',
>>>>>>> 'connector.properties.bootstrap.servers' = '<broker>',
>>>>>>> 'format.type' = 'json'
>>>>>>> );
>>>>>>>
>>>>>>> INSERT INTO QueryResult
>>>>>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>>>>>> FROM
>>>>>>>   ItemTransactions AS t,
>>>>>>>   Queries AS q
>>>>>>> WHERE
>>>>>>>   t.itemId = q.itemId AND
>>>>>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND
>>>>>>> q.event_time
>>>>>>> GROUP BY
>>>>>>>   t.itemId, q.event_time, q.queryId;
>>>>>>>
>>>>>>> And the error I get is:
>>>>>>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid
>>>>>>> SQL update statement.
>>>>>>> at
>>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>>>>>> at
>>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>>>>>> at
>>>>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>>>>>> at
>>>>>>> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>>>>>> at
>>>>>>> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>>>>>> at java.util.Optional.ifPresent(Optional.java:159)
>>>>>>> at
>>>>>>> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>>>>>> at
>>>>>>> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>>>>>> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>>>>>> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>>>>>> Caused by: org.apache.flink.table.api.TableException:
>>>>>>> AppendStreamTableSink requires that Table has only insert changes.
>>>>>>> at
>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>>>>>> at
>>>>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>> at
>>>>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>
>>>>>>> I am wondering what could I do to just simply pump the result
>>>>>>> updates to Kafka here.
>>>>>>>
>>>>>>> Gyula
>>>>>>>
>>>>>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>>>>>>> khachatryan.ro...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Gyula,
>>>>>>>>
>>>>>>>> Could you provide the code of your Flink program, the error with
>>>>>>>> stacktrace and the Flink version?
>>>>>>>>
>>>>>>>> Thanks.,
>>>>>>>> Roman
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All!
>>>>>>>>>
>>>>>>>>> Excuse my stupid question, I am pretty new to the Table/SQL API
>>>>>>>>> and I am trying to play around with it implementing and running a few
>>>>>>>>> use-cases.
>>>>>>>>>
>>>>>>>>> I have a simple window join + aggregation, grouped on some id that
>>>>>>>>> I want to write to Kafka but I am hitting the following error:
>>>>>>>>>
>>>>>>>>> "AppendStreamTableSink requires that Table has only insert
>>>>>>>>> changes."
>>>>>>>>>
>>>>>>>>> If I understand correctly the problem here is that since updates
>>>>>>>>> are possible within a single group, we have a retract stream and the 
>>>>>>>>> Kafka
>>>>>>>>> Sink cannot handle that. I tried to search for the solution but I 
>>>>>>>>> haven't
>>>>>>>>> found any satisfying answers.
>>>>>>>>>
>>>>>>>>> How can I simply tell the INSERT logic to ignore previous values
>>>>>>>>> and just always keep sending the latest (like you would see it on the 
>>>>>>>>> CLI
>>>>>>>>> output).
>>>>>>>>>
>>>>>>>>> Thank you!
>>>>>>>>> Gyula
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Benoît Paris
>>>>>> Ingénieur Machine Learning Explicable
>>>>>> Tél : +33 6 60 74 23 00
>>>>>> http://benoit.paris
>>>>>> http://explicable.ml
>>>>>>
>>>>>

Reply via email to