Hi Team,

I am trying to build an audit-like system where I read messages from "n"
Kafka queues, key them by a unique key, and then reduce them to a single
message. If a message has passed through all "n" Kafka queues within a
window of "m" hours/days it has succeeded; otherwise it has expired.
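
Conceptually, each audit record tracks which queues a message has been
seen on, and the reduce step merges those records per key. A simplified
sketch (AuditMessage and its fields are placeholders, not my real types):

case class AuditMessage(key: String, seenQueues: Set[String], firstSeen: Long)

// Merge two partial records for the same key: union the queues seen so
// far and keep the earliest timestamp.
val reduceFunction = (a: AuditMessage, b: AuditMessage) =>
  a.copy(seenQueues = a.seenQueues ++ b.seenQueues,
         firstSeen = math.min(a.firstSeen, b.firstSeen))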

I can get this working in my test case, but not when there are millions
of messages. Very few messages reach the success stage in the iteration;
a huge number of messages are sent back into the iteration instead. This
creates backpressure, so no more messages are read from the Kafka queues.
Since no new messages are read, the messages inside the window can no
longer succeed; they keep going through the iteration forever and expire,
although they should have succeeded.

I read that when the buffer is full it creates backpressure and no more
messages are read. The system is supposed to be a lightweight audit
system, and the audit messages it creates are very small. Is it possible
to increase the size of the buffer to avoid backpressure? Is there an
alternative solution to this issue?
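
For reference, these are the flink-conf.yaml network buffer settings I
have been looking at (I am not sure these are the right knobs, and the
values below are just guesses):

# Fraction of JVM memory to use for network buffers, plus its bounds.
taskmanager.network.memory.fraction: 0.2
taskmanager.network.memory.min: 128mb
taskmanager.network.memory.max: 1gb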

The code looks like this:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val unionInputStream = kafka1.union(kafka2, kafka3, kafka4)

// One iteration step: reduce per key within the "m"-hour window, then
// route messages to the success/expire sinks or feed them back to replay.
def audit(stream: DataStream[AuditMessage]) = {
  val reducedStream = stream
    .keyBy(keyFunction)
    .window(TumblingProcessingTimeWindows.of(Time.hours(m)))
    .reduce(reduceFunction)
  val splitStreams = reducedStream.split(splitFunction)
  splitStreams.select("success").addSink(terminalSink)
  splitStreams.select("expire").addSink(expireSink)
  (splitStreams.select("replay"), splitStreams.select("success"))
}

unionInputStream.iterate(audit(_))
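
The split decision is roughly this (again a simplified sketch; numQueues
and maxAgeMillis stand in for the real configuration values):

// Route each record to exactly one output: success once all queues have
// been seen, expire once it is older than the allowed window, else replay.
val splitFunction = (msg: AuditMessage) =>
  if (msg.seenQueues.size == numQueues) List("success")
  else if (System.currentTimeMillis() - msg.firstSeen > maxAgeMillis) List("expire")
  else List("replay")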



Thanks and Regards,
Mahesh
