From MirrorMaker.scala:

      // Defaults to no data loss settings.
      maybeSetDefaultProperty(producerProps, ProducerConfig.RETRIES_CONFIG, Int.MaxValue.toString)
      maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)

I think these settings would prevent data loss (see KAFKA-2452).
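
Judging by its name, maybeSetDefaultProperty only applies the value when the
user has not already supplied one, so both settings can presumably be
overridden in the producer config. The helper body below is my assumption
(it is not pasted from MirrorMaker.scala), and DefaultPropertySketch is a
hypothetical name; "retries" and "max.block.ms" are the producer config keys
behind the two ProducerConfig constants. A minimal sketch:

```scala
import java.util.Properties

object DefaultPropertySketch {
  // Assumed behaviour: keep a user-supplied value, otherwise fall back to the default.
  def maybeSetDefaultProperty(props: Properties, name: String, default: String): Unit = {
    if (props.getProperty(name) == null) props.setProperty(name, default)
  }

  def main(args: Array[String]): Unit = {
    val producerProps = new Properties()
    // Simulates a user who set retries explicitly in the producer config.
    producerProps.setProperty("retries", "5")

    maybeSetDefaultProperty(producerProps, "retries", Int.MaxValue.toString)
    maybeSetDefaultProperty(producerProps, "max.block.ms", Long.MaxValue.toString)

    println(producerProps.getProperty("retries"))      // user value is kept
    println(producerProps.getProperty("max.block.ms")) // default is applied
  }
}
```

If the real helper behaves this way, lowering either value trades the
no-data-loss default for a producer that eventually gives up.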

On Thu, Sep 14, 2017 at 2:47 PM, Vu Nguyen <vuzilla...@gmail.com> wrote:

> I am reading the current MirrorMaker code and trying to understand whether
> MirrorMaker has any chance of losing messages. With
> ProducerConfig.MAX_BLOCK_MS_CONFIG and ProducerConfig.RETRIES_CONFIG set to
> their maximum values, it appears that the producer.flush() call in
> maybeFlushAndCommitOffsets would block forever if there were any long-term
> connectivity issues. The callback is called only on success in that case.
>
> Is that correct?
>
> Thanks
>
> Vu
>
>
>     def maybeFlushAndCommitOffsets() {
>       val commitRequested = mirrorMakerConsumer.commitRequested()
>       if (commitRequested || System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) {
>         debug("Committing MirrorMaker state.")
>         producer.flush()
>         commitOffsets(mirrorMakerConsumer)
>         lastOffsetCommitMs = System.currentTimeMillis()
>         if (commitRequested)
>           mirrorMakerConsumer.notifyCommit()
>       }
>     }
