If the producer doesn't get a response and retries, but both produce
requests actually succeeded, you will get duplicates. Kafka does not have
an idempotent producer.
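
To make the failure mode concrete, here is a rough sketch of how a lost
ack plus a retry writes the same message twice, and one way a consumer
could filter the duplicate out. It assumes every message carries a unique
ID (the test messages in this thread, like "KAFKA_1", would do).
FakeBroker, sendWithRetries and deduplicate are made-up names for
illustration, not Kafka APIs; nothing here talks to a real broker.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RetryDuplicates {

    /** Hypothetical stand-in for a partition log; not a Kafka class. */
    static final class FakeBroker {
        final List<String> log = new ArrayList<>();
        private boolean dropNextAck;

        FakeBroker(boolean dropFirstAck) {
            this.dropNextAck = dropFirstAck;
        }

        /** Always appends the message; returns false if the ack was "lost". */
        boolean produce(String message) {
            log.add(message);
            if (dropNextAck) {
                dropNextAck = false;
                return false; // message is committed, but the producer never hears about it
            }
            return true;
        }
    }

    /** Resends until an ack arrives or retries run out; returns the number of attempts. */
    static int sendWithRetries(FakeBroker broker, String message, int maxRetries) {
        int attempts = 0;
        boolean acked = false;
        while (!acked && attempts <= maxRetries) {
            attempts++;
            acked = broker.produce(message);
        }
        return attempts;
    }

    /** Consumer-side filter: keeps the first occurrence of each message ID. */
    static List<String> deduplicate(List<String> log) {
        Set<String> seen = new HashSet<>();
        List<String> unique = new ArrayList<>();
        for (String message : log) {
            if (seen.add(message)) {
                unique.add(message);
            }
        }
        return unique;
    }

    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker(true); // the first ack gets lost
        int attempts = sendWithRetries(broker, "KAFKA_1", 3);
        System.out.println("attempts=" + attempts
                + " log=" + broker.log
                + " deduplicated=" + deduplicate(broker.log));
        // prints: attempts=2 log=[KAFKA_1, KAFKA_1] deduplicated=[KAFKA_1]
    }
}
```

Keeping every seen ID in memory only works for a short test run; a real
consumer would need to bound or persist that set.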

On Fri, Nov 13, 2015 at 4:35 AM, Prabhjot Bharaj <prabhbha...@gmail.com>
wrote:

> Hi Gwen,
>
> If the producer can't get a response but the message got committed, can
> this producer, because of retries, commit messages more than once?
> I'm trying to look at this problem from the point of view of an idempotent
> producer.
>
> Thanks,
> Prabhjot
> On Nov 13, 2015 1:26 PM, "Hawin Jiang" <hawin.ji...@gmail.com> wrote:
>
>> Thanks Gwen for your excellent slides
>>
>> I will test it again based on your suggestions.
>>
>>
>>
>>
>> Best regards
>> Hawin
>>
>> On Thu, Nov 12, 2015 at 6:35 PM, Gwen Shapira <g...@confluent.io> wrote:
>>
>> > Hi,
>> >
>> > First, here's a handy slide-deck on avoiding data loss in Kafka:
>> >
>> >
>> http://www.slideshare.net/gwenshap/kafka-reliability-when-it-absolutely-positively-has-to-be-there
>> >
>> > Note configuration parameters like the number of retries.
>> >
>> > Also, it looks like you are sending data to Kafka asynchronously, but you
>> > don't have a callback - so if Send fails, you have no way of updating the
>> > count, logging or anything else. I recommend taking a look at the callback
>> > API for sending data and improving your test to at least record send
>> > failures.
>> >
>> > Gwen
>> >
>> >
>> > On Thu, Nov 12, 2015 at 11:10 AM, Hawin Jiang <hawin.ji...@gmail.com>
>> > wrote:
>> >
>> > > Hi  Pradeep
>> > >
>> > > Here is my configuration
>> > >
>> > > ############################# Producer Basics #############################
>> > >
>> > > # list of brokers used for bootstrapping knowledge about the rest of the cluster
>> > > # format: host1:port1,host2:port2 ...
>> > > metadata.broker.list=localhost:9092
>> > >
>> > > # name of the partitioner class for partitioning events; default partition
>> > > # spreads data randomly
>> > > #partitioner.class=
>> > >
>> > > # specifies whether the messages are sent asynchronously (async) or
>> > > # synchronously (sync)
>> > > producer.type=sync
>> > >
>> > > # specify the compression codec for all data generated: none, gzip, snappy, lz4.
>> > > # the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy,
>> > > # lz4, respectively
>> > > compression.codec=none
>> > >
>> > > # message encoder
>> > > serializer.class=kafka.serializer.DefaultEncoder
>> > >
>> > > # allow topic level compression
>> > > #compressed.topics=
>> > >
>> > > ############################# Async Producer #############################
>> > > # maximum time, in milliseconds, for buffering data on the producer queue
>> > > #queue.buffering.max.ms=
>> > >
>> > > # the maximum size of the blocking queue for buffering on the producer
>> > > #queue.buffering.max.messages=
>> > >
>> > > # Timeout for event enqueue:
>> > > # 0: events will be enqueued immediately or dropped if the queue is full
>> > > # -ve: enqueue will block indefinitely if the queue is full
>> > > # +ve: enqueue will block up to this many milliseconds if the queue is full
>> > > #queue.enqueue.timeout.ms=
>> > >
>> > > # the number of messages batched at the producer
>> > > #batch.num.messages=
>> > >
>> > >
>> > >
>> > > @Jinxing
>> > >
>> > > I have not found Kafka 0.8.3.0 on
>> > > http://mvnrepository.com/artifact/org.apache.kafka/kafka_2.10.
>> > > The latest release is 0.8.2.2.
>> > > If we flush the producer all the time, I think it will hurt our
>> > > performance.
>> > > I just want to make sure that every message counted on the producer side
>> > > can also be found on the consumer side.
>> > > We cannot lose so many messages in our system.
>> > >
>> > >
>> > >
>> > > Best regards
>> > > Hawin
>> > >
>> > >
>> > > On Thu, Nov 12, 2015 at 7:02 AM, jinxing <jinxing6...@126.com> wrote:
>> > >
>> > > > I have 3 brokers;
>> > > > the ack configuration is -1 (all), meaning a message is sent successfully
>> > > > only after getting every broker's ack;
>> > > > is this a bug?
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > At 2015-11-12 21:08:49, "Pradeep Gollakota" <pradeep...@gmail.com>
>> > > wrote:
>> > > > >What is your producer configuration? Specifically, how many acks are you
>> > > > >requesting from Kafka?
>> > > > >
>> > > > >On Thu, Nov 12, 2015 at 2:03 AM, jinxing <jinxing6...@126.com>
>> wrote:
>> > > > >
>> > > > >> in kafka_0.8.3.0:
>> > > > >> kafkaProducer = new KafkaProducer<>(properties,
>> > > > >>         new ByteArraySerializer(), new ByteArraySerializer());
>> > > > >> kafkaProducer.flush();
>> > > > >> you can call the flush after sending every few messages;
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >> At 2015-11-12 17:36:24, "Hawin Jiang" <hawin.ji...@gmail.com>
>> > wrote:
>> > > > >> >Hi  Prabhjot
>> > > > >> >
>> > > > >> >The messages are "Thread1_kafka_1" and "Thread2_kafka_1". Something
>> > > > >> >like that.
>> > > > >> >
>> > > > >> >For GetOffsetShell report below:
>> > > > >> >
>> > > > >> >[kafka@dn-01 bin]$ ./kafka-run-class.sh kafka.tools.GetOffsetShell
>> > > > >> >--broker-list dn-01:9092 --time -1 --topic kafka-test
>> > > > >> >kafka-test:0:12529261
>> > > > >> >
>> > > > >> >
>> > > > >> >
>> > > > >> >
>> > > > >> >@Jinxing
>> > > > >> >
>> > > > >> >Can you share your flush example with me? How can I avoid losing my
>> > > > >> >messages?
>> > > > >> >Thanks.
>> > > > >> >
>> > > > >> >
>> > > > >> >
>> > > > >> >Best regards
>> > > > >> >Hawin
>> > > > >> >
>> > > > >> >
>> > > > >> >
>> > > > >> >On Thu, Nov 12, 2015 at 1:00 AM, jinxing <jinxing6...@126.com>
>> > > wrote:
>> > > > >> >
>> > > > >> >> the producer has a flush API; you can call it to prevent
>> > > > >> >> message loss;
>> > > > >> >>
>> > > > >> >>
>> > > > >> >> maybe it can help;
>> > > > >> >>
>> > > > >> >>
>> > > > >> >>
>> > > > >> >>
>> > > > >> >>
>> > > > >> >>
>> > > > >> >>
>> > > > >> >>
>> > > > >> >> At 2015-11-12 16:43:54, "Hawin Jiang" <hawin.ji...@gmail.com>
>> > > wrote:
>> > > > >> >> >Hi  Jinxing
>> > > > >> >> >
>> > > > >> >> >I don't think we can resolve this issue by adding producers.
>> > > > >> >> >If I add more producers, it should lose more messages.
>> > > > >> >> >
>> > > > >> >> >I just tested two producers.
>> > > > >> >> >Thread Producer 1 has 83064 messages on the producer side and 82273
>> > > > >> >> >messages on the consumer side.
>> > > > >> >> >Thread Producer 2 has 89844 messages on the producer side and 88892
>> > > > >> >> >messages on the consumer side.
>> > > > >> >> >
>> > > > >> >> >Thanks.
>> > > > >> >> >
>> > > > >> >> >
>> > > > >> >> >
>> > > > >> >> >Best regards
>> > > > >> >> >Hawin
>> > > > >> >> >
>> > > > >> >> >
>> > > > >> >> >On Thu, Nov 12, 2015 at 12:24 AM, jinxing <
>> jinxing6...@126.com>
>> > > > wrote:
>> > > > >> >> >
>> > > > >> >> >> maybe there are some changes in 0.9.0.0;
>> > > > >> >> >>
>> > > > >> >> >>
>> > > > >> >> >> but you can still try increasing the producer sending rate, and see
>> > > > >> >> >> whether messages are lost without any exception;
>> > > > >> >> >>
>> > > > >> >> >>
>> > > > >> >> >> note that, to increase the producer sending rate, you must have
>> > > > >> >> >> enough producer 'power';
>> > > > >> >> >>
>> > > > >> >> >>
>> > > > >> >> >> in my case, I have 50 producers sending messages at the same time : )
>> > > > >> >> >>
>> > > > >> >> >>
>> > > > >> >> >>
>> > > > >> >> >>
>> > > > >> >> >>
>> > > > >> >> >>
>> > > > >> >> >>
>> > > > >> >> >> At 2015-11-12 16:16:23, "Hawin Jiang" <
>> hawin.ji...@gmail.com>
>> > > > wrote:
>> > > > >> >> >> >Hi  Jinxing
>> > > > >> >> >> >
>> > > > >> >> >> >I am using kafka_2.10-0.9.0.0-SNAPSHOT. I downloaded the source
>> > > > >> >> >> >code and installed it last week.
>> > > > >> >> >> >
>> > > > >> >> >> >I saw that 97446 messages were sent to Kafka successfully.
>> > > > >> >> >> >
>> > > > >> >> >> >So far, I have not found any failed messages.
>> > > > >> >> >> >
>> > > > >> >> >> >
>> > > > >> >> >> >
>> > > > >> >> >> >Best regards
>> > > > >> >> >> >Hawin
>> > > > >> >> >> >
>> > > > >> >> >> >On Thu, Nov 12, 2015 at 12:07 AM, jinxing <
>> > jinxing6...@126.com
>> > > >
>> > > > >> wrote:
>> > > > >> >> >> >
>> > > > >> >> >> >> Hi, what version are you using ?
>> > > > >> >> >> >>
>> > > > >> >> >> >>
>> > > > >> >> >> >> I am using 0.8.2.0, and I encountered this problem before;
>> > > > >> >> >> >>
>> > > > >> >> >> >>
>> > > > >> >> >> >> it seems that if the message rate on the producer side is too high,
>> > > > >> >> >> >> some of the messages will be lost;
>> > > > >> >> >> >>
>> > > > >> >> >> >>
>> > > > >> >> >> >> also, I found that the callback method of the producer 'send' API is
>> > > > >> >> >> >> not reliable;
>> > > > >> >> >> >>
>> > > > >> >> >> >>
>> > > > >> >> >> >> only successfully sent messages trigger the callback, but the failed
>> > > > >> >> >> >> ones don't;
>> > > > >> >> >> >>
>> > > > >> >> >> >>
>> > > > >> >> >> >> have you seen this?
>> > > > >> >> >> >>
>> > > > >> >> >> >>
>> > > > >> >> >> >>
>> > > > >> >> >> >>
>> > > > >> >> >> >>
>> > > > >> >> >> >>
>> > > > >> >> >> >>
>> > > > >> >> >> >>
>> > > > >> >> >> >> At 2015-11-12 16:01:17, "Hawin Jiang" <
>> > hawin.ji...@gmail.com
>> > > >
>> > > > >> wrote:
>> > > > >> >> >> >> >Hi  All
>> > > > >> >> >> >> >
>> > > > >> >> >> >> >I have sent messages to Kafka for one minute. I found 97446
>> > > > >> >> >> >> >messages on the producer side and 96896 messages on the consumer
>> > > > >> >> >> >> >side for Case 1.
>> > > > >> >> >> >> >I also tried Case 2 and faced the same issue. The numbers do not
>> > > > >> >> >> >> >match between producer and consumer.
>> > > > >> >> >> >> >Can someone take a look at this issue?
>> > > > >> >> >> >> >Thanks.
>> > > > >> >> >> >> >
>> > > > >> >> >> >> >
>> > > > >> >> >> >> >Case 1:
>> > > > >> >> >> >> >
>> > > > >> >> >> >> >long startTime = System.currentTimeMillis();
>> > > > >> >> >> >> >long maxDurationInMilliseconds = 1 * 60 * 1000;
>> > > > >> >> >> >> >int messageNo = 0;
>> > > > >> >> >> >> >while (true) {
>> > > > >> >> >> >> >    if (System.currentTimeMillis() <= startTime + maxDurationInMilliseconds) {
>> > > > >> >> >> >> >        messageNo++;
>> > > > >> >> >> >> >        String messageStr = "KAFKA_" + messageNo;
>> > > > >> >> >> >> >        System.out.println("Message: " + messageNo);
>> > > > >> >> >> >> >        producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
>> > > > >> >> >> >> >    } else {
>> > > > >> >> >> >> >        producer.close();
>> > > > >> >> >> >> >        System.out.println("Total kafka Message: " + messageNo);
>> > > > >> >> >> >> >        break;
>> > > > >> >> >> >> >    }
>> > > > >> >> >> >> >}
>> > > > >> >> >> >> >
>> > > > >> >> >> >> >
>> > > > >> >> >> >> >Case 2:
>> > > > >> >> >> >> >
>> > > > >> >> >> >> >for (int i = 1; i <= 12000; i++) {
>> > > > >> >> >> >> >    String messageStr = "KAFKA_" + i;
>> > > > >> >> >> >> >    producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
>> > > > >> >> >> >> >}
>> > > > >> >> >> >> >
>> > > > >> >> >> >> >
>> > > > >> >> >> >> >
>> > > > >> >> >> >> >Best regards
>> > > > >> >> >> >> >Hawin
>> > > > >> >> >> >>
>> > > > >> >> >>
>> > > > >> >>
>> > > > >>
>> > > >
>> > >
>> >
>>
>
