I noticed that there has been significant work on the SQL / Table subsystem
and decided to evaluate it for one of our use cases.  The use case requires
joining two streams, each of which can be considered a stream of table
upserts.  Critically, when joining the streams, we only want to join against
the latest value per key in one of the tables/streams.

Simply performing a join between the streams/tables is not sufficient, as it
will generate results for records other than the latest one.  E.g. if you
have two streams/tables with the schemas:

Telemetry [
  tstamp: Long
  item: String
  score: Int
  source: String
]

Scores [
  tstamp: Long
  item: String
  score: Int
]

tableEnv.sqlQuery("""
SELECT s.tstamp, s.item, s.score, t.source
FROM Telemetry t INNER JOIN Scores s ON s.item = t.item
WHERE s.score <> t.score AND s.tstamp >= t.tstamp
""")

If three records arrive on the telemetry stream for the same source,
followed by a record on the score stream that matches their item and
updates the score, the query generates three output records, even though we
only want the latest record from the source to be considered.
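
To make the problem concrete, here is a minimal sketch in plain Python (no
Flink involved) that mimics the join above over append-only rows.  The
sample values ("widget", "sensor-a", the scores and timestamps) are made up
for illustration, and naive_join is a hypothetical helper, not a Flink API.

```python
from collections import namedtuple

# Row types mirroring the two schemas above.
Telemetry = namedtuple("Telemetry", "tstamp item score source")
Score = namedtuple("Score", "tstamp item score")


def naive_join(telemetry, scores):
    """Inner join on item with the same WHERE clause as the query above.

    Every telemetry record is treated as its own row, so older records
    for the same key still take part in the join.
    """
    return [
        (s.tstamp, s.item, s.score, t.source)
        for t in telemetry
        for s in scores
        if s.item == t.item and s.score != t.score and s.tstamp >= t.tstamp
    ]


# Three telemetry records for the same item and source (made-up values).
telemetry = [
    Telemetry(1, "widget", 10, "sensor-a"),
    Telemetry(2, "widget", 11, "sensor-a"),
    Telemetry(3, "widget", 12, "sensor-a"),
]
# One score record that matches the item and changes the score.
scores = [Score(4, "widget", 20)]

rows = naive_join(telemetry, scores)
print(len(rows))  # 3
```

All three telemetry records match the single score record, so three rows
come out where only one is wanted.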

If this were a regular database we could do the following to get only the
latest records from the Telemetry table:

tableEnv.sqlQuery("""
SELECT a.tstamp, a.item, a.score, a.source
FROM Telemetry a
  INNER JOIN (
    SELECT MAX(tstamp) AS max_tstamp, item, source
    FROM Telemetry
    GROUP BY item, source
  ) b ON a.item = b.item AND a.source = b.source
     AND a.tstamp = b.max_tstamp
""")

and then execute the previous query against the result (call it
LatestTelemetry) instead of Telemetry.  But that does not work.  The query
executes within Flink but still outputs multiple records, regardless of the
order in which the records arrive on the source streams.
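
For comparison, the regular-database behaviour I have in mind can be
sketched with Python's built-in sqlite3 module instead of Flink.  The sample
rows are made up, and the max_tstamp alias is my own naming; the subquery
result is matched on the maximum tstamp per (item, source) so that only the
newest row for each key survives.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE Telemetry (tstamp INTEGER, item TEXT, score INTEGER, source TEXT)"
)
# Made-up sample rows: three versions of one key, one version of another.
conn.executemany(
    "INSERT INTO Telemetry VALUES (?, ?, ?, ?)",
    [
        (1, "widget", 10, "sensor-a"),
        (2, "widget", 11, "sensor-a"),
        (3, "widget", 12, "sensor-a"),
        (1, "gadget", 5, "sensor-b"),
    ],
)

# Keep only the row with the highest tstamp for each (item, source).
latest = conn.execute("""
    SELECT a.tstamp, a.item, a.score, a.source
    FROM Telemetry a
      INNER JOIN (
        SELECT MAX(tstamp) AS max_tstamp, item, source
        FROM Telemetry
        GROUP BY item, source
      ) b ON a.item = b.item AND a.source = b.source
         AND a.tstamp = b.max_tstamp
    ORDER BY a.item
""").fetchall()

print(latest)
# [(1, 'gadget', 5, 'sensor-b'), (3, 'widget', 12, 'sensor-a')]
```

On a static table this returns exactly one row per key, which is the result
I would like the streaming query to maintain.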

I am wondering if there is a way to accomplish this within Flink's
SQL/Table abstractions.

Kafka Streams has the concept of a KTable, where records are considered
upserts and update previous records that have the same key.  Thus, when you
join against a KTable, you only join against the latest record for a given
key, rather than all previous records for the key.
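
The upsert semantics I am after can be sketched in plain Python as follows.
This is only an illustration of the idea, not Kafka Streams or Flink code:
upsert and join_latest are hypothetical helpers, the key (item, source) is
an assumption taken from the GROUP BY above, and the sample values are made
up.

```python
from collections import namedtuple

Telemetry = namedtuple("Telemetry", "tstamp item score source")
Score = namedtuple("Score", "tstamp item score")


def upsert(table, record):
    """Store the record under its key, replacing any previous record."""
    table[(record.item, record.source)] = record


def join_latest(telemetry_table, score):
    """Join one score record against the current table contents only."""
    return [
        (score.tstamp, score.item, score.score, t.source)
        for t in telemetry_table.values()
        if t.item == score.item
        and t.score != score.score
        and score.tstamp >= t.tstamp
    ]


# The table holds at most one record per (item, source) key.
telemetry_table = {}
for record in [
    Telemetry(1, "widget", 10, "sensor-a"),
    Telemetry(2, "widget", 11, "sensor-a"),
    Telemetry(3, "widget", 12, "sensor-a"),
]:
    upsert(telemetry_table, record)

rows = join_latest(telemetry_table, Score(4, "widget", 20))
print(rows)  # [(4, 'widget', 20, 'sensor-a')]
```

Because each upsert overwrites the previous record for the key, the score
record joins against a single telemetry record and only one row is produced.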

Thoughts?
