Hi Jun,

Is there any pluggability so that a developer can customize the batching logic or inject custom code for this? Shall I file a JIRA for this issue?
Thanks,
Bhavesh

On Tue, Sep 9, 2014 at 3:52 PM, Jun Rao <jun...@gmail.com> wrote:

No, the new producer doesn't address that problem.

Thanks,

Jun

On Tue, Sep 9, 2014 at 12:59 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

Hi Jun,

Thanks for the clarification. A follow-up question: does the new producer solve the issues highlighted here? With compression and async mode in the new producer, will it break messages down to this upper limit before submitting, or does the new producer strictly honor the batch size? In other words, if a compressed batch reaches this configured limit, is the batch broken down into sub-batches that stay within the limit? I would like to minimize the data loss caused by this limit.

Thanks,

Bhavesh

On Mon, Sep 8, 2014 at 9:17 PM, Jun Rao <jun...@gmail.com> wrote:

Are you using compression in the producer? If so, message.max.bytes applies to the compressed size of a batch of messages. Otherwise, message.max.bytes applies to the size of each individual message.

Thanks,

Jun

On Wed, Sep 3, 2014 at 3:25 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

I am referring to the wiki http://kafka.apache.org/08/configuration.html, and as far as I know the following parameter controls the maximum batch size in bytes. Kafka community, please correct me if I am wrong; I do not want to create confusion for the Kafka user community here. Also, if you increase this limit, you have to raise the corresponding limit on the consumer side as well (fetch.message.max.bytes).
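[Editor's note] Jun's point above — that with compression enabled, message.max.bytes applies to the compressed size of the whole batch — can be estimated client-side before sending. A rough sketch; `CompressedBatchCheck` is a hypothetical helper, and `GZIPOutputStream` is only an approximation of Kafka's actual codec framing:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.zip.GZIPOutputStream;

public class CompressedBatchCheck {
    // Compressed size of a batch: with compression on, this is roughly
    // what the broker compares against message.max.bytes.
    static int compressedSize(List<byte[]> batch) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buf)) {
            for (byte[] msg : batch) {
                gz.write(msg);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // in-memory streams should not fail
        }
        return buf.size();
    }

    static boolean fitsLimit(List<byte[]> batch, int maxBytes) {
        return compressedSize(batch) <= maxBytes;
    }
}
```

Because the compression ratio depends on the data, such an estimate only works after compressing the candidate batch, which is part of why the 0.8 producer does not do this check for you.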
Since we are using batch async mode, our messages sometimes get dropped when the total bytes of a batch exceed this limit, so I was asking the Kafka developers whether there is an optimal way to determine the batch size based on this limit and minimize the data loss, because the entire batch is rejected by the brokers.

message.max.bytes    1000000    The maximum size of a message that the server can receive. It is important that this property be in sync with the maximum fetch size your consumers use, or else an unruly producer will be able to publish messages too large for consumers to consume.

Thanks,

Bhavesh

On Wed, Sep 3, 2014 at 2:59 PM, Alexis Midon <alexis.mi...@airbedandbreakfast.com> wrote:

Hi Bhavesh,

Can you explain what limit you're referring to? I'm asking because `message.max.bytes` is applied per message, not per batch. Is there another limit I should be aware of?

Thanks

On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

Hi Jun,

We have a similar problem. Our messages have variable lengths, so with a fixed batch size the batch sometimes exceeds the limit set on the brokers (2MB).

Could the producer have some extra logic to determine the optimal batch size by looking at the configured message.max.bytes value?
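[Editor's note] The chunking Bhavesh asks about could be sketched as a greedy split. This is a simplified illustration, not producer code: `BatchSplitter` is a hypothetical name, and per-message wire overhead and compression are ignored:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplitter {
    // Greedily split a batch into chunks whose total byte size stays
    // within maxBytes. A single message larger than maxBytes gets its
    // own chunk; the broker will still reject that chunk, but only that
    // one message is lost instead of the whole batch.
    static List<List<byte[]>> split(List<byte[]> batch, int maxBytes) {
        List<List<byte[]>> chunks = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentSize = 0;
        for (byte[] msg : batch) {
            if (!current.isEmpty() && currentSize + msg.length > maxBytes) {
                chunks.add(current);           // flush the full chunk
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(msg);
            currentSize += msg.length;
        }
        if (!current.isEmpty()) chunks.add(current);
        return chunks;
    }
}
```

For example, three 400-byte messages with a 1000-byte limit would be split into a chunk of two messages and a chunk of one.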
During the metadata update, the producer would get this value from the broker for each topic, and when the current batch reaches this limit it would break the batch into smaller chunks such that no chunk exceeds the limit (unless a single message exceeds it). Basically, avoid data loss as much as possible.

Please let me know your opinion on this.

Thanks,

Bhavesh

On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <alexis.mi...@airbedandbreakfast.com> wrote:

Thanks Jun.

I'll create a JIRA and try to provide a patch. I think this is pretty serious.

On Friday, August 29, 2014, Jun Rao <jun...@gmail.com> wrote:

The goal of batching is mostly to reduce the number of RPC calls to the broker. If compression is enabled, a larger batch typically implies a better compression ratio.

The reason that we have to fail the whole batch is that the error code in the produce response is per partition, instead of per message.

Retrying individual messages on MessageSizeTooLarge seems reasonable.

Thanks,

Jun

On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <alexis.mi...@airbedandbreakfast.com> wrote:

Could you explain the goals of batches?
I was assuming this was simply a performance optimization, but this behavior makes me think I'm missing something. Is a batch more than a list of *independent* messages?

Why would you reject the whole batch? One invalid message causes the loss of batch.num.messages-1 messages :( It seems pretty critical to me.

If ack=0, the producer will never know about it. If ack!=0, the producer will retry the whole batch. If the issue was related to data corruption (etc.), retries might work. But in the case of a "big message", the batch will always be rejected and the producer will give up.

If the messages are indeed considered independent, I think this is a pretty serious issue.

I see 2 possible fix approaches:
- the broker could reject only the invalid messages
- the broker could reject the whole batch (like today), but the producer (if ack!=0) could retry messages one at a time on an exception like "MessageSizeTooLarge"

Opinions?
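[Editor's note] Alexis's second approach — retrying messages one at a time when the batch is rejected for size — could be sketched as follows. `Sender`, `sendWithFallback`, and the exception class are hypothetical stand-ins, not the Kafka producer API:

```java
import java.util.List;

public class BatchRetry {
    // Stand-in for kafka.common.MessageSizeTooLargeException.
    static class MessageSizeTooLargeException extends RuntimeException {}

    interface Sender { void send(List<byte[]> messages); }

    // Try the whole batch first; if the broker rejects it for size,
    // fall back to sending messages one at a time so only the oversized
    // ones are lost. Returns the number of messages actually sent.
    static int sendWithFallback(Sender sender, List<byte[]> batch) {
        try {
            sender.send(batch);
            return batch.size();
        } catch (MessageSizeTooLargeException wholeBatchRejected) {
            int sent = 0;
            for (byte[] msg : batch) {
                try {
                    sender.send(List.of(msg));
                    sent++;
                } catch (MessageSizeTooLargeException dropped) {
                    // only this message is lost, not the whole batch
                }
            }
            return sent;
        }
    }
}
```

The trade-off is one extra round trip per message on the fallback path, but only for batches that were already doomed.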
Alexis

```
[2014-08-29 16:00:35,170] WARN Produce request with correlation id 46 failed due to [test,1]: kafka.common.MessageSizeTooLargeException (kafka.producer.async.DefaultEventHandler)
[2014-08-29 16:00:35,284] WARN Produce request with correlation id 51 failed due to [test,0]: kafka.common.MessageSizeTooLargeException (kafka.producer.async.DefaultEventHandler)
[2014-08-29 16:00:35,392] WARN Produce request with correlation id 56 failed due to [test,0]: kafka.common.MessageSizeTooLargeException (kafka.producer.async.DefaultEventHandler)
[2014-08-29 16:00:35,499] WARN Produce request with correlation id 61 failed due to [test,1]: kafka.common.MessageSizeTooLargeException (kafka.producer.async.DefaultEventHandler)
[2014-08-29 16:00:35,603] ERROR Failed to send requests for topics test with correlation ids in [43,62] (kafka.producer.async.DefaultEventHandler)
[2014-08-29 16:00:35,603] ERROR Error in handling batch of 3 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
```

On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <jun...@gmail.com> wrote:

That's right. If one message in a batch exceeds the size limit, the whole batch is rejected.

When determining message.max.bytes, the most important thing to consider is probably memory, since currently we need to allocate memory for a full message in the broker, the producer, and the consumer client.
Thanks,

Jun

On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <alexis.mi...@airbedandbreakfast.com> wrote:

Am I misreading this loop?

https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269

It seems like all messages from `validMessages` (which is a ByteBufferMessageSet) are NOT appended if one of the message sizes exceeds the limit. I hope I'm missing something.

On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <alexis.mi...@airbedandbreakfast.com> wrote:

Hi Jun,

Thanks for your answer. Unfortunately the size won't help much; I'd like to see the actual message data.

By the way, what are the things to consider when deciding on the `message.max.bytes` value?
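[Editor's note] The whole-batch rejection Alexis points at in Log.scala can be mimicked with the following sketch. This is a simplification for illustration (`LogAppendSketch` is a hypothetical name, not the actual broker code): every message in the set is size-checked before any of them is appended, so one oversized message fails the entire set:

```java
import java.util.ArrayList;
import java.util.List;

public class LogAppendSketch {
    // Stand-in for kafka.common.MessageSizeTooLargeException.
    static class MessageSizeTooLargeException extends RuntimeException {}

    private final List<byte[]> log = new ArrayList<>();
    private final int maxMessageSize; // analogous to message.max.bytes

    LogAppendSketch(int maxMessageSize) { this.maxMessageSize = maxMessageSize; }

    // Validate every message first; append nothing if any one is too large.
    void append(List<byte[]> messageSet) {
        for (byte[] msg : messageSet) {
            if (msg.length > maxMessageSize) {
                throw new MessageSizeTooLargeException();
            }
        }
        log.addAll(messageSet);
    }

    int size() { return log.size(); }
}
```

This shape explains the observed behavior: validation happens over the whole ByteBufferMessageSet, and the error is reported per partition, so the broker has no way to accept the valid messages and reject only the oversized one.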
On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <jun...@gmail.com> wrote:

The message size check is currently only done on the broker. If you enable trace-level logging in RequestChannel, you will see the produce request, which includes the size of each partition.

Thanks,

Jun

On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <alexis.mi...@airbedandbreakfast.com> wrote:

Hello,

My brokers are reporting that some received messages exceed the `message.max.bytes` value. I'd like to know which producers are at fault, but it is pretty much impossible:
- the brokers don't log the content of the rejected messages
- the log messages do not contain the IP of the producers
- on the consumer side, no exception is thrown (afaik it is because Ack-0 is used); the only kind of notification is to close the connection
[1] Do you have any suggestions to track down the guilty producers or find out the message content?

Even though it makes total sense to have the limit defined and applied on the brokers, I was thinking that this check could also be applied by the producers. Some Google results suggest that `message.max.bytes` might be used by the producers, but I can't find any trace of that behavior in the code.

The closest thing I have is https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67, but it simply logs the message size and content, and the log level is trace.

[2] Could you please confirm whether such a producer-side check exists?

Thanks!
Alexis
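[Editor's note] Per Jun's answer, the size check is only done on the broker, so the producer-side check Alexis asks about in [2] has to live in application code. A minimal sketch of such a guard — `ProducerSizeGuard` is a hypothetical helper, and the limit must be kept in sync with the broker's message.max.bytes by hand:

```java
import java.util.ArrayList;
import java.util.List;

public class ProducerSizeGuard {
    private final int maxMessageBytes;       // mirror the broker's message.max.bytes
    private final List<String> rejects = new ArrayList<>();

    ProducerSizeGuard(int maxMessageBytes) { this.maxMessageBytes = maxMessageBytes; }

    // Returns true if the message may be sent. Otherwise it records the
    // topic and size, giving the visibility the broker does not provide
    // (no content, no producer IP in the broker logs).
    boolean check(String topic, byte[] payload) {
        if (payload.length <= maxMessageBytes) {
            return true;
        }
        rejects.add(topic + ": " + payload.length + " bytes > " + maxMessageBytes);
        return false;
    }

    List<String> rejects() { return rejects; }
}
```

Running every message through such a guard before handing it to the producer answers [1] locally: the oversized payloads are captured with context on the producing host, instead of being silently rejected by the broker.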