[jira] [Commented] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL
[ https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040343#comment-17040343 ]

Stanislav Kozlovski commented on KAFKA-9280:

The example I gave was a fetch request from a follower. If the consumer were to send a fetch request with that offset before all the followers' requests were processed, the consumer would likely get nothing, because the high watermark offset would not yet have been raised by the leader.

> Duplicate messages are observed in ACK mode ALL
> ---
>
> Key: KAFKA-9280
> URL: https://issues.apache.org/jira/browse/KAFKA-9280
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.2.1
> Reporter: VIkram
> Priority: Major
>
> In ack mode ALL, the leader sends the message to the consumer even before receiving the acknowledgements from the other replicas. This can lead to *+duplicate messages+*.
>
> Setup details:
> * 1 zookeeper, 5 brokers
> * Producer: synchronous
> * Topic: 1 partition, replication factor 3, min ISR 2
>
> Say First replica (the leader), Second replica and Third replica are the three replicas of the topic.
>
> *Sequence of events:*
> a) All brokers are up and running.
> b) Clients started running.
> c) Kill the second replica of the topic.
> d) Kill the third replica. Now min ISR will not be satisfied.
> e) Bring up the third replica. Min ISR will be satisfied.
>
> *Breakdown of step 'd':*
> # Just before the producer sends the next message, the third replica was killed with kill -9 (the leader takes ~5 sec to detect that the broker is down).
> # Producer sent a message to the leader.
> # Before the leader knows that the third replica is down, it accepts the message from the producer.
> # Leader forwards the message to the third replica.
> # Before receiving an ACK from the third replica, the leader sent the message to the consumer.
> # Leader doesn't get an ACK from the third replica.
> # Leader now detects that the third replica is down and throws NOT_ENOUGH_REPLICAS_EXCEPTION.
> # Leader now stops accepting messages from the producer.
> # Since the producer didn't get an ACK from the leader, it throws a TIMEOUT_EXCEPTION after the timeout (2 min in our case).
> # So far, the producer believes the message was not received by the leader, whereas the consumer actually received it.
> # Now the producer retries sending the same message. (In our application it is the next integer we send.)
> # When the second/third replica comes back up, the leader accepts the message and sends the same message to the consumer. *Thus sending duplicates.*
>
> *Logs:*
> # 2-3 seconds before the producer sends the next message, the third replica was killed with kill -9 (the leader takes ~5 sec to detect that the broker is down).
> {{{
> > kill -9 49596
> }}}
> # Producer sent a message to the leader.
> {{{
> [20190327 11:02:48.231 EDT (main-Terminating-1) Will send message: ProducerRecord(topic=t229, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=p229-4, timestamp=null)
> }}}
> # Before the leader knows that the third replica is down, it accepts the message from the producer.
> # Leader forwards the message to the third replica.
> # Before receiving an ACK from the third replica, the leader sent the message to the consumer.
> {{{
> Received MessageRecord: ConsumerRecord(topic = t229, partition = 0, leaderEpoch = 1, offset = 3, CreateTime = 1553698968232, serialized key size = -1, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = p229-4)
> }}}
> # Leader doesn't get an ACK from the third replica.
> # Leader now detects that the third replica is down and throws NOT_ENOUGH_REPLICAS_EXCEPTION.
> {{{
> [2019-03-27 11:02:53,541] ERROR [ReplicaManager broker=0] Error processing append operation on partition t229-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for partition t229-0
> }}}
> # Leader now stops accepting messages from the producer.
> # Since the producer didn't get an ACK from the leader, it throws a TIMEOUT_EXCEPTION after the timeout (2 min in our case).
> {{{
> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for t229-0:12 ms has passed since batch creation
> at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
> at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
> at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
> Caused by:
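The duplicate-on-retry sequence laid out in the report can be condensed into a toy simulation (all names hypothetical, deliberately modelling the behaviour *as reported*, i.e. a leader that exposes records to consumers before replication completes):

```python
# Toy model of the scenario *as reported* (hypothetical, not Kafka client code).
# The reporter's claim is that the leader exposes a record to the consumer as
# soon as it is appended, while the producer only sees success after
# replication -- so a producer timeout followed by a retry yields a duplicate
# in the consumer-visible log.

class ToyLeader:
    def __init__(self):
        self.log = []  # records appended on the leader (consumer-visible here)

    def append(self, value, isr_ok):
        self.log.append(value)  # leader accepts and (per the report) exposes the record
        if not isr_ok:
            # min.insync.replicas not satisfied: producer gets no ACK
            raise TimeoutError("NOT_ENOUGH_REPLICAS / producer timeout")

def produce_with_retry(leader, value, isr_ok_per_attempt):
    for isr_ok in isr_ok_per_attempt:
        try:
            leader.append(value, isr_ok)
            return
        except TimeoutError:
            pass  # synchronous producer retries the same value

leader = ToyLeader()
# First attempt: third replica just killed, ISR too small. Second attempt: ISR restored.
produce_with_retry(leader, "p229-4", [False, True])
print(leader.log)  # ['p229-4', 'p229-4'] -- the duplicate the reporter observes
```

Note that, as later comments in this thread point out, a real Kafka leader only exposes records up to the high watermark, so the premise of step 5 (the consumer receiving the un-acked record) is exactly what is in dispute.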
[jira] [Commented] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL
[ https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036125#comment-17036125 ]

VIkram commented on KAFKA-9280:

Will the consumer FetchRequest{from=1000} get served?
[jira] [Commented] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL
[ https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17034540#comment-17034540 ]

Stanislav Kozlovski commented on KAFKA-9280:

[~vikram484] This shouldn't happen, because the leader waits for a subsequent fetch request that proves the follower has that offset. E.g. the high watermark is 1000, and the leader and follower are both at 1000. The follower dies, but managed to get a fetch request - FetchRequest{from=1000} - in flight. Meanwhile the producer produces offset 1001 with acks=all. The leader will not acknowledge that produce request until all in-sync followers issue a FetchRequest with a `from` value of at least 1001. Does that make sense?
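The wait-for-follower-fetch behaviour described in this thread can be sketched roughly as follows (hypothetical and heavily simplified; the real logic lives in the broker's replica-management code): the leader tracks each in-sync follower's fetch position, the high watermark is the minimum of those positions, and an acks=all produce is only acknowledged once the high watermark reaches the produced offset.

```python
# Rough sketch of high-watermark advancement (hypothetical, simplified).
# A follower fetch at `from_offset` proves the follower has replicated
# everything below that offset.

class ToyPartitionLeader:
    def __init__(self, isr):
        self.log_end_offset = 1000
        self.follower_fetch = {f: 1000 for f in isr}  # last fetch position per follower
        self.high_watermark = 1000

    def produce(self, n=1):
        self.log_end_offset += n   # append, but do not ack yet
        return self.log_end_offset  # offset the producer is waiting on

    def on_follower_fetch(self, follower, from_offset):
        self.follower_fetch[follower] = from_offset
        self.high_watermark = min(self.follower_fetch.values())

    def produce_acked(self, offset):
        return self.high_watermark >= offset

leader = ToyPartitionLeader(isr=["f1", "f2"])
offset = leader.produce()            # producer writes offset 1001 with acks=all
print(leader.produce_acked(offset))  # False: both followers still at 1000
leader.on_follower_fetch("f1", 1001)
print(leader.produce_acked(offset))  # False: f2 has not fetched past 1001
leader.on_follower_fetch("f2", 1001)
print(leader.produce_acked(offset))  # True: HW advanced to 1001
```

This is why an in-flight FetchRequest{from=1000} from a dead follower cannot cause a premature acknowledgement: it only attests to offsets below 1000.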
[jira] [Commented] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL
[ https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17028811#comment-17028811 ]

VIkram commented on KAFKA-9280:

[~bchen225242] Any update on this? I still face this issue.
[jira] [Commented] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL
[ https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16991419#comment-16991419 ]

VIkram commented on KAFKA-9280:

Let's say the high watermark for the topic partition is 1000, and the leader and follower replicas have exactly the same messages. In this scenario, the producer sends a message to the leader and the other replicas, and a consumer sends a fetch request to the leader. Is there a possibility here that the consumer's fetch request will be served before the other replicas' fetch requests?
[jira] [Commented] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL
[ https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16989904#comment-16989904 ]

Boyang Chen commented on KAFKA-9280:

I think the log for the consumer is missing. There is no evidence for `Before receiving ACK from third replica, leader sent the message to consumer.` Also, to be precise, the leader doesn't send anything to either the replicas or the consumer. It waits for fetch requests from the other parties to advance the high watermark.
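The consumer side of this argument can be sketched the same way (a minimal sketch, assuming the high watermark marks the first not-yet-committed offset, and hypothetical names throughout): a consumer fetch is served only up to the high watermark, so a record that has been appended but not replicated is invisible to consumers.

```python
# Sketch: consumer fetches are bounded by the high watermark (hypothetical,
# simplified). Records at offsets >= HW exist in the leader's log but are not
# returned to consumers until the in-sync followers have fetched past them.

def serve_consumer_fetch(log, base_offset, high_watermark, from_offset):
    # Only offsets in [from_offset, high_watermark) are visible.
    return log[from_offset - base_offset : high_watermark - base_offset]

log = ["m1000", "m1001"]  # leader's log, starting at offset 1000
# Offset 1001 appended but not yet replicated: HW is still 1001.
print(serve_consumer_fetch(log, 1000, 1001, 1001))  # [] -- consumer sees nothing
# After all in-sync followers fetch from >= 1002, HW advances to 1002:
print(serve_consumer_fetch(log, 1000, 1002, 1001))  # ['m1001']
```

Under this model the consumer could not have received the un-acked record, which is why the missing consumer log matters for verifying the report.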