If the producer doesn't get a response and retries, but both produce requests actually succeeded, you will get duplicates. Kafka does not have an Idempotent Producer.
On Fri, Nov 13, 2015 at 4:35 AM, Prabhjot Bharaj <prabhbha...@gmail.com> wrote:

> Hi Gwen,
>
> If the producer can't get a response but the message got committed, could
> this producer, because of retries, be committing messages more than once?
> I'm trying to see this problem from the point of view of an Idempotent
> Producer.
>
> Thanks,
> Prabhjot
>
> On Nov 13, 2015 1:26 PM, "Hawin Jiang" <hawin.ji...@gmail.com> wrote:
>
>> Thanks Gwen for your excellent slides.
>>
>> I will test it again based on your suggestions.
>>
>> Best regards
>> Hawin
>>
>> On Thu, Nov 12, 2015 at 6:35 PM, Gwen Shapira <g...@confluent.io> wrote:
>>
>> > Hi,
>> >
>> > First, here's a handy slide deck on avoiding data loss in Kafka:
>> >
>> > http://www.slideshare.net/gwenshap/kafka-reliability-when-it-absolutely-positively-has-to-be-there
>> >
>> > Note configuration parameters like the number of retries.
>> >
>> > Also, it looks like you are sending data to Kafka asynchronously, but
>> > you don't have a callback - so if send fails, you have no way of
>> > updating the count, logging, or anything else. I recommend taking a
>> > look at the callback API for sending data and improving your test to
>> > at least record send failures.
>> >
>> > Gwen
>> >
>> > On Thu, Nov 12, 2015 at 11:10 AM, Hawin Jiang <hawin.ji...@gmail.com>
>> > wrote:
>> >
>> > > Hi Pradeep
>> > >
>> > > Here is my configuration
>> > >
>> > > ############################# Producer Basics #############################
>> > >
>> > > # list of brokers used for bootstrapping knowledge about the rest of
>> > > # the cluster
>> > > # format: host1:port1,host2:port2 ...
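Gwen's suggestion to record send failures could look roughly like the sketch below, using the new producer client (`org.apache.kafka.clients.producer`, available since 0.8.2). The topic name, broker address, and counter names are illustrative, not from the thread:

```java
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class CountingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // adjust to your brokers
        props.put("acks", "all");                         // wait for all in-sync replicas

        final AtomicLong succeeded = new AtomicLong();
        final AtomicLong failed = new AtomicLong();

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(
                props, new StringSerializer(), new StringSerializer())) {
            for (int i = 1; i <= 12000; i++) {
                producer.send(
                    new ProducerRecord<String, String>("kafka-test", "KAFKA_" + i),
                    new Callback() {
                        // invoked once per record, with either metadata or an exception
                        public void onCompletion(RecordMetadata metadata, Exception e) {
                            if (e == null) {
                                succeeded.incrementAndGet();
                            } else {
                                failed.incrementAndGet(); // log e here in a real test
                            }
                        }
                    });
            }
        } // close() flushes and waits for outstanding sends
        System.out.println("succeeded=" + succeeded.get() + " failed=" + failed.get());
    }
}
```

With this in place, the producer-side count can be taken from `succeeded` rather than from the number of `send()` calls, which is what makes the comparison with the consumer-side count meaningful.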
>> > > metadata.broker.list=localhost:9092
>> > >
>> > > # name of the partitioner class for partitioning events; the default
>> > > # partitioner spreads data randomly
>> > > #partitioner.class=
>> > >
>> > > # specifies whether the messages are sent asynchronously (async) or
>> > > # synchronously (sync)
>> > > producer.type=sync
>> > >
>> > > # specify the compression codec for all data generated: none, gzip,
>> > > # snappy, lz4.
>> > > # the old config values work as well: 0, 1, 2, 3 for none, gzip,
>> > > # snappy, lz4, respectively
>> > > compression.codec=none
>> > >
>> > > # message encoder
>> > > serializer.class=kafka.serializer.DefaultEncoder
>> > >
>> > > # allow topic-level compression
>> > > #compressed.topics=
>> > >
>> > > ############################# Async Producer #############################
>> > > # maximum time, in milliseconds, for buffering data on the producer queue
>> > > #queue.buffering.max.ms=
>> > >
>> > > # the maximum size of the blocking queue for buffering on the producer
>> > > #queue.buffering.max.messages=
>> > >
>> > > # Timeout for event enqueue:
>> > > # 0: events will be enqueued immediately or dropped if the queue is full
>> > > # -ve: enqueue will block indefinitely if the queue is full
>> > > # +ve: enqueue will block up to this many milliseconds if the queue is full
>> > > #queue.enqueue.timeout.ms=
>> > >
>> > > # the number of messages batched at the producer
>> > > #batch.num.messages=
>> > >
>> > >
>> > > @Jinxing
>> > >
>> > > I have not found Kafka 0.8.3.0 on
>> > > http://mvnrepository.com/artifact/org.apache.kafka/kafka_2.10.
>> > > The latest release is 0.8.2.2.
>> > > If we flush the producer all the time, I think it will impact our
>> > > performance.
>> > > I just want to make sure that however many messages are in the
>> > > producer, they can be found in the consumer as well.
>> > > We cannot lose so many messages in our system.
>> > >
>> > > Best regards
>> > > Hawin
>> > >
>> > > On Thu, Nov 12, 2015 at 7:02 AM, jinxing <jinxing6...@126.com> wrote:
>> > >
>> > > > I have 3 brokers;
>> > > > the ack configuration is -1 (all), meaning a message is sent
>> > > > successfully only after getting every broker's ack;
>> > > > is this a bug?
>> > > >
>> > > > At 2015-11-12 21:08:49, "Pradeep Gollakota" <pradeep...@gmail.com>
>> > > > wrote:
>> > > > >What is your producer configuration? Specifically, how many acks
>> > > > >are you requesting from Kafka?
>> > > > >
>> > > > >On Thu, Nov 12, 2015 at 2:03 AM, jinxing <jinxing6...@126.com> wrote:
>> > > > >
>> > > > >> in kafka_0.8.3.0:
>> > > > >> kafkaProducer = new KafkaProducer<>(properties,
>> > > > >>     new ByteArraySerializer(), new ByteArraySerializer());
>> > > > >> kafkaProducer.flush();
>> > > > >> you can call the flush after sending every few messages;
>> > > > >>
>> > > > >> At 2015-11-12 17:36:24, "Hawin Jiang" <hawin.ji...@gmail.com> wrote:
>> > > > >> >Hi Prabhjot
>> > > > >> >
>> > > > >> >The messages are "Thread1_kafka_1" and "Thread2_kafka_1".
>> > > > >> >Something like that.
>> > > > >> >
>> > > > >> >Here is the GetOffsetShell report:
>> > > > >> >
>> > > > >> >[kafka@dn-01 bin]$ ./kafka-run-class.sh kafka.tools.GetOffsetShell
>> > > > >> >--broker-list dn-01:9092 --time -1 --topic kafka-test
>> > > > >> >kafka-test:0:12529261
>> > > > >> >
>> > > > >> >@Jinxing
>> > > > >> >
>> > > > >> >Can you share your flush example with me? How do I avoid losing
>> > > > >> >my messages?
>> > > > >> >Thanks.
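For reference, the reliability settings discussed above (jinxing's acks=-1 and Gwen's note about the number of retries) map to these new-producer properties; the values shown are illustrative, not from the thread:

```properties
# new-producer (org.apache.kafka.clients.producer) reliability settings
bootstrap.servers=localhost:9092
# "all" (equivalently -1): the leader waits for the full set of
# in-sync replicas to acknowledge before the send succeeds
acks=all
# how many times a failed send is retried; note that retries can
# duplicate messages, since this producer is not idempotent
retries=3
retry.backoff.ms=100
```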
>> > > > >> >
>> > > > >> >Best regards
>> > > > >> >Hawin
>> > > > >> >
>> > > > >> >On Thu, Nov 12, 2015 at 1:00 AM, jinxing <jinxing6...@126.com>
>> > > > >> >wrote:
>> > > > >> >
>> > > > >> >> there is a flush API on the producer; you can call this to
>> > > > >> >> prevent messages being lost;
>> > > > >> >>
>> > > > >> >> maybe it can help;
>> > > > >> >>
>> > > > >> >> At 2015-11-12 16:43:54, "Hawin Jiang" <hawin.ji...@gmail.com>
>> > > > >> >> wrote:
>> > > > >> >> >Hi Jinxing
>> > > > >> >> >
>> > > > >> >> >I don't think we can resolve this issue by increasing the
>> > > > >> >> >number of producers. With more producers, it should lose
>> > > > >> >> >even more messages.
>> > > > >> >> >
>> > > > >> >> >I just tested two producers.
>> > > > >> >> >Thread Producer 1 has 83064 messages on the producer side
>> > > > >> >> >and 82273 messages on the consumer side.
>> > > > >> >> >Thread Producer 2 has 89844 messages on the producer side
>> > > > >> >> >and 88892 messages on the consumer side.
>> > > > >> >> >
>> > > > >> >> >Thanks.
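Jinxing's flush() suggestion (the method was added to the new producer in the then-unreleased 0.8.3/0.9 line) might be used as sketched below. The batch interval of 1000 and the topic name are arbitrary choices for illustration:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class FlushingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
            props, new ByteArraySerializer(), new ByteArraySerializer());
        for (int i = 1; i <= 12000; i++) {
            producer.send(new ProducerRecord<byte[], byte[]>(
                "kafka-test", ("KAFKA_" + i).getBytes()));
            if (i % 1000 == 0) {
                // block until every buffered record has been acked or failed
                producer.flush();
            }
        }
        producer.close();
    }
}
```

As Hawin notes above, flushing frequently trades throughput for visibility; flush() by itself does not report which records failed, so it is best combined with send callbacks.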
>> > > > >> >> >
>> > > > >> >> >Best regards
>> > > > >> >> >Hawin
>> > > > >> >> >
>> > > > >> >> >On Thu, Nov 12, 2015 at 12:24 AM, jinxing <jinxing6...@126.com>
>> > > > >> >> >wrote:
>> > > > >> >> >
>> > > > >> >> >> maybe there are some changes in 0.9.0.0;
>> > > > >> >> >>
>> > > > >> >> >> but still you can try increasing the producer sending rate,
>> > > > >> >> >> and see if there are messages lost but no exception;
>> > > > >> >> >>
>> > > > >> >> >> note that, to increase the producer sending rate, you must
>> > > > >> >> >> have enough producer 'power';
>> > > > >> >> >>
>> > > > >> >> >> in my case, I have 50 producers sending messages at the
>> > > > >> >> >> same time :)
>> > > > >> >> >>
>> > > > >> >> >> At 2015-11-12 16:16:23, "Hawin Jiang" <hawin.ji...@gmail.com>
>> > > > >> >> >> wrote:
>> > > > >> >> >> >Hi Jinxing
>> > > > >> >> >> >
>> > > > >> >> >> >I am using kafka_2.10-0.9.0.0-SNAPSHOT. I downloaded the
>> > > > >> >> >> >source code and installed it last week.
>> > > > >> >> >> >
>> > > > >> >> >> >I saw that 97446 messages had been sent to Kafka
>> > > > >> >> >> >successfully.
>> > > > >> >> >> >
>> > > > >> >> >> >So far, I have not found any failed messages.
>> > > > >> >> >> >
>> > > > >> >> >> >Best regards
>> > > > >> >> >> >Hawin
>> > > > >> >> >> >
>> > > > >> >> >> >On Thu, Nov 12, 2015 at 12:07 AM, jinxing
>> > > > >> >> >> ><jinxing6...@126.com> wrote:
>> > > > >> >> >> >
>> > > > >> >> >> >> Hi, what version are you using?
>> > > > >> >> >> >>
>> > > > >> >> >> >> i am using 0.8.2.0, and I encountered this problem
>> > > > >> >> >> >> before;
>> > > > >> >> >> >>
>> > > > >> >> >> >> it seems that if the message rate on the producer side
>> > > > >> >> >> >> is too high, some of the messages will be lost;
>> > > > >> >> >> >>
>> > > > >> >> >> >> also, I found that the callback method of the producer
>> > > > >> >> >> >> 'send' API is not reliable;
>> > > > >> >> >> >>
>> > > > >> >> >> >> only successfully sent messages trigger the callback,
>> > > > >> >> >> >> but the failed ones don't;
>> > > > >> >> >> >>
>> > > > >> >> >> >> did you see this?
>> > > > >> >> >> >>
>> > > > >> >> >> >> At 2015-11-12 16:01:17, "Hawin Jiang"
>> > > > >> >> >> >> <hawin.ji...@gmail.com> wrote:
>> > > > >> >> >> >> >Hi All
>> > > > >> >> >> >> >
>> > > > >> >> >> >> >I sent messages to Kafka for one minute. I found 97446
>> > > > >> >> >> >> >messages on the producer side and 96896 messages on
>> > > > >> >> >> >> >the consumer side for Case 1.
>> > > > >> >> >> >> >I also tried Case 2 and faced the same issue. The
>> > > > >> >> >> >> >numbers do not match between producer and consumer.
>> > > > >> >> >> >> >Can someone take a look at this issue?
>> > > > >> >> >> >> >Thanks.
>> > > > >> >> >> >> >
>> > > > >> >> >> >> >Case 1:
>> > > > >> >> >> >> >
>> > > > >> >> >> >> >long startTime = System.currentTimeMillis();
>> > > > >> >> >> >> >long maxDurationInMilliseconds = 1 * 60 * 1000;
>> > > > >> >> >> >> >int messageNo = 0;
>> > > > >> >> >> >> >while (true) {
>> > > > >> >> >> >> >    if (System.currentTimeMillis() <= startTime
>> > > > >> >> >> >> >            + maxDurationInMilliseconds) {
>> > > > >> >> >> >> >        messageNo++;
>> > > > >> >> >> >> >        String messageStr = "KAFKA_" + messageNo;
>> > > > >> >> >> >> >        System.out.println("Message: " + messageNo);
>> > > > >> >> >> >> >        producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
>> > > > >> >> >> >> >    } else {
>> > > > >> >> >> >> >        producer.close();
>> > > > >> >> >> >> >        System.out.println("Total kafka messages: " + messageNo);
>> > > > >> >> >> >> >        break;
>> > > > >> >> >> >> >    }
>> > > > >> >> >> >> >}
>> > > > >> >> >> >> >
>> > > > >> >> >> >> >Case 2:
>> > > > >> >> >> >> >
>> > > > >> >> >> >> >for (int i = 1; i <= 12000; i++) {
>> > > > >> >> >> >> >    String messageStr = "KAFKA_" + i;
>> > > > >> >> >> >> >    producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
>> > > > >> >> >> >> >}
>> > > > >> >> >> >> >
>> > > > >> >> >> >> >Best regards
>> > > > >> >> >> >> >Hawin
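An alternative to a callback for a test loop like Case 1/Case 2 is to make each send synchronous by blocking on the Future the new producer's send() returns, so a failed send surfaces as an exception instead of disappearing silently. This is much slower, but every lost message becomes visible. A sketch under the same illustrative topic and broker assumptions as above:

```java
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SyncSendProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");

        KafkaProducer<String, String> producer = new KafkaProducer<>(
            props, new StringSerializer(), new StringSerializer());
        int acked = 0;
        for (int i = 1; i <= 12000; i++) {
            try {
                // get() blocks until the broker acks, and throws if the send failed
                producer.send(new ProducerRecord<String, String>(
                    "kafka-test", "KAFKA_" + i)).get();
                acked++;
            } catch (ExecutionException e) {
                System.err.println("send " + i + " failed: " + e.getCause());
            }
        }
        producer.close();
        System.out.println("acked messages: " + acked);
    }
}
```

Comparing `acked` against the consumer-side count (or the GetOffsetShell output shown earlier in the thread) then distinguishes messages the broker never acked from messages lost after the ack.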