Let's see if I understood everything correctly:

1)
Let's say that metadata contains N records.

The UI output metrics indicate that /metadata/ sends N records.
The UI input metrics for /join/ and /late-join/ each include N records (i.e. N plus whatever other data they receive).

You expected the output of /metadata/ to be 2*N, since the records are broadcast to 2 operators.

If so, then the metrics work as intended: they count the number of records that the operator emits. The duplication happens behind the scenes, /somewhere/ outside the operator. In other words, the metric counts the number of /Collector#collect()/ calls.
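
As a rough illustration of the counting described in 1), here is a minimal sketch in plain Scala. It uses no Flink classes: Downstream and CountingCollector are made-up stand-ins, not Flink's actual implementation. The only point is that the output counter is bumped once per collect() call, while the copy to each receiving operator happens afterwards.

```scala
// Hypothetical stand-in for the input side of a downstream operator.
final class Downstream(val name: String) {
  var numRecordsIn: Long = 0L
  def receive(record: String): Unit = numRecordsIn += 1
}

// Hypothetical stand-in for an operator's output. It counts each collect()
// call once and only then copies the record to every connected operator.
final class CountingCollector(targets: Seq[Downstream]) {
  var numRecordsOut: Long = 0L
  def collect(record: String): Unit = {
    numRecordsOut += 1                 // counted once per emitted record
    targets.foreach(_.receive(record)) // fan-out happens after counting
  }
}

object BroadcastMetricsSketch {
  // Emits n records from "metadata" to two receivers and returns
  // (metadata output count, join input count, late-join input count).
  def run(n: Int): (Long, Long, Long) = {
    val join = new Downstream("join")
    val lateJoin = new Downstream("late-join")
    val metadataOut = new CountingCollector(Seq(join, lateJoin))
    (1 to n).foreach(i => metadataOut.collect(s"metadata-$i"))
    (metadataOut.numRecordsOut, join.numRecordsIn, lateJoin.numRecordsIn)
  }

  def main(args: Array[String]): Unit = {
    val (out, joinIn, lateJoinIn) = run(5)
    println(s"metadata out=$out, join in=$joinIn, late-join in=$lateJoinIn")
  }
}
```

With N = 5 the sender reports 5 records out and each receiver reports 5 records in, so the sum of the inputs (10) is larger than the reported output.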

2)
Let's say that /join/ emits M records via the main output, and S records via the side-output.

The UI input metrics for /late-join/ indicate that M records have been received.

You expected the input for /late-join/ to be S + N instead, i.e. the side-output plus the broadcast data (see 1)).

If so, then yeah, that's weird and shouldn't happen.

For clarification:
You use the broadcast variable for the /join/ operator, but KeyedBroadcastMetadataJoin.broadcast(metadata) for the /late-join/.
Is this intended, or just a copy&paste error?

On 03.07.2018 04:16, Cliff Resnick wrote:
Our topology has a metadata source that we push via Broadcast. Because this metadata source is critical, but sometimes late, we added a buffering mechanism via a SideOutput. We call the initial look-up from Broadcast "join" and the secondary, state-backed buffered lookup "late-join".

Today I noticed that if we implement the late join using a KeyedBroadcastProcessFunction (so we can set TTL timers while using broadcast), everything seems to work. However, even though our internal metrics show the correct numbers, the numbers in the Flink UI falsely indicate that:

1) No broadcast data is sent to the late join, meaning the Flink metrics for the metadata operator do not indicate any extra records sent.

2) Primary Join's main stream (not the Side Output) is indicated as being sent to Late Join, meaning the Flink metrics input record number from Primary Join matches Late Join's, even though our logs and internal metrics might show zero traffic.

If I do the late join via CoProcessFunction using a metadata keyed stream instead of broadcast, then the Flink UI shows the correct numbers (unfortunately there is another side issue when we take that tack but I won't go into that here).

I hope this was not too confusing. Again the issue is not that this does not work -- it just looks like it does not work in the Flink UI.

Below is the approximate code. Perhaps I'm doing something wrong that causes the weird reporting?
val metadata = MetadataTable
   .streamFromKafka(env)

val broadcast = createBroadcast(metadata)

val metadataJoined = sourceTables
   .union(source1Tables)
   .union(source2Tables)
   .connect(broadcast)
   .process(BroadcastMetadataJoin()) // this operator will send side output data using Metadata.sideOutputTag
   .name("join")

val lateJoined = metadataJoined
   .getSideOutput(Metadata.sideOutputTag)
   .keyBy(_.primaryKey.getMetadataId)
   .connect(KeyedBroadcastMetadataJoin.broadcast(metadata))
   .process(KeyedBroadcastMetadataJoin())
   .name("late-join")

