[jira] [Commented] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL

2020-02-19 Thread Stanislav Kozlovski (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17040343#comment-17040343 ]

Stanislav Kozlovski commented on KAFKA-9280:


The example I gave was a fetch request from a follower.

If the consumer were to send a fetch request with that offset before all the 
followers' requests were processed, the consumer would likely get nothing back, 
because the leader would not yet have raised the high watermark.
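
For illustration, a minimal consumer poll loop is sketched below (the broker 
address, topic, and group id are assumptions, not from this ticket); since the 
broker only returns records below the partition's high watermark, such a poll 
comes back empty until the in-sync followers have fetched past the new record:

{{{
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class HighWatermarkPoll {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed address
        props.put("group.id", "kafka-9280-demo");         // assumed group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("t229"));
            while (true) {
                // Only records below the high watermark are returned; a record
                // that is appended but not yet replicated is not visible here.
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(500));
                records.forEach(r ->
                        System.out.printf("offset=%d value=%s%n", r.offset(), r.value()));
            }
        }
    }
}
}}}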

> Duplicate messages are observed in ACK mode ALL
> ---
>
> Key: KAFKA-9280
> URL: https://issues.apache.org/jira/browse/KAFKA-9280
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.2.1
> Reporter: VIkram
> Priority: Major
>
> In ack mode ALL, the leader serves the message to the consumer even before 
> receiving the acknowledgements from the other replicas. This can lead to 
> *+duplicate messages+*.
>  
> Setup details:
>  * 1 zookeeper, 5 brokers
>  * Producer: synchronous
>  * Topic: 1 partition, replication factor - 3, min isr - 2 (a creation sketch 
> follows this list)
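>
> A minimal sketch of creating a topic with this layout via the Java 
> AdminClient; the broker address is an assumption, not part of the report:
> {{{
> import java.util.Collections;
> import java.util.Map;
> import java.util.Properties;
> import org.apache.kafka.clients.admin.AdminClient;
> import org.apache.kafka.clients.admin.NewTopic;
>
> public class CreateReproTopic {
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092"); // assumed address
>         try (AdminClient admin = AdminClient.create(props)) {
>             // 1 partition, replication factor 3, min ISR 2, as in the report
>             NewTopic topic = new NewTopic("t229", 1, (short) 3)
>                     .configs(Map.of("min.insync.replicas", "2"));
>             admin.createTopics(Collections.singleton(topic)).all().get();
>         }
>     }
> }
> }}}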
>  
> Say First replica (Leader), Second replica and Third replica are the three 
> replicas of the topic.
>  
> *Sequence of events:*
> a) All brokers are up and running.
> b) Clients started running.
> c) Kill second replica of the topic.
> d) Kill the third replica. Now min isr will not be satisfied.
> e) Bring up third replica. Min isr will be satisfied.
>  
> *Breakdown of step 'd':*
>  # Just before the producer sends the next message, the third replica is killed 
> with kill -9 (the leader takes ~5 sec to detect that the broker is down).
>  # The producer sends a message to the leader.
>  # Before the leader knows that the third replica is down, it accepts the message 
> from the producer.
>  # The leader forwards the message to the third replica.
>  # Before receiving an ACK from the third replica, the leader serves the message 
> to the consumer.
>  # The leader doesn't get an ACK from the third replica.
>  # The leader now detects that the third replica is down and throws 
> NOT_ENOUGH_REPLICAS_EXCEPTION.
>  # The leader now stops accepting messages from the producer.
>  # Since the producer didn't get an ACK from the leader, it throws 
> TIMEOUT_EXCEPTION after the timeout (2 min in our case; see the producer sketch 
> after this list).
>  # So far, the producer believes that the message was not received by the leader, 
> whereas the consumer actually received it.
>  # The producer now retries sending the same message. (In our application it is 
> the next integer we send.)
>  # When the second/third replica comes back up, the leader accepts the retried 
> message and serves the same message to the consumer. *Thus sending duplicates.*
>  
>  
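> A minimal sketch of the synchronous acks=all producer described above, with 
> the 2-minute delivery timeout; the broker address and serializers are 
> assumptions:
> {{{
> import java.util.Properties;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> public class SyncProducerRepro {
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092"); // assumed address
>         props.put("acks", "all");                   // ack mode ALL
>         props.put("delivery.timeout.ms", "120000"); // 2 min, as in the report
>         props.put("key.serializer",
>                 "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("value.serializer",
>                 "org.apache.kafka.common.serialization.StringSerializer");
>         try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
>             // Synchronous send: get() blocks until the broker acks, or throws
>             // an ExecutionException wrapping the TimeoutException shown below.
>             producer.send(new ProducerRecord<>("t229", "p229-4")).get();
>         }
>     }
> }
> }}}
> Note: with enable.idempotence=true the broker deduplicates the producer's own 
> internal retries, but that cannot cover an application-level resend like the 
> retry described above.
>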
> *Logs:*
>  # 2-3 seconds before the producer sends the next message, the third replica was 
> killed with kill -9 (the leader takes ~5 sec to detect that the broker is down).
> {{{
> > kill -9 49596
> }}}
>  # The producer sends a message to the leader.
> {{{
> [20190327 11:02:48.231 EDT (main-Terminating-1) Will send message: 
> ProducerRecord(topic=t229, partition=null, headers=RecordHeaders(headers = 
> [], isReadOnly = false), key=null, value=p229-4, timestamp=null)
> }}}
>  # Before the leader knows that the third replica is down, it accepts the message 
> from the producer.
>  # The leader forwards the message to the third replica.
>  # Before receiving an ACK from the third replica, the leader serves the message 
> to the consumer.
> {{{
> Received MessageRecord: ConsumerRecord(topic = t229, partition = 0, 
> leaderEpoch = 1, offset = 3, CreateTime = 1553698968232, serialized key size 
> = -1, serialized value size = 6, headers = RecordHeaders(headers = [], 
> isReadOnly = false), key = null, value = p229-4)
> }}}
>  # The leader doesn't get an ACK from the third replica.
>  # The leader now detects that the third replica is down and throws 
> NOT_ENOUGH_REPLICAS_EXCEPTION.
> {{{
> [2019-03-27 11:02:53,541] ERROR [ReplicaManager broker=0] Error processing 
> append operation on partition t229-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the 
> current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 
> for partition t229-0
> }}}
>  # The leader now stops accepting messages from the producer.
>  # Since the producer didn't get an ACK from the leader, it throws 
> TIMEOUT_EXCEPTION after the timeout (2 min in our case).
> {{{
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> t229-0:12 ms has passed since batch creation
>     at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
>     at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
>     at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 
> record(s) for t229-0:12 ms has passed since batch creation
> }}}
>
>  # So far, the producer believes that the message was not received by the leader, 
> whereas the consumer actually received it.
>  # The producer now retries sending the same message.
>  # When the second/third replica comes back up, the leader accepts the retried 
> message and serves the same message to the consumer, producing the duplicate.

[jira] [Commented] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL

2020-02-13 Thread VIkram (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036125#comment-17036125 ]

VIkram commented on KAFKA-9280:
---

Will the consumer's FetchRequest{from=1000} get served?


[jira] [Commented] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL

2020-02-11 Thread Stanislav Kozlovski (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17034540#comment-17034540 ]

Stanislav Kozlovski commented on KAFKA-9280:


[~vikram484] this shouldn't happen, because the leader waits for a second fetch 
request that proves that the follower has that offset.

e.g. the high watermark is 1000, and the leader and follower are both at 1000. 
The follower dies, but it had managed to send a fetch request - 
FetchRequest{from=1000} - which is still in flight. Meanwhile, the producer 
produces offset 1001 with acks=all. The leader will not acknowledge that 
produce request until all in-sync followers issue a FetchRequest with a `from` 
value of at least 1001.

Does that make sense?
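
To make that concrete, here is a toy model in Java of the rule described above 
(an illustration only, not Kafka's actual ReplicaManager logic), using the 
offsets from the example:

{{{
import java.util.HashMap;
import java.util.Map;

public class HighWatermarkToyModel {
    // A record was just appended at offset 1000, so the leader's log end is 1001.
    private long logEndOffset = 1001;
    // Offsets below 1000 are fully replicated; the watermark has not moved yet.
    private long highWatermark = 1000;
    private final Map<String, Long> followerFetchOffsets = new HashMap<>();

    // FetchRequest{from=offset} proves the follower holds everything below `offset`.
    void onFollowerFetch(String follower, long fromOffset) {
        followerFetchOffsets.put(follower, fromOffset);
        long minFollower = followerFetchOffsets.values().stream()
                .mapToLong(Long::longValue).min().orElse(logEndOffset);
        highWatermark = Math.min(logEndOffset, minFollower);
    }

    // The acks=all produce response is held until the watermark covers the record.
    boolean produceAcknowledged() {
        return highWatermark >= logEndOffset;
    }

    public static void main(String[] args) {
        HighWatermarkToyModel leader = new HighWatermarkToyModel();
        leader.onFollowerFetch("follower", 1000);         // the stale in-flight fetch
        System.out.println(leader.produceAcknowledged()); // false: watermark stuck at 1000
        leader.onFollowerFetch("follower", 1001);         // proves offset 1000 is held
        System.out.println(leader.produceAcknowledged()); // true: watermark advances to 1001
    }
}
}}}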


[jira] [Commented] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL

2020-02-03 Thread VIkram (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17028811#comment-17028811 ]

VIkram commented on KAFKA-9280:
---

[~bchen225242] Any update on this? I still face this issue.


[jira] [Commented] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL

2019-12-09 Thread VIkram (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16991419#comment-16991419 ]

VIkram commented on KAFKA-9280:
---

Let's say the high watermark for a topic partition is 1000 and the leader and 
follower replicas have exactly the same messages. In this scenario, the producer 
sends a message to the leader and the other replicas, and the consumer sends a 
fetch request to the leader. Is there a possibility here that the consumer's 
fetch request will be served before the other replicas' fetch requests?


[jira] [Commented] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL

2019-12-06 Thread Boyang Chen (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16989904#comment-16989904 ]

Boyang Chen commented on KAFKA-9280:


I think the log for the consumer is missing. There is no evidence for `Before 
receiving ACK from third replica, leader sent the message to consumer.`

Also, to be precise, the leader doesn't send anything to either the replicas or 
the consumer. It waits for the fetch requests from those parties to advance the 
high watermark.
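
A minimal sketch of that pull model (an illustration only, not Kafka's actual 
replica fetcher):

{{{
public class PullModelSketch {
    interface Leader {
        // FetchRequest{from=offset}: "send me records starting at `offset`;
        // I already hold everything below it."
        byte[][] fetch(String topicPartition, long fromOffset);
    }

    // The leader never pushes. The follower pulls, and the `from` offset of
    // each request is what lets the leader advance the high watermark.
    static void replicate(Leader leader) {
        long logEnd = 0; // the follower's local log end offset
        while (true) {
            byte[][] records = leader.fetch("t229-0", logEnd);
            logEnd += records.length; // append what arrived, then fetch again
        }
    }
}
}}}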
