Hi Gyula,

Would a 5-second tumbling window for the aggregation meet your needs? For example:

INSERT INTO QueryResult
SELECT q.queryId, t.itemId,
  TUMBLE_START(q.event_time, INTERVAL '5' SECOND),
  sum(t.quantity) AS quantity
FROM
  ItemTransactions AS t,
  Queries AS q
WHERE
  t.itemId = q.itemId AND
  t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
GROUP BY
  t.itemId, q.queryId, TUMBLE(q.event_time, INTERVAL '5' SECOND);
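
To illustrate how the tumbling window above buckets each query by q.event_time, here is a minimal Python sketch (plain Python, not Flink code). It assumes windows are aligned to the Unix epoch with no offset; the helper names tumble_start and tumble_end and the sample timestamp are hypothetical, chosen only for illustration.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def tumble_start(event_time, size):
    """Start of the tumbling window of length `size` that contains event_time.

    Hypothetical helper; assumes epoch-aligned windows with no offset.
    """
    return event_time - (event_time - EPOCH) % size

def tumble_end(event_time, size):
    """Exclusive end of the same window."""
    return tumble_start(event_time, size) + size

# A query with event time 14:00:07 falls into the window [14:00:05, 14:00:10).
q_time = datetime(2020, 3, 5, 14, 0, 7, tzinfo=timezone.utc)
size = timedelta(seconds=5)
print(tumble_start(q_time, size), tumble_end(q_time, size))
```

Since q.queryId stays in the grouping key, each query should still get its own result row; the window only bounds when that row can be treated as final.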

Best,
Jark

On Thu, 5 Mar 2020 at 23:05, Gyula Fóra <gyula.f...@gmail.com> wrote:

> I see, maybe I just don't understand how to properly express what I am
> trying to compute.
>
> Basically I want to aggregate the quantities of the transactions that
> happened in the 5 seconds before the query.
> Every query.id belongs to a single query (event_time, itemid) but still I
> have to group :/
>
> Gyula
>
> On Thu, Mar 5, 2020 at 3:45 PM Kurt Young <ykt...@gmail.com> wrote:
>
>> I think the issue is not caused by event time interval join, but the
>> aggregation after the join:
>>     GROUP BY t.itemId, q.event_time, q.queryId;
>>
>> In this case, Flink still has no way to determine whether groups like
>> (itemId, event_time, queryId) have complete data or not.
>> By comparison, if you change the grouping key to a window that is based
>> only on q.event_time, the query would emit insert-only results.
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>>> That's exactly the kind of behaviour I am looking for Kurt ("ignore all
>>> delete messages").
>>>
>>> As for the data completion, in my above example it is basically an event
>>> time interval join.
>>> With watermarks defined Flink should be able to compute results once in
>>> exactly the same way as for the tumbling window.
>>>
>>> Gyula
>>>
>>> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young <ykt...@gmail.com> wrote:
>>>
>>>> Back to this case, I assume you are expecting something like an "ignore
>>>> all delete messages" flag? With this flag turned on, Flink would only
>>>> send insert messages that correspond to the current correct results to
>>>> Kafka and drop all retractions and deletes on the fly.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young <ykt...@gmail.com> wrote:
>>>>
>>>>> > I also don't completely understand at this point why I can write the
>>>>> > result of a group, tumble window aggregate to Kafka and not this window
>>>>> > join / aggregate.
>>>>>
>>>>> If you are doing a tumble window aggregate with watermarks enabled,
>>>>> Flink will fire a single final result for each window; no
>>>>> modifications or retractions will happen after a window is calculated
>>>>> and fired.
>>>>> But with other, arbitrary aggregations, there is not enough
>>>>> information for Flink to determine whether the data is complete or
>>>>> not, so the framework keeps recalculating results as new records
>>>>> arrive and retracts earlier results by firing retraction/deletion
>>>>> messages.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks Benoît!
>>>>>>
>>>>>> I can see now how I can implement this myself through the provided
>>>>>> sink interfaces but I was trying to avoid having to write code for
>>>>>> this :D
>>>>>> My initial motivation was to see whether we are able to write out any
>>>>>> kind of table to Kafka as a simple stream of "upserts".
>>>>>>
>>>>>> I also don't completely understand at this point why I can write the
>>>>>> result of a group, tumble window aggregate to Kafka and not this window
>>>>>> join / aggregate.
>>>>>>
>>>>>> Cheers,
>>>>>> Gyula
>>>>>>
>>>>>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>>>>>> benoit.pa...@centraliens-lille.org> wrote:
>>>>>>
>>>>>>> Hi Gyula,
>>>>>>>
>>>>>>> I'm afraid converting the result so that you can see retractions vs.
>>>>>>> inserts can't be done in pure SQL (though I'd love that feature).
>>>>>>>
>>>>>>> You might want to go lower level and implement a
>>>>>>> RetractStreamTableSink [1][2] that you would wrap around a
>>>>>>> KafkaTableSink [3]. This will give you an
>>>>>>> emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream) method, in
>>>>>>> which the Boolean flag gives you an 'accumulate' or 'retract' signal.
>>>>>>> You can then filter the DataStream accordingly before passing it to
>>>>>>> the KafkaTableSink.
>>>>>>>
>>>>>>> Hope this helps.
>>>>>>>
>>>>>>> Best regards
>>>>>>> Benoît
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>>>>>> [2]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>>>>>> [3]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>>>>>>
>>>>>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Roman,
>>>>>>>>
>>>>>>>> This is the core logic:
>>>>>>>>
>>>>>>>> CREATE TABLE QueryResult (
>>>>>>>>   queryId  BIGINT,
>>>>>>>>   itemId   STRING,
>>>>>>>>   quantity INT
>>>>>>>> ) WITH (
>>>>>>>>   'connector.type' = 'kafka',
>>>>>>>>   'connector.version' = 'universal',
>>>>>>>>   'connector.topic' = 'query.output.log.1',
>>>>>>>>   'connector.properties.bootstrap.servers' = '<broker>',
>>>>>>>>   'format.type' = 'json'
>>>>>>>> );
>>>>>>>>
>>>>>>>> INSERT INTO QueryResult
>>>>>>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>>>>>>> FROM
>>>>>>>>   ItemTransactions AS t,
>>>>>>>>   Queries AS q
>>>>>>>> WHERE
>>>>>>>>   t.itemId = q.itemId AND
>>>>>>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
>>>>>>>> GROUP BY
>>>>>>>>   t.itemId, q.event_time, q.queryId;
>>>>>>>>
>>>>>>>> And the error I get is:
>>>>>>>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update statement.
>>>>>>>>     at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>>>>>>>     at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>>>>>>>     at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>>>>>>>     at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>>>>>>>     at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>>>>>>>     at java.util.Optional.ifPresent(Optional.java:159)
>>>>>>>>     at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>>>>>>>     at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>>>>>>>     at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>>>>>>>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>>>>>>> Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
>>>>>>>>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>>>>>>>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>>>>>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>>>>>
>>>>>>>> I am wondering what I could do to simply pump the result updates to
>>>>>>>> Kafka here.
>>>>>>>>
>>>>>>>> Gyula
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>>>>>>>> khachatryan.ro...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Gyula,
>>>>>>>>>
>>>>>>>>> Could you provide the code of your Flink program, the error with
>>>>>>>>> stacktrace and the Flink version?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Roman
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gyula.f...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All!
>>>>>>>>>>
>>>>>>>>>> Excuse my stupid question, I am pretty new to the Table/SQL API
>>>>>>>>>> and I am trying to play around with it implementing and running a few
>>>>>>>>>> use-cases.
>>>>>>>>>>
>>>>>>>>>> I have a simple window join + aggregation, grouped on some id
>>>>>>>>>> that I want to write to Kafka but I am hitting the following error:
>>>>>>>>>>
>>>>>>>>>> "AppendStreamTableSink requires that Table has only insert
>>>>>>>>>> changes."
>>>>>>>>>>
>>>>>>>>>> If I understand correctly, the problem here is that since updates
>>>>>>>>>> are possible within a single group, we have a retract stream and
>>>>>>>>>> the Kafka sink cannot handle that. I tried to search for a solution
>>>>>>>>>> but I haven't found any satisfying answers.
>>>>>>>>>>
>>>>>>>>>> How can I simply tell the INSERT logic to ignore previous values
>>>>>>>>>> and just always keep sending the latest (like you would see it on
>>>>>>>>>> the CLI output)?
>>>>>>>>>>
>>>>>>>>>> Thank you!
>>>>>>>>>> Gyula
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Benoît Paris
>>>>>>> Ingénieur Machine Learning Explicable
>>>>>>> Tél : +33 6 60 74 23 00
>>>>>>> http://benoit.paris
>>>>>>> http://explicable.ml
>>>>>>>
>>>>>>
