Hi Timo,
I figured it out: the issue was a (silly) mistake on my part. I had put
all of the processElement() logic inside the if condition, so the timer was
only reset on the first heartbeat (or the first one after a disconnection)
instead of on every heartbeat. The if condition is only there because I want
to emit the disconnected STOPPED message once.
So the correct code is:

    @Override
    public void processElement(IUHeartbeat heartbeat, Context ctx,
            Collector<IUSessionMessage> out) throws Exception {
        Boolean isDisconnected = isDisconnectedStateStore.value();
        // LOGGER.info("Watermark: " + ctx.timerService().currentWatermark()
        //         + " Processing timestamp : " + heartbeat.getTimestamp()
        //         + ", isDisconnected : " + isDisconnected
        //         + " last registered timer :" + registeredTimerStateStore.value());

        // Delete previous timer.
        if (registeredTimerStateStore.value() != null)
            ctx.timerService().deleteEventTimeTimer(registeredTimerStateStore.value());

        // Register a timer that will fire in the future if no further
        // events are received.
        long timerFiringTimestamp = heartbeat.getTimestamp() + DISCONNECTED_TIMEOUT;
        ctx.timerService().registerEventTimeTimer(timerFiringTimestamp);
        registeredTimerStateStore.update(timerFiringTimestamp);

        // If this is the first message for this monitor or is the first
        // message after a disconnection.
        if (isDisconnected == null || isDisconnected == Boolean.TRUE) {
            // Emit a message indicating END of the disconnected state.
            IUSessionMessage message = new IUSessionMessage(
                    new IUMonitorFeatureKey(heartbeat.getMonitorKey().getMonitorName(),
                            "dummy", "dummy"),
                    new IUSessionInfo(heartbeat.getTimestamp(),
                            IUStatus.ENDED, IUEventType.NO_VALUE));
            out.collect(message);
            LOGGER.info(message.getSessionInfo().toString());
        }

        // Update the state store.
        isDisconnectedStateStore.update(Boolean.FALSE);
    }

This produces the expected output.
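
For anyone following along, the reset-on-every-heartbeat idea can be checked outside Flink with a small plain-Java sketch. Everything in it is hypothetical and not Flink API: the class HeartbeatTimeoutSketch, the methods onHeartbeat() and advanceWatermark(), and the 1000 ms timeout are assumptions made for illustration. It tracks a single key and mimics an event-time timer with a plain field that "fires" once the simulated watermark reaches it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ONE key of the KeyedProcessFunction. It is not
// Flink code: the pending timer is a plain field, and advanceWatermark()
// plays the role of the runtime calling onTimer().
class HeartbeatTimeoutSketch {
    static final long DISCONNECTED_TIMEOUT = 1000L; // assumed value, in ms

    private Boolean isDisconnected;  // null until the first heartbeat arrives
    private Long registeredTimer;    // timestamp of the pending timer, if any
    final List<String> emitted = new ArrayList<>();

    // Mirrors the corrected processElement(): the timer is replaced on EVERY
    // heartbeat; ENDED is emitted only on the first heartbeat or the first
    // heartbeat after a disconnection.
    void onHeartbeat(long timestamp) {
        registeredTimer = timestamp + DISCONNECTED_TIMEOUT; // delete + re-register
        if (isDisconnected == null || isDisconnected) {
            emitted.add("ENDED@" + timestamp);
        }
        isDisconnected = false;
    }

    // Mirrors onTimer(): the pending timer fires once the watermark reaches
    // or passes its timestamp.
    void advanceWatermark(long watermark) {
        if (registeredTimer != null && registeredTimer <= watermark) {
            long firedAt = registeredTimer;
            registeredTimer = null;
            if (Boolean.FALSE.equals(isDisconnected)) {
                isDisconnected = true;
                emitted.add("STARTED@" + firedAt);
            }
        }
    }

    public static void main(String[] args) {
        HeartbeatTimeoutSketch monitor = new HeartbeatTimeoutSketch();
        monitor.onHeartbeat(0);          // first heartbeat, timer set to 1000
        monitor.onHeartbeat(500);        // within the timeout, timer moves to 1500
        monitor.advanceWatermark(1200);  // the timer at 1000 was replaced, nothing fires
        monitor.advanceWatermark(1500);  // no heartbeat since 500, timer fires
        monitor.onHeartbeat(2000);       // first heartbeat after the disconnection
        System.out.println(monitor.emitted); // [ENDED@0, STARTED@1500, ENDED@2000]
    }
}
```

With the earlier version, where the timer reset sat inside the if block, the heartbeat at 500 would not have moved the timer, so it would have fired at 1000 even though a heartbeat had arrived within the timeout.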
Also, I will assume that this is the best way to solve my problem - I can't
use Flink's session windows. Let me know if anyone has any other ideas
though!
Thank you for your time and quick response!
On Tue, Aug 11, 2020 at 1:45 PM Timo Walther wrote:
> Hi Manas,
>
> At first glance your code looks correct to me. I would investigate
> whether your keys and watermarks are correct. In particular, the watermark
> frequency could be an issue. If watermarks are generated at the same time
> as the heartbeats themselves, the timers might fire before the process()
> function, which resets the timer, is called.
>
> Maybe you can give us more information on how watermarks are generated?
>
> Regards,
> Timo
>
> On 11.08.20 08:33, Manas Kale wrote:
> > Hi,
> > I have a bunch of devices that keep sending heartbeat messages. I want
> > to make an operator that emits messages when a device disconnects and
> > when a device stops being disconnected.
> > A device is considered disconnected if we don't receive any heartbeat
> > for more than some TIMEOUT duration.
> > This seemed like a good candidate for session windows, but I am not sure
> > how I can express the inverse logic (i.e. detecting periods of
> > inactivity instead of activity) using Flink's operators.
> > I want to use event time for all processing and ideally want to achieve
> > this behaviour using a single operator.
> >
> > So I am trying to implement a custom process function that, on every
> > heartbeat:
> >
> > * Deletes any previous event time timer
> > * Registers a new timer to fire at heartbeat.timestamp + TIMEOUT
> >
> > The basic idea is that every new heartbeat will keep pushing the timer
> > forward. Only when heartbeats stop arriving does the timer fire,
> > indicating the start of a disconnected state.
> > Code:
> >
> > public class IUDisconnectedStateDetector extends
> >         KeyedProcessFunction<IUMonitorKey, IUHeartbeat, IUSessionMessage> {
> >
> >     // Tracks if this monitor is disconnected or not.
> >     private ValueState<Boolean> isDisconnectedStateStore;
> >     // Tracks which timer was registered.
> >     private ValueState<Long> registeredTimerStateStore;
> >
> >     private final Logger LOGGER =
> >             LoggerFactory.getLogger(IUDisconnectedStateDetector.class);
> >
> >     // Called by the Flink runtime before starting this operator. We
> >     // initialize the state stores here.
> >     @Override
> >     public void open(Configuration parameters) throws Exception {
> >         isDisconnectedStateStore = getRuntimeContext().getState(
> >                 new ValueStateDescriptor<Boolean>(
> >                         DISCONNECTED_STATE_STORE_NAME, Boolean.class));
> >         registeredTimerStateStore = getRuntimeContext().getState(
> >                 new ValueStateDescriptor<Long>(
> >                         REGISTERED_TIMER_STATE_STORE_NAME, Long.class));
> >     }
> >
> >     @Override
> >     public void processElement(IUHeartbeat heartbeat, Context ctx,
> >             Collector<IUSessionMessage> out) throws Exception {
> >         Boolean isDisconnected = isDisconnectedStateStore.value();
> >         LOGGER.info("Watermark: " + heartbeat + ", isDisconnected : " + isDisconnected
> >                 + " last registered timer :" + registeredTimerStateStore.value());
> >
> >         // If this is the first message for this monitor or is the first
> >         // message after a disconnection.
> >         if (isDisconnected == null || isDisconnected == Boolean.TRUE) {
> >             // Delete previous timer.
> >             if (registeredTimerStateStore.value() != null)
> >                 ctx.timerService().deleteEventTimeTimer(registeredTimerStateStore.value());
> >
> >             // Register a timer that will fire in the future if no further
> >             // events are received.
> >             long timerFiringTimestamp = heartbeat.getTimestamp() + DISCONNECTED_TIMEOUT;
> >             ctx.timerService().registerEventTimeTimer(timerFiringTimestamp);
> >             registeredTimerStateStore.update(timerFiringTimestamp);
> >
> >             // Emit a message indicating END of the disconnected state.
> >             IUSessionMessage message = new IUSessionMessage(
> >                     new IUMonitorFeatureKey(heartbeat.getMonitorKey().getMonitorName(), "dummy", "dummy"),
> >                     new IUSessionInfo(heartbeat.getTimestamp(), IUStatus.ENDED, IUEventType.NO_VALUE));
> >             out.collect(message);
> >             LOGGER.info(message.getSessionInfo().toString());
> >             // Update the state store.
> >             isDisconnectedStateStore.update(Boolean.FALSE);
> >         }
> >     }
> >
> >     @Override
> >     public void onTimer(long timestamp, OnTimerContext ctx,
> >             Collector<IUSessionMessage> out) throws Exception {
> >         if (isDisconnectedStateStore.value() == Boolean.FALSE) {
> >             // If this timer fires that means no message was received from
> >             // the monitor for some timeout duration.
> >             // Update the state store.
> >             isDisconnectedStateStore.update(Boolean.TRUE);
> >
> >             // Emit a message indicating START of the disconnected state.
> >             // Note that since this is applicable for a monitor,
> >             IUSessionMessage message = new IUSessionMessage(
> >                     new IUMonitorFeatureKey(ctx.getCurrentKey().getMonitorName(), "dummyFeatureName", "dummyDeviceId"),
> >                     new IUSessionInfo(timestamp, IUStatus.STARTED, IUEventType.NO_VALUE));
> >             out.collect(message);
> >
> >             LOGGER.info(message.getSessionInfo().toString());
> >         }
> >     }
> > }
> >
> > *However, the above code does not behave as expected - the timer fires
> > even when (a) it has received heartbeats within the timeout and (b) I
> > have the code to delete it*. So, my questions:
> >
> > * Am I deleting the timer incorrectly? I use a state store to keep
> >   track of the registered timer's timestamp and use that value when
> >   deleting.
> > * Am I overcomplicating things? Can this be achieved using Flink's
> >   built-in session windowing operators?
> >
> > Thanks!
>
>