Hi Jun,

Is there any pluggability that would let a developer customize the batching
logic or inject custom code for this? Shall I file a Jira for this issue?
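
For example, something along these lines is the kind of hook I have in mind.
This is purely an illustration: the BatchingPolicy interface below does not
exist in Kafka, and the names are made up.

```
import java.util.List;

import kafka.producer.KeyedMessage;

// Hypothetical plug-in point, for illustration only. The application would
// supply the policy, and the async producer would call it before sending so
// that each batch stays under the broker's message.max.bytes limit.
public interface BatchingPolicy<K, V> {
    List<List<KeyedMessage<K, V>>> group(List<KeyedMessage<K, V>> queued,
                                         int maxBatchBytes);
}
```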

Thanks,

Bhavesh

On Tue, Sep 9, 2014 at 3:52 PM, Jun Rao <jun...@gmail.com> wrote:

> No, the new producer doesn't address that problem.
>
> Thanks,
>
> Jun
>
> On Tue, Sep 9, 2014 at 12:59 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
>
> > Hi Jun,
> >
> > Thanks for the clarification. A follow-up question: does the new producer
> > solve the issue highlighted here? With compression and async mode in the
> > new producer, will it break messages down to stay within this upper limit
> > before submitting, or does it strictly honor the configured batch size?
> > In other words, if a compressed batch reaches this configured limit, is
> > the batch broken into sub-batches that fit within the limit? I would like
> > to minimize the data loss due to this limit.
> >
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Mon, Sep 8, 2014 at 9:17 PM, Jun Rao <jun...@gmail.com> wrote:
> >
> > > Are you using compression in the producer? If so, message.max.bytes
> > > applies to the compressed size of a batch of messages. Otherwise,
> > > message.max.bytes applies to the size of each individual message.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Wed, Sep 3, 2014 at 3:25 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> > >
> > > > I am referring to the docs at http://kafka.apache.org/08/configuration.html;
> > > > as far as I know, the following parameter controls the maximum batch
> > > > message bytes. Kafka community, please correct me if I am wrong, as I do
> > > > not want to create confusion for the Kafka user community here. Also, if
> > > > you increase this limit, then you have to raise the corresponding limit
> > > > on the consumer side as well (fetch.message.max.bytes).
> > > >
> > > > Since we are using batch async mode, our messages sometimes get dropped
> > > > when the total bytes of a batch exceed this limit, so I was asking the
> > > > Kafka developers whether there is an optimal way to determine the batch
> > > > size based on this limit and minimize the data loss, because the entire
> > > > batch is rejected by the brokers.
> > > >
> > > > message.max.bytes (default 1000000): the maximum size of a message that
> > > > the server can receive. It is important that this property be in sync
> > > > with the maximum fetch size your consumers use, or else an unruly
> > > > producer will be able to publish messages too large for consumers to
> > > > consume.
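> > > >
> > > > For example, keeping the two settings in sync might look like this (the
> > > > 2 MB values below are only an illustration, not a recommendation):
> > > >
> > > > ```
> > > > # broker: server.properties
> > > > message.max.bytes=2097152
> > > >
> > > > # consumer config: should be at least as large as message.max.bytes
> > > > fetch.message.max.bytes=2097152
> > > > ```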
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > > >
> > > > On Wed, Sep 3, 2014 at 2:59 PM, Alexis Midon <
> > > > alexis.mi...@airbedandbreakfast.com> wrote:
> > > >
> > > > > Hi Bhavesh,
> > > > >
> > > > > Can you explain what limit you're referring to?
> > > > > I'm asking because `message.max.bytes` is applied per message, not per
> > > > > batch.
> > > > > Is there another limit I should be aware of?
> > > > >
> > > > > thanks
> > > > >
> > > > >
> > > > > On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > We have a similar problem. Our messages have variable lengths, so
> > > > > > with a fixed batch size the batch sometimes exceeds the limit set on
> > > > > > the brokers (2MB).
> > > > > >
> > > > > > Could the producer have some extra logic to determine the optimal
> > > > > > batch size by looking at the configured message.max.bytes value?
> > > > > >
> > > > > > During the metadata update, the producer would get this value from
> > > > > > the broker for each topic, and if the current batch size reaches
> > > > > > this limit it would break the batch into smaller chunks such that
> > > > > > none exceeds the limit (unless a single message exceeds the limit).
> > > > > > Basically, try to avoid data loss as much as possible; see the
> > > > > > sketch below.
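> > > > > >
> > > > > > A rough sketch of the size-based chunking I mean (illustration only,
> > > > > > not Kafka API code; estimating the post-compression size is the hard
> > > > > > part and is not shown here):
> > > > > >
> > > > > > ```
> > > > > > // Illustration only: split payloads into chunks whose total size
> > > > > > // stays under maxBatchBytes. Assumes java.util.List / ArrayList.
> > > > > > List<List<byte[]>> split(List<byte[]> payloads, int maxBatchBytes) {
> > > > > >     List<List<byte[]>> batches = new ArrayList<>();
> > > > > >     List<byte[]> current = new ArrayList<>();
> > > > > >     int currentBytes = 0;
> > > > > >     for (byte[] p : payloads) {
> > > > > >         if (!current.isEmpty() && currentBytes + p.length > maxBatchBytes) {
> > > > > >             batches.add(current);          // close the current chunk
> > > > > >             current = new ArrayList<>();
> > > > > >             currentBytes = 0;
> > > > > >         }
> > > > > >         current.add(p);                    // an oversized single message
> > > > > >         currentBytes += p.length;          // still ends up alone in a chunk
> > > > > >     }
> > > > > >     if (!current.isEmpty()) {
> > > > > >         batches.add(current);
> > > > > >     }
> > > > > >     return batches;
> > > > > > }
> > > > > > ```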
> > > > > >
> > > > > > Please let me know what is your opinion on this...
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Bhavesh
> > > > > >
> > > > > >
> > > > > > On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
> > > > > > alexis.mi...@airbedandbreakfast.com> wrote:
> > > > > >
> > > > > > > Thanks Jun.
> > > > > > >
> > > > > > > I'll create a jira and try to provide a patch. I think this is
> > > > > > > pretty serious.
> > > > > > >
> > > > > > > On Friday, August 29, 2014, Jun Rao <jun...@gmail.com> wrote:
> > > > > > >
> > > > > > > > The goal of batching is mostly to reduce the # of RPC calls to
> > > > > > > > the broker. If compression is enabled, a larger batch typically
> > > > > > > > implies a better compression ratio.
> > > > > > > >
> > > > > > > > The reason that we have to fail the whole batch is that the
> > > > > > > > error code in the produce response is per partition, instead of
> > > > > > > > per message.
> > > > > > > >
> > > > > > > > Retrying individual messages on MessageSizeTooLarge seems
> > > > > > > > reasonable.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <alexis.mi...@airbedandbreakfast.com> wrote:
> > > > > > > >
> > > > > > > > > Could you explain the goals of batches? I was assuming this
> > > > > > > > > was simply a performance optimization, but this behavior makes
> > > > > > > > > me think I'm missing something.
> > > > > > > > > Is a batch more than a list of *independent* messages?
> > > > > > > > >
> > > > > > > > > Why would you reject the whole batch? One invalid message
> > > > > > > > > causes the loss of batch.num.messages-1 messages :(
> > > > > > > > > It seems pretty critical to me.
> > > > > > > > >
> > > > > > > > > If ack=0, the producer will never know about it.
> > > > > > > > > If ack != 0, the producer will retry the whole batch. If the
> > > > > > > > > issue was related to data corruption (etc.), retries might
> > > > > > > > > work. But in the case of a "big message", the batch will
> > > > > > > > > always be rejected and the producer will give up.
> > > > > > > > >
> > > > > > > > > If the messages are indeed considered independent, I think
> > > > > > > > > this is a pretty serious issue.
> > > > > > > > >
> > > > > > > > > I see 2 possible fix approaches:
> > > > > > > > > - the broker could reject only the invalid messages
> > > > > > > > > - the broker could reject the whole batch (like today) but the
> > > > > > > > >   producer (if ack != 0) could retry messages one at a time on
> > > > > > > > >   an exception like "MessageSizeTooLarge" (rough sketch below)
> > > > > > > > >
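> > > > > > > > > Something along these lines on the client side, as a sketch
> > > > > > > > > (the sendWithFallback name is made up; the classes are from
> > > > > > > > > the old producer API: kafka.javaapi.producer.Producer,
> > > > > > > > > kafka.producer.KeyedMessage,
> > > > > > > > > kafka.common.FailedToSendMessageException):
> > > > > > > > >
> > > > > > > > > ```
> > > > > > > > > // Sketch: if the batched send fails, fall back to sending the
> > > > > > > > > // messages one at a time so only the oversized ones are lost.
> > > > > > > > > void sendWithFallback(Producer<String, byte[]> producer,
> > > > > > > > >                       List<KeyedMessage<String, byte[]>> batch) {
> > > > > > > > >     try {
> > > > > > > > >         producer.send(batch);              // normal batched send
> > > > > > > > >     } catch (FailedToSendMessageException wholeBatchFailed) {
> > > > > > > > >         for (KeyedMessage<String, byte[]> m : batch) {
> > > > > > > > >             try {
> > > > > > > > >                 producer.send(m);
> > > > > > > > >             } catch (FailedToSendMessageException tooLarge) {
> > > > > > > > >                 // log and drop only this message
> > > > > > > > >             }
> > > > > > > > >         }
> > > > > > > > >     }
> > > > > > > > > }
> > > > > > > > > ```
> > > > > > > > >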
> > > > > > > > > opinions?
> > > > > > > > >
> > > > > > > > > Alexis
> > > > > > > > >
> > > > > > > > > ```
> > > > > > > > > [2014-08-29 16:00:35,170] WARN Produce request with correlation id 46
> > > > > > > > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > [2014-08-29 16:00:35,284] WARN Produce request with correlation id 51
> > > > > > > > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > [2014-08-29 16:00:35,392] WARN Produce request with correlation id 56
> > > > > > > > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > [2014-08-29 16:00:35,499] WARN Produce request with correlation id 61
> > > > > > > > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > [2014-08-29 16:00:35,603] ERROR Failed to send requests for topics test
> > > > > > > > > with correlation ids in [43,62] (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > [2014-08-29 16:00:35,603] ERROR Error in handling batch of 3 events
> > > > > > > > > (kafka.producer.async.ProducerSendThread)
> > > > > > > > > kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
> > > > > > > > >   at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > > > > > > > >   at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > > > > > > > >   at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> > > > > > > > >   at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > > > > > > > > ```
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <jun...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > That's right. If one message in a batch exceeds the size
> > > > > > > > > > limit, the whole batch is rejected.
> > > > > > > > > >
> > > > > > > > > > When determining message.max.bytes, the most important thing
> > > > > > > > > > to consider is probably memory, since currently we need to
> > > > > > > > > > allocate memory for a full message in the broker and in the
> > > > > > > > > > producer and consumer clients.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jun
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <alexis.mi...@airbedandbreakfast.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Am I misreading this loop:
> > > > > > > > > > >
> > > > > > > > > > > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> > > > > > > > > > >
> > > > > > > > > > > It seems like all messages from `validMessages` (which is a
> > > > > > > > > > > ByteBufferMessageSet) are NOT appended if one of the
> > > > > > > > > > > message sizes exceeds the limit.
> > > > > > > > > > >
> > > > > > > > > > > I hope I'm missing something.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <alexis.mi...@airbedandbreakfast.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Jun,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for your answer.
> > > > > > > > > > > > Unfortunately the size won't help much; I'd like to see
> > > > > > > > > > > > the actual message data.
> > > > > > > > > > > >
> > > > > > > > > > > > By the way, what are the things to consider when
> > > > > > > > > > > > deciding on the `message.max.bytes` value?
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > >> The message size check is currently only done on the
> > > > > > > > > > > >> broker. If you enable trace-level logging in
> > > > > > > > > > > >> RequestChannel, you will see the produce request, which
> > > > > > > > > > > >> includes the size of each partition.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Thanks,
> > > > > > > > > > > >>
> > > > > > > > > > > >> Jun
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <alexis.mi...@airbedandbreakfast.com> wrote:
> > > > > > > > > > > >>
> > > > > > > > > > > >> > Hello,
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > My brokers are reporting that some received messages
> > > > > > > > > > > >> > exceed the `message.max.bytes` value.
> > > > > > > > > > > >> > I'd like to know which producers are at fault, but it
> > > > > > > > > > > >> > is pretty much impossible to tell:
> > > > > > > > > > > >> > - the brokers don't log the content of the rejected
> > > > > > > > > > > >> >   messages
> > > > > > > > > > > >> > - the log messages do not contain the IP of the
> > > > > > > > > > > >> >   producers
> > > > > > > > > > > >> > - on the consumer side, no exception is thrown (afaik
> > > > > > > > > > > >> >   because ack=0 is used); the only kind of
> > > > > > > > > > > >> >   notification is that the connection is closed
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > [1] Do you have any suggestions to track down the
> > > > > > > > > > > >> > guilty producers or find out the message content?
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Even though it makes total sense to have the limit
> > > > > > > > > > > >> > defined and applied on the brokers, I was thinking
> > > > > > > > > > > >> > that this check could also be applied by the
> > > > > > > > > > > >> > producers. Some Google results suggest that
> > > > > > > > > > > >> > `message.max.bytes` might be used by the producers,
> > > > > > > > > > > >> > but I can't find any trace of that behavior in the
> > > > > > > > > > > >> > code.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > The closest thing I have is
> > > > > > > > > > > >> > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > > > > > > > > > > >> > but it simply logs the message size and content, and
> > > > > > > > > > > >> > the log level is trace.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > [2] Could you please confirm whether such a
> > > > > > > > > > > >> > producer-side check exists?
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > thanks!
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Alexis
> > > > > > > > > > > >> >
> > > > > > > > > > > >>
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
