Hi,
I would like to ask you for advice. I am trying to join and deduplicate events
from Kafka. Let’s say we have events of three types: A, B and C. If B or C arrives
first, it has to wait for A before it is emitted downstream. If A does not
arrive within some predefined duration (gap), B/C are lost. Here is a
pseudo-skeleton of the implementation:
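import org.apache.beam.sdk.coders.BooleanCoder
import org.apache.beam.sdk.state.{StateSpecs, TimeDomain, TimerSpecs, ValueState}
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{OnTimer, ProcessElement, StateId, TimerId}
import org.joda.time.Duration

// IN, OUT and State stand for the actual input, output and state types
class JoinFn(gap: Duration) extends DoFn[IN, OUT] {
  private final val stateId = "state"
  private final val timerStateId = "timerState"
  private final val timerId = "expiry"

  // state used to keep track of already seen events
  @StateId(stateId)
  private val stateSpec = StateSpecs.value[State]()

  // Boolean flag kept alongside the timer (cleared on expiry)
  @StateId(timerStateId)
  private val timerStateSpec = StateSpecs.value(BooleanCoder.of())

  // event-time timer used to expire buffered B/C events
  @TimerId(timerId)
  private val expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME)

  override def getAllowedTimestampSkew: Duration = gap

  @ProcessElement
  def processElement(…): Unit = {
    // emit state downstream/updating state
    …
  }

  @OnTimer(timerId)
  def onExpiry(@StateId(stateId) state: ValueState[State],
               @StateId(timerStateId) timerState: ValueState[java.lang.Boolean]): Unit = {
    // just increasing counters of dropped events
    …
    state.clear()
    timerState.clear()
  }
}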
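For discussion, the intended semantics can be modelled outside Beam as a small per-key state machine. This is a minimal sketch under stated assumptions, not the elided `processElement`/`onExpiry` bodies: it assumes the expiry timer is set to the timestamp of the first buffered B/C plus `gap`, that the watermark is advanced explicitly, and that "deduplicate" means each event type is emitted at most once per key. `Kind`, `Event`, `JoinModel`, `process`, `advanceWatermark` and `dropped` are hypothetical names.

```scala
// Hypothetical, Beam-free model of the per-key join described above.
// Assumptions (not from the original code): timestamps are plain Longs,
// the expiry timer is set to (timestamp of first buffered B/C) + gap,
// and each event kind is emitted at most once per key.
sealed trait Kind
case object A extends Kind
case object B extends Kind
case object C extends Kind

final case class Event(kind: Kind, ts: Long)

final class JoinModel(gap: Long) {
  private var seenA = false                  // has A arrived for this key?
  private var emitted = Set.empty[Kind]      // kinds already sent downstream
  private var buffered = Vector.empty[Event] // B/C waiting for A
  private var expiry: Option[Long] = None    // pending event-time timer
  private var droppedCount = 0               // mirrors the "dropped" counters

  def dropped: Int = droppedCount

  /** Handles one element and returns what is emitted downstream. */
  def process(e: Event): Seq[Event] =
    if (emitted.contains(e.kind)) Seq.empty // duplicate: ignore
    else if (e.kind == A) {
      seenA = true
      val out = e +: buffered               // release waiting B/C together with A
      emitted ++= out.map(_.kind)
      buffered = Vector.empty
      expiry = None                         // model choice: timer no longer needed
      out
    } else if (seenA) {
      emitted += e.kind                     // A already seen: emit immediately
      Seq(e)
    } else {
      if (!buffered.exists(_.kind == e.kind)) buffered :+= e
      if (expiry.isEmpty) expiry = Some(e.ts + gap)
      Seq.empty
    }

  /** Advances the watermark; the timer fires once the watermark reaches it. */
  def advanceWatermark(wm: Long): Unit =
    expiry.foreach { t =>
      if (wm >= t) {
        droppedCount += buffered.size       // like onExpiry: count, then clear
        buffered = Vector.empty
        expiry = None
      }
    }
}
```

With `gap = 10`, a B at timestamp 1 followed by an A at 3 is released together with A, whereas a C at 1 is counted as dropped once the watermark reaches 11 with no A seen.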
This solution works fine when we process events in real time (from the latest offset).
When we reprocess the backlog, however, we see a lot of dropped B/C events because
the watermark advances much faster. As a result, two runs over the same event-time
range yield different results. I was wondering if there is a better approach that
would work in both situations.
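The effect can be reproduced with a toy model outside Beam. This is a hypothetical simplification, assuming a buffered B/C expires as soon as the watermark reaches its timestamp plus `gap`; `Read`, `countDropped` and the timestamps are made up for illustration. Both runs read the same B and A in the same order; only the watermark observed when A is read differs.

```scala
// Hypothetical simplification of the backlog problem: a buffered B/C expires
// once the watermark reaches (its timestamp + gap) before an A has been read.
// `watermarkBefore` is the watermark seen just before that element is read.
final case class Read(kind: Char, ts: Long, watermarkBefore: Long)

def countDropped(reads: Seq[Read], gap: Long): Int = {
  var seenA = false
  var pending = List.empty[Long] // expiry times of buffered B/C events
  var dropped = 0
  for (r <- reads) {
    // fire every timer the watermark has already reached
    val (expired, alive) = pending.partition(_ <= r.watermarkBefore)
    dropped += expired.size
    pending = alive
    if (r.kind == 'A') { seenA = true; pending = Nil }
    else if (!seenA) pending = (r.ts + gap) :: pending
  }
  dropped
}

// Same events, same read order; only the watermark progress differs.
val gap = 10L
val realTime = Seq(Read('B', 100, 90), Read('A', 105, 100))
val backlog  = Seq(Read('B', 100, 90), Read('A', 105, 200))
println(countDropped(realTime, gap)) // 0
println(countDropped(backlog, gap))  // 1
```

In the real-time run the watermark trails the data, so A is read before B's timer (110) fires; in the backlog run the watermark is already at 200 when A is read, so B is counted as dropped even though A's timestamp (105) lies within the gap.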
Thanks for advice,
Peter
Peter Benedikovic
Senior Software Engineer, Big Data
e. [email protected]
a. 3 Park Ave, 33rd Floor, New York, NY, 10016
www.zetaglobal.com