Hi,

I would like to ask you for advice. I am trying to join and deduplicate events 
from Kafka. Let's say we have events of three types: A, B and C. If B or C 
arrives first, it has to wait for A before it is emitted downstream. If A does 
not arrive within some predefined duration (gap), the B/C events are lost. Here 
is a pseudo-skeleton of the implementation:

class JoinFn(gap: Duration) extends DoFn[IN, OUT] {

  private final val stateId = "state"
  private final val timerStateId = "timerState"
  private final val timerId = "expiry"

  // state used to keep track of already seen events
  @StateId(stateId)
  private val stateSpec = StateSpecs.value[State]()

  @StateId(timerStateId)
  private val timerStateSpec = StateSpecs.value(BooleanCoder.of())

  @TimerId(timerId)
  private val expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME)

  override def getAllowedTimestampSkew: Duration = gap

  @ProcessElement
  def processElement(…): Unit = {
    // emit state downstream/updating state
    …
  }

  @OnTimer(timerId)
  def onExpiry(@StateId(stateId) state: ValueState[State],
               @StateId(timerStateId) timerState: ValueState[java.lang.Boolean]): Unit = {
    // just increasing counters of dropped events
    …
    state.clear()
    timerState.clear()
  }
}

This solution works fine when we process events in real time (from the latest 
offset). When we re-process a backlog, we see a lot of dropped B/C events 
because the watermark moves fast. So two runs over the same event-time range 
yield different results. I was wondering if there is a better approach that 
would work in both situations.
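
To make the difference between the two runs concrete, here is a minimal 
plain-Scala sketch of what I believe is happening for a single key. It is not 
the real DoFn and does not use Beam at all: JoinSimulation, Step, Element, 
Watermark, Result and run are hypothetical names invented for this 
illustration, deduplication is left out, and the rule that the timer fires once 
the watermark reaches the first buffered timestamp plus the gap is my 
assumption about how the expiry timer is set.

```scala
// Simplified single-key model of the join; every name here is hypothetical.
object JoinSimulation {
  sealed trait Step
  // kind is 'A', 'B' or 'C'; ts is the event timestamp (arbitrary units).
  final case class Element(kind: Char, ts: Long) extends Step
  // The watermark advances to ts.
  final case class Watermark(ts: Long) extends Step

  final case class Result(emitted: Vector[Element], dropped: Vector[Element])

  def run(gap: Long, steps: Seq[Step]): Result = {
    var seenA = false                      // has A been processed for this key?
    var buffered = Vector.empty[Element]   // B/C events waiting for A
    var expiry: Option[Long] = None        // assumed event-time timer
    var emitted = Vector.empty[Element]
    var dropped = Vector.empty[Element]

    steps.foreach {
      case a @ Element('A', _) =>
        // A arrived: emit it, flush everything that was waiting, cancel the timer.
        seenA = true
        emitted = emitted ++ (a +: buffered)
        buffered = Vector.empty
        expiry = None
      case e: Element if seenA =>
        // B/C after A: emit immediately.
        emitted = emitted :+ e
      case e: Element =>
        // B/C before A: buffer and arm the timer on the first buffered event.
        buffered = buffered :+ e
        if (expiry.isEmpty) expiry = Some(e.ts + gap)
      case Watermark(ts) =>
        // The timer fires once the watermark reaches it: buffered events are lost.
        if (expiry.exists(_ <= ts)) {
          dropped = dropped ++ buffered
          buffered = Vector.empty
          expiry = None
        }
    }
    Result(emitted, dropped)
  }
}
```

With a gap of 10, the sequence B@0, watermark 1, A@5 emits both A and B. The 
sequence B@0, watermark 20, A@5 contains the same elements, but the watermark 
passes the expiry before A is processed, so B is counted as dropped and only A 
is emitted. That is one possible interleaving that would explain the different 
results between the live run and the backlog run.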

Thanks for any advice,
Peter





Peter Benedikovic
Senior Software Engineer, Big Data
