hello,

sorry for the long post, but this is a puzzling problem and i am enough of a
flink non-expert to be unsure which details are important.

background:
i have a flink pipeline that performs a series of custom "joins" to
"flatten" user events. for this i wrote a custom KeyedCoProcessFunction
that either joins the two connected streams on a parent id, matching the
"left" event's primary key against the foreign key on the right event, or,
if the right (child) event doesn't have a foreign key, tries to infer the
join by using heuristics to limit the possible parent events and grabbing
the temporally-closest one.  both the inference and the state cleanup for
these joins happen in the onTimer method.
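to make the "temporally-closest" inference concrete, here's a minimal sketch of the selection step in plain java (the type, field names, and helper are my assumptions for illustration, not the real code): among the candidate parent events that survive the heuristic filtering, pick the one whose event timestamp is nearest to the child's.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the inference step: given the candidate parent
// (left) events left after heuristic filtering, grab the temporally-closest
// one relative to the child (right) event's timestamp.
public class ClosestParentSketch {
    record CandidateParent(String id, long timestampMillis) {}

    static Optional<CandidateParent> closestParent(
            List<CandidateParent> candidates, long childTimestampMillis) {
        // min by absolute time distance; empty when there are no candidates,
        // which would correspond to the "dropped" case described below
        return candidates.stream()
                .min(Comparator.comparingLong(
                        c -> Math.abs(c.timestampMillis() - childTimestampMillis)));
    }
}
```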

everything is based on event time, and i'm using the kafka connector as the
input source for the right event inputs to these operators.  here's what
the pipeline looks like, with the joins in question acting as a chain: the
output of the previous join (left input) is joined with a new raw event
source (right input):

[image: Screen Shot 2021-05-20 at 3.12.22 PM.png]
these join functions have a time window/duration (interval) associated with
them that defines both how long join state is kept and how wide the
inference window is.  this is set per operator to allow separate in-order
and out-of-order join thresholds for id-based joins, and the window also
acts as the scope for inference when a right event that is an inference
candidate (missing its foreign key id) is about to be evicted from state.
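for reference, here's how i'd sketch that membership check in plain java; the exact semantics below (a symmetric interval around the left event's timestamp) are my assumption about how the thresholds are applied, not necessarily the author's exact logic:

```java
// Sketch (assumed semantics): a right event can join a buffered left event
// when its event time falls within
//   [leftTs - outOfOrderMillis, leftTs + inOrderMillis].
// The same bound would determine how long the left event stays in state
// before the onTimer cleanup evicts it.
public class JoinWindowSketch {
    static boolean withinJoinWindow(long leftTsMillis, long rightTsMillis,
                                    long inOrderMillis, long outOfOrderMillis) {
        return rightTsMillis >= leftTsMillis - outOfOrderMillis
            && rightTsMillis <= leftTsMillis + inOrderMillis;
    }
}
```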

problem:

i have things coded up with side outputs for duplicate, late, and dropped
events.  the dropped-events case is the one i'm focusing on, since events
that go unmatched are dropped when they are evicted from state.  only rhs
events are being dropped: rhs events with foreign keys are dropped when
they go unmatched (the left event arrives late or never, or there is no
valid inference-based left event).  with wide enough in-order and
out-of-order durations, everything gets matched.  however, when testing
things out, i observed (expectedly) that the number of dropped events
increases the tighter you make the join window via these durations.  great,
this makes sense.  i wanted to get a better understanding of these
durations' impact, so i wrote our integration/stress test case to focus on
just the id-key-based joins to start.

further, to help observe the dropping characteristics, i connected the side
outputs to some s3 sinks to store the dropped events.  originally, these
dropped right events were written out to s3 properly.  in the
integration/stress test setup, they start to appear once the durations drop
below 1 minute.

however, i noticed that the dropped events didn't include the flink
Context.timestamp() encoded anywhere in the event structure (the left
events were already having their timestamp set in the processElement1
method).  i wanted this information to see how event-time processing worked
in practice.  so, i made a similarly simple change to the processElement2
function to set the timestamp on these right events as they came in.
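the change was along these lines.  the event type and field are hypothetical, and `Ctx` below is a local stand-in for flink's `KeyedCoProcessFunction.Context` so the sketch stays self-contained:

```java
// Minimal stand-ins so this compiles outside a flink job; in the real
// function, ctx is the Context argument passed to processElement2.
public class TimestampSketch {
    interface Ctx { Long timestamp(); }  // mirrors Context.timestamp()

    static class RightEvent {
        Long flinkTimestamp;             // hypothetical field on our event type
    }

    // the "touch": copy the element's event-time timestamp onto the event
    // as it arrives, before it is buffered in join state.
    static RightEvent stampTimestamp(RightEvent event, Ctx ctx) {
        event.flinkTimestamp = ctx.timestamp();  // may be null if unset
        return event;
    }
}
```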

once i did this, things stopped dropping and everything joined, even when i
set the durations down to 1 second on either side.  wut?

if i comment out that single line setting the timestamp on the right-hand-
side events, i get the dropped events again (sans flink timestamp).
uncommenting it to put the timestamps back in, everything fully matches
again.

what is going on?  it feels like a heisenberg effect with the "touched"
right events.

any help here would be greatly appreciated.
