[ https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989904#comment-16989904 ]
Boyang Chen commented on KAFKA-9280:
------------------------------------

I think the consumer log is missing. There is no evidence for `Before receiving ACK from third replica, leader sent the message to consumer.` Also, to be precise, the leader doesn't send anything to either the replicas or the consumer. It waits for fetch requests from the other parties to advance the high watermark.

> Duplicate messages are observed in ACK mode ALL
> -----------------------------------------------
>
>                 Key: KAFKA-9280
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9280
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.2.1
>            Reporter: VIkram
>            Priority: Major
>
> In ack mode ALL, the leader is sending the message to the consumer even
> before receiving the acknowledgements from the other replicas. This can lead
> to *+duplicate messages+*.
>
> Setup details:
> * 1 zookeeper, 5 brokers
> * Producer: Synchronous
> * Topic: 1 partition, replication factor - 3, min isr - 2
>
> Say First replica (Leader), Second replica and Third replica are the three
> replicas of the topic.
>
> *Sequence of events:*
> a) All brokers are up and running.
> b) Clients started running.
> c) Kill second replica of the topic.
> d) Kill the third replica. Now min isr will not be satisfied.
> e) Bring up third replica. Min isr will be satisfied.
>
> *Breakdown of step 'd':*
> # Just before producer sends next message, killed third replica with kill -9
> (Leader takes time ~5sec to detect that the broker is down).
> # Producer sent a message to leader.
> # Before the leader knows that third replica is down, it accepts the message
> from producer.
> # Leader forwards the message to third replica.
> # Before receiving ACK from third replica, leader sent the message to
> consumer.
> # Leader doesn't get an ACK from third replica.
> # Now leader detects that third replica is down and throws
> NOT_ENOUGH_REPLICAS_EXCEPTION.
> # Now leader stops accepting messages from producer.
> # Since producer didn't get ACK from leader, it will throw TIMEOUT_EXCEPTION
> after timeout (2 min in our case).
> # So far, producer believes that the message was not received by leader
> whereas the consumer actually received it.
> # Now producer retries sending the same message. (In our application it is
> the next integer we send).
> # Now when second/third replica is up, leader accepts the message and sends
> the same message to consumer. *Thus sending duplicates.*
>
>
> *Logs:*
> # 2-3 seconds before producer sends next message, killed third replica with
> kill -9 (Leader takes time ~5sec to detect that the broker is down).
> {{{
> > kill -9 49596
> }}}
>
> # Producer sent a message to leader.
> {{{
> [20190327 11:02:48.231 EDT (main-Terminating-1) Will send message: ProducerRecord(topic=t229, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=p229-4, timestamp=null)
> }}}
>
> # Before the leader knows that third replica is down, it accepts the message
> from producer.
> # Leader forwards the message to third replica.
> # Before receiving ACK from third replica, leader sent the message to
> consumer.
> {{{
> Received MessageRecord: ConsumerRecord(topic = t229, partition = 0, leaderEpoch = 1, offset = 3, CreateTime = 1553698968232, serialized key size = -1, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = p229-4)
> }}}
>
> # Leader doesn't get an ACK from third replica.
> # Now leader detects that third replica is down and throws
> NOT_ENOUGH_REPLICAS_EXCEPTION.
> {{{
> [2019-03-27 11:02:53,541] ERROR [ReplicaManager broker=0] Error processing append operation on partition t229-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for partition t229-0
> }}}
>
> # Now leader stops accepting messages from producer.
> # Since producer didn't get ACK from leader, it will throw TIMEOUT_EXCEPTION
> after timeout (2 min in our case).
> {{{
> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for t229-0:120000 ms has passed since batch creation
>     at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
>     at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
>     at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for t229-0:120000 ms has passed since batch creation
> }}}
>
> # So far, producer believes that the message was not received by leader
> whereas the consumer actually received it.
> # Now producer retries sending the same message. (In our application it is
> the next integer we send).
> # Now when second/third replica is up, leader accepts the message and sends
> the same to consumer. Thus sending duplicates.
>
> Ideally, in ACK mode ALL, the leader is expected to send a message to the
> consumer only after it receives acks from all the other replicas. But this
> is not happening.
>
> +*Question*+
> 1) In the acks=all case, does the leader send a message to the consumer only
> after all in-sync followers have received the message?
> (or)
> Will it send the message to the consumer and then wait for the followers'
> acknowledgements?
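A minimal, simplified model of the high-watermark mechanism described in the comment above may help with this question. It is a toy sketch, not Kafka's broker code: the class and method names (`ToyPartitionLeader`, `onFollowerFetch`, `visibleToConsumer`) are hypothetical. It assumes the simplified rule that the high watermark is the smallest log end offset among the in-sync replicas, that a follower's fetch offset tells the leader how far that follower's log extends, and that consumers are only served offsets below the high watermark. It ignores ISR shrinking, leader epochs and transactions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical toy model of high-watermark gating; not Kafka's implementation. */
public class HighWatermarkSketch {

    static final class ToyPartitionLeader {
        private final int leaderId;
        // Last known log end offset (next offset to be written) per replica.
        private final Map<Integer, Long> logEndOffsets = new HashMap<>();
        private final Set<Integer> isr = new TreeSet<>();
        private long highWatermark = 0L;

        // Assumes the leader itself is a member of the initial ISR.
        ToyPartitionLeader(int leaderId, Set<Integer> initialIsr) {
            this.leaderId = leaderId;
            this.isr.addAll(initialIsr);
            for (int replica : initialIsr) {
                logEndOffsets.put(replica, 0L);
            }
        }

        /** Producer append: only the leader's own log end offset moves. */
        long append() {
            long offset = logEndOffsets.get(leaderId);
            logEndOffsets.put(leaderId, offset + 1);
            maybeAdvanceHighWatermark();
            return offset;
        }

        /**
         * A follower fetching from fetchOffset implies it already holds every
         * record below fetchOffset. Nothing is pushed by the leader.
         */
        void onFollowerFetch(int followerId, long fetchOffset) {
            logEndOffsets.put(followerId, fetchOffset);
            maybeAdvanceHighWatermark();
        }

        /** HW = smallest log end offset among ISR members; it never moves back. */
        private void maybeAdvanceHighWatermark() {
            long min = Long.MAX_VALUE;
            for (int replica : isr) {
                min = Math.min(min, logEndOffsets.get(replica));
            }
            highWatermark = Math.max(highWatermark, min);
        }

        /** Consumers are only served offsets strictly below the high watermark. */
        boolean visibleToConsumer(long offset) {
            return offset < highWatermark;
        }

        long highWatermark() {
            return highWatermark;
        }
    }

    public static void main(String[] args) {
        ToyPartitionLeader leader = new ToyPartitionLeader(0, Set.of(0, 1, 2));
        long offset = leader.append();
        // prints "after append: HW=0 visible=false"
        System.out.println("after append: HW=" + leader.highWatermark()
                + " visible=" + leader.visibleToConsumer(offset));
        leader.onFollowerFetch(1, offset + 1);
        // prints "after follower 1 fetch: HW=0 visible=false"
        System.out.println("after follower 1 fetch: HW=" + leader.highWatermark()
                + " visible=" + leader.visibleToConsumer(offset));
        leader.onFollowerFetch(2, offset + 1);
        // prints "after follower 2 fetch: HW=1 visible=true"
        System.out.println("after follower 2 fetch: HW=" + leader.highWatermark()
                + " visible=" + leader.visibleToConsumer(offset));
    }
}
```

In this model, the appended record stays invisible to consumers until every ISR member has fetched past it. Visibility is driven by follower fetches rather than by the leader forwarding the record.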
>
> +*Observation*+
> For a topic with replication factor > 1, we ran a test to measure the round
> trip time (client1 -> kafka -> client2 -> kafka -> client1) of messages with
> both acks = 1 and acks = all, and observed the latency to be the same in both
> cases. Is this expected?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)