Hi Elias,

Flink does not have built-in support for upsert stream -> table
conversions, yet. However, the community is working on that (see FLINK-8545
[1]).
With a workaround, you can also solve the issue with what Flink supports so
far.

The approach with the MAX(tstamp) query was good idea, but the query needs
another join predicate on time.

tableEnv.sqlQuery("""
SELECT a.tstamp, a.item, a.score, a.source
FROM Telemetry a
  INNER JOIN (
    SELECT MAX(tstamp) AS maxT, item, source
    FROM Telemetry
    GROUP BY item, source
  ) b ON a.item = b.item AND a.source = b.source AND a.tstamp = maxT
""")

Otherwise, the table will have multiple records for each combination of
item and score as you noticed.

HOWEVER, you might not want to use the query above because it will
accumulate all records from Telemetry in state and never clean them up.
The reason for this is that the query planner is not smart enough yet to
infer that old records will never be joined (this is implied by the join
condition on time).

A better solution is to use a custom user-defined aggregation function [2]
(LAST_VAL) that returns the value with associated max timestamp.

SELECT item, source, MAX(tstamp), LAST_VAL(score, tstamp)
FROM Telemetry
GROUP BY item, source

LAST_VAL would have an accumulator that stores a score and its associated
timestamp.
When a new (score, timestamp) pair is accumulated, the UDAGG compares the
timestamps and only updates the accumulator if the new timestamp is larger.

Btw. I'm not sure if KStreams only updates the KTable if the update has a
higher timestamp or just take the last received record.
That might be an issue with out-of-order data. I would check the behavior
if you expect data with out-of-order timestamps.

The upsert stream table conversion that we are working on will support
event time (max timestamp) or processing time (last value) upserts.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-8545
[2] https://ci.apache.org/projects/flink/flink-docs-
release-1.4/dev/table/udfs.html#aggregation-functions

2018-02-21 1:06 GMT+01:00 Elias Levy <fearsome.lucid...@gmail.com>:

> [ Adding the list back in, as this clarifies my question ]
>
> On Tue, Feb 20, 2018 at 3:42 PM, Darshan Singh <darshan.m...@gmail.com>
> wrote:
>
>> I am no expert in Flink but I will try my best. Issue you mentioned will
>> be with all streaming systems even with Kafka KTable I use them a lot for
>> similar sort of requirements.
>>
>> In Kafka you have KTable on Telemetry with 3 records and join with say
>> Scores which could be KTable or Kstrem  and you start your streaming query
>> as mentioned above it will give just 1 row as expected. However, if there
>> is a new value for the same key with timestamp greater than previous max
>> will be added to the Telemetry it will output the new value as well and
>> that is main idea about the streaming anyway you want to see the changed
>> value. So once you started streaming you will get whatever is the outcome
>> of your
>>
>
> Darshan,
>
> Thanks for the reply.  I've already implemented this job using Kafka
> Streams, so I am aware of how KTables behaves.  I would have helped if I
> had included some sample data in my post, so here it is.  If you have this
> data coming into Telemetry:
>
> ts, item, score, source
> 0, item1, 1, source1
> 1, item1, 1, source1
> 2, item1, 1, source1
>
> And this comes into Scores:
>
> ts, item, score
> 3, item1, 3
>
> Flink will output 3 records from the queries I mentioned:
>
> (3, item1, 3, source1)
> (3, item1, 3, source1)
> (3, item1, 3, source1)
>
>
> In contrast, if you run the query in Kafka Stream configuring Telemetry as
> a KTable keyed by (item, source), the output will be a single record.  In
> Telemetry record for key (item1, source1) at time 1 will overwrite the
> record at time 0, and the record at time 2 will overwrite the one at time
> 1.  By the time the record at time 3 comes in via Scores, it will be joined
> only with the record from time 2 in Telemetry.
>
> Yes, it is possible for the Kafka Streams query to output multiple records
> if the records from the different streams are not time aligned, as Kafka
> Streams only guarantees a best effort aligning the streams. But in the
> common case the output will be a single record.
>
>
> I think in fllink you can do the same, from your telemeter stream/table
>> you can create the LatestTelemetry table using similar sql(I am sure it
>> should give you latest timestamp's data) as you did with the RDBMS and then
>> join with scores table. You should get similar results to KTable or any
>> other streaming system.
>>
>
> Not sure if you missed it, but I actually executed the query to define the
> LatestTelemetry table in Flink using that query and joined against it.  The
> output was the same three records.
>
>

Reply via email to