Our topology has a metadata source that we push via Broadcast. Because this
metadata source is critical but sometimes late, we added a buffering
mechanism via a SideOutput. We call the initial lookup from Broadcast
"join", and the secondary, state-backed buffered lookup "late-join".
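As a rough illustration (a sketch, not the actual job code), the routing decision made in "join" can be modeled without any Flink dependency. `MetadataEntry`, `Event`, `Enriched`, and `JoinModel.route` are hypothetical names, and a plain `Map` stands in for the broadcast state. In the real operator (presumably a `BroadcastProcessFunction`), the `Left` branch would correspond to emitting the record with `ctx.output(Metadata.sideOutputTag, ...)`.

```scala
// Hypothetical element types; not the actual classes from the job.
final case class MetadataEntry(id: String, description: String)
final case class Event(metadataId: String, payload: String)
final case class Enriched(event: Event, metadata: MetadataEntry)

object JoinModel {
  // Models the first "join": `known` stands in for the broadcast state.
  // Right = enriched record on the main output.
  // Left  = record routed to the side output that feeds "late-join".
  def route(known: Map[String, MetadataEntry], event: Event): Either[Event, Enriched] =
    known.get(event.metadataId) match {
      case Some(meta) => Right(Enriched(event, meta))
      case None       => Left(event)
    }
}
```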

Today I noticed that if we implement the late join using a
KeyedBroadcastProcessFunction (so we can set TTL timers while using
broadcast), everything seems to work. However, even though our
internal metrics show the correct numbers, the Flink UI falsely
indicates that:

1) No broadcast data is sent to the late join: the Flink metrics
for the metadata operator do not show any extra records sent.
2) Primary Join's main stream (not the side output) is sent to Late
Join: in the Flink metrics, Late Join's input record count matches the
record count coming from Primary Join, even though our logs and internal
metrics may show zero traffic.
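For context, the buffering that the late join performs can be pictured with a plain-Scala model. This is a sketch under stated assumptions, not Flink code: `LateJoinBuffer` is a hypothetical name, the two mutable maps stand in for broadcast state and keyed state, and the current time is passed in explicitly instead of coming from Flink timers. In a real `KeyedBroadcastProcessFunction`, `onEvent` would roughly correspond to `processElement`, `onMetadata` to `processBroadcastElement` (where releasing a specific key's buffer would need something like `ctx.applyToKeyedState`, since that callback is not scoped to one key), and `expire` to `onTimer`.

```scala
import scala.collection.mutable

// Hypothetical model of the "late-join": events are buffered per metadata id
// until that metadata arrives or until ttlMillis elapses, whichever is first.
final class LateJoinBuffer[E, M](ttlMillis: Long, keyOf: E => String) {
  private val metadata = mutable.Map.empty[String, M]                // stands in for broadcast state
  private val pending  = mutable.Map.empty[String, Vector[(Long, E)]] // stands in for keyed buffer state

  // Like processElement: enrich immediately if metadata is known, else buffer.
  def onEvent(event: E, now: Long): Option[(E, M)] = {
    val key = keyOf(event)
    metadata.get(key) match {
      case Some(m) => Some((event, m))
      case None =>
        pending(key) = pending.getOrElse(key, Vector.empty) :+ ((now, event))
        None
    }
  }

  // Like processBroadcastElement: store metadata, release events waiting on it.
  def onMetadata(key: String, m: M): Seq[(E, M)] = {
    metadata(key) = m
    pending.remove(key).getOrElse(Vector.empty).map { case (_, e) => (e, m) }
  }

  // Like onTimer: drop buffered events whose age reached the TTL and return them.
  def expire(now: Long): Seq[E] = {
    val expired = Vector.newBuilder[E]
    for ((key, entries) <- pending.toList) {
      val (old, fresh) = entries.partition { case (t, _) => now - t >= ttlMillis }
      expired ++= old.map(_._2)
      if (fresh.isEmpty) { pending -= key } else { pending(key) = fresh }
    }
    expired.result()
  }
}
```

Keeping expiry in a separate `expire` call mirrors a timer-driven TTL: nothing is dropped until time is advanced.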

If I implement the late join with a CoProcessFunction on a keyed metadata
stream instead of a broadcast, the Flink UI shows the correct numbers
(unfortunately, that approach has a separate side issue, which I
won't go into here).

I hope this was not too confusing. Again, the issue is not that this does
not work; it just looks like it does not work in the Flink UI.

Below is the approximate code. Perhaps I'm doing something wrong that
causes the weird reporting?

val metadata = MetadataTable
  .streamFromKafka(env)

val broadcast = createBroadcast(metadata)

val metadataJoined = sourceTables
  .union(source1Tables)
  .union(source2Tables)
  .connect(broadcast)
  // this operator sends side output data using Metadata.sideOutputTag
  .process(BroadcastMetadataJoin())
  .name("join")

val lateJoined = metadataJoined
  .getSideOutput(Metadata.sideOutputTag)
  .keyBy(_.primaryKey.getMetadataId)
  .connect(KeyedBroadcastMetadataJoin.broadcast(metadata))
  .process(KeyedBroadcastMetadataJoin())
  .name("late-join")
