On Wed, Feb 21, 2018 at 3:24 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Elias,
>
> Flink does not have built-in support for upsert stream -> table
> conversions yet. However, the community is working on that (see FLINK-8545
> [1]).
> In the meantime, you can solve the issue with a workaround using what
> Flink supports so far.
>

Fabian,

Thanks for the reply.  Great to see some progress in this area.  If we
could implement this job in Flink rather than Kafka Streams, it would mean
one less technology to support and to train our developers on, which is
always a plus.



> The approach with the MAX(tstamp) query was a good idea, but the query
> needs another join predicate on time.
>
> tableEnv.sqlQuery("""
> SELECT a.tstamp, a.item, a.score, a.source
> FROM Telemetry a
>   INNER JOIN (
>     SELECT MAX(tstamp) AS maxT, item, source
>     FROM Telemetry
>     GROUP BY item, source
>   ) b ON a.item = b.item AND a.source = b.source AND a.tstamp = b.maxT
> """)
>
> Otherwise, the table will have multiple records for each combination of
> item and source, as you noticed.
>
> HOWEVER, you might not want to use the query above because it will
> accumulate all records from Telemetry in state and never clean them up.
> The reason for this is that the query planner is not smart enough yet to
> infer that old records will never be joined (this is implied by the join
> condition on time).
>

Thanks for the correction.  But, yes, the indefinite accumulation is a deal
breaker for using this approach.


> A better solution is to use a custom user-defined aggregation function [2]
> (LAST_VAL) that returns the value associated with the max timestamp.
>
> SELECT item, source, MAX(tstamp), LAST_VAL(score, tstamp)
> FROM Telemetry
> GROUP BY item, source
>
> LAST_VAL would have an accumulator that stores a score and its associated
> timestamp.
> When a new (score, timestamp) pair is accumulated, the UDAGG compares the
> timestamps and only updates the accumulator if the new timestamp is larger.
>

I'll give this approach a try.
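
For reference, here's a rough sketch of the UDAGG I have in mind, following
the AggregateFunction interface from [2] (this assumes score is a DOUBLE and
tstamp a BIGINT; the class and field names are my own):

  import org.apache.flink.table.functions.AggregateFunction

  // Accumulator: latest score seen so far and the timestamp it came with.
  class LastValAcc {
    var score: Double = 0.0
    var tstamp: Long = Long.MinValue
  }

  class LastVal extends AggregateFunction[Double, LastValAcc] {

    override def createAccumulator(): LastValAcc = new LastValAcc

    // Only take the new score if its timestamp is newer, so out-of-order
    // records can't overwrite the latest value.
    def accumulate(acc: LastValAcc, score: Double, tstamp: Long): Unit = {
      if (tstamp > acc.tstamp) {
        acc.score = score
        acc.tstamp = tstamp
      }
    }

    override def getValue(acc: LastValAcc): Double = acc.score
  }

registered before running the query with something like:

  tableEnv.registerFunction("LAST_VAL", new LastVal)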


> Btw. I'm not sure if KStreams only updates the KTable if the update has a
> higher timestamp or just takes the last received record.
> That might be an issue with out-of-order data. I would check the behavior
> if you expect data with out-of-order timestamps.
>

I believe you are correct.  KS will consume records from across partitions
while attempting to align their timestamps, but it won't reorder records
within a partition, which can be problematic if you can't guarantee ordered
records within a partition.  While I talked about KTables, in reality the
job I wrote is a combination of the KS Streams DSL and the Processor API,
to get around some of these issues.

> The upsert stream -> table conversion that we are working on will support
> event time (max timestamp) or processing time (last value) upserts.
>

Excellent.


> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-8545
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/udfs.html#aggregation-functions
>
