From MirrorMaker.scala:

    // Defaults to no data loss settings.
    maybeSetDefaultProperty(producerProps, ProducerConfig.RETRIES_CONFIG, Int.MaxValue.toString)
    maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)

I think the settings would prevent data loss (see KAFKA-2452).

On Thu, Sep 14, 2017 at 2:47 PM, Vu Nguyen <vuzilla...@gmail.com> wrote:

> I am reading the current MirrorMaker code and am trying to understand if
> MirrorMaker has any chance at losing messages. With the usage of the Max
> value for ProducerConfig.MAX_BLOCK_MS_CONFIG and
> ProducerConfig.RETRIES_CONFIG
> settings, it appears that the producer.flush() call in
> maybeFlushAndCommitOffsets would block forever if there were any long-term
> connectivity issues. The callback is called only on success in that case.
>
> Is that correct?
>
> Thanks
>
> Vu
>
> > def maybeFlushAndCommitOffsets() {
> >   val commitRequested = mirrorMakerConsumer.commitRequested()
> >   if (commitRequested || System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) {
> >     debug("Committing MirrorMaker state.")
> >     producer.flush()
> >     commitOffsets(mirrorMakerConsumer)
> >     lastOffsetCommitMs = System.currentTimeMillis()
> >     if (commitRequested)
> >       mirrorMakerConsumer.notifyCommit()
> >   }
> > }
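
To make the defaulting behaviour concrete, here is a minimal sketch of how the two settings might end up in the producer properties. It is not the MirrorMaker source: the object name NoDataLossDefaults and the method applyDefaults are hypothetical, and the helper body assumes, as its name suggests, that a default is applied only when the user has not already set the property. The keys "retries" and "max.block.ms" are the strings behind ProducerConfig.RETRIES_CONFIG and ProducerConfig.MAX_BLOCK_MS_CONFIG.

```scala
import java.util.Properties

object NoDataLossDefaults {
  // String keys behind ProducerConfig.RETRIES_CONFIG and
  // ProducerConfig.MAX_BLOCK_MS_CONFIG.
  val Retries = "retries"
  val MaxBlockMs = "max.block.ms"

  // Assumed behaviour of the helper (not copied from MirrorMaker):
  // set the default only if the user has not supplied a value.
  def maybeSetDefaultProperty(props: Properties, name: String, default: String): Unit = {
    if (props.getProperty(name) == null) props.setProperty(name, default)
  }

  // Hypothetical wrapper: applies both "no data loss" defaults in place
  // and returns the same Properties instance for convenience.
  def applyDefaults(producerProps: Properties): Properties = {
    maybeSetDefaultProperty(producerProps, Retries, Int.MaxValue.toString)
    maybeSetDefaultProperty(producerProps, MaxBlockMs, Long.MaxValue.toString)
    producerProps
  }

  def main(args: Array[String]): Unit = {
    // Simulate a user-supplied producer config that bounds max.block.ms.
    val userProps = new Properties()
    userProps.setProperty(MaxBlockMs, "60000")

    val effective = applyDefaults(userProps)
    println(s"retries = ${effective.getProperty(Retries)}")
    println(s"max.block.ms = ${effective.getProperty(MaxBlockMs)}")
  }
}
```

Under that assumption, a max.block.ms supplied in the producer config is kept as given, while retries falls back to Int.MaxValue. An operator who sets finite values is therefore opting out of the "no data loss" defaults quoted above.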