I am reading the current MirrorMaker code and am trying to understand if
MirrorMaker  has any chance at losing messages.  With the usage of the Max
value for ProducerConfig.MAX_BLOCK_MS_CONFIG and ProducerConfig.RETRIES_CONFIG
settings, it appears that the producer.flush() call in
maybeFlushAndCommitOffsets would block forever if there were any long-term
connectivity issues.  The callback is called only on success in that case.

Is that correct?

Thanks

Vu


    def maybeFlushAndCommitOffsets() {
>
>       val commitRequested = mirrorMakerConsumer.commitRequested()
>
>       if (commitRequested || System.currentTimeMillis() -
> lastOffsetCommitMs > offsetCommitIntervalMs) {
>
>         debug("Committing MirrorMaker state.")
>
>         producer.flush()
>
>         commitOffsets(mirrorMakerConsumer)
>
>         lastOffsetCommitMs = System.currentTimeMillis()
>
>         if (commitRequested)
>
>           mirrorMakerConsumer.notifyCommit()
>
>       }
>
>     }
>
>

Reply via email to