[
https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17034540#comment-17034540
]
Stanislav Kozlovski commented on KAFKA-9280:
--------------------------------------------
[~vikram484] this shouldn't happen, because the leader waits for a second fetch
request that proves the follower has that offset.
e.g. the high watermark is 1000 and the leader and follower are both at 1000.
The follower dies, but it managed to send a fetch request -
FetchRequest{from=1000} - which is still in flight. Meanwhile the producer
produces offset 1001 with acks=all. The leader will not acknowledge that
produce request until all in-sync followers issue a FetchRequest with a `from`
value of at least 1001.
Does that make sense?
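The gating described above can be sketched as a toy model. This is illustrative only - the `Leader` class and method names are made up, not Kafka's actual implementation:

```python
# Toy model of how a leader with acks=all gates acknowledgement on follower
# fetch offsets. Illustrative only - not Kafka's actual code.

class Leader:
    def __init__(self, followers):
        self.log = []                             # appended records
        self.fetched = {f: 0 for f in followers}  # next offset each follower asked for
        self.high_watermark = 0                   # offsets below this are committed

    def produce(self, record):
        """Append a record; return its end offset (not yet acknowledged)."""
        self.log.append(record)
        return len(self.log)

    def on_fetch(self, follower, from_offset):
        """A fetch at `from_offset` proves the follower has everything below it."""
        self.fetched[follower] = max(self.fetched[follower], from_offset)
        self.high_watermark = min(self.fetched.values())

    def can_ack(self, end_offset):
        """acks=all: acknowledge only once every in-sync follower has caught up."""
        return self.high_watermark >= end_offset


leader = Leader(followers=["f1"])
for _ in range(1000):
    leader.produce("old")
leader.on_fetch("f1", 1000)     # follower is at 1000, high watermark advances to 1000

end = leader.produce("new")     # producer writes offset 1000 (end offset 1001)
assert not leader.can_ack(end)  # the in-flight FetchRequest{from=1000} is NOT enough

leader.on_fetch("f1", 1001)     # second fetch proves the follower has the record
assert leader.can_ack(end)      # only now is the acks=all produce acknowledged
```

The key point the sketch shows: a fetch at offset N only proves replication *below* N, so the leader needs a later fetch at N+1 (or beyond) before it can commit and acknowledge offset N.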
> Duplicate messages are observed in ACK mode ALL
> -----------------------------------------------
>
> Key: KAFKA-9280
> URL: https://issues.apache.org/jira/browse/KAFKA-9280
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.2.1
> Reporter: VIkram
> Priority: Major
>
> In ack mode ALL, the leader is delivering the message to the consumer even
> before receiving acknowledgements from the other replicas. This can lead to
> *+duplicate messages+*.
>
> Setup details:
> * 1 zookeeper, 5 brokers
> * Producer: Synchronous
> * Topic: 1 partition, replication factor - 3, min isr - 2
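A configuration sketch of the topic described above, using the third-party kafka-python admin client; the bootstrap address and topic name `t229` (taken from the logs below) are assumptions to adjust for your cluster:

```python
# Config sketch: create the topic as described in the setup
# (1 partition, replication factor 3, min.insync.replicas 2).
# Requires a running cluster; bootstrap address is an assumption.
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin.create_topics([
    NewTopic(
        name="t229",
        num_partitions=1,
        replication_factor=3,
        topic_configs={"min.insync.replicas": "2"},
    )
])
```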
>
> Say First replica (the leader), Second replica and Third replica are the
> three replicas of the topic.
>
> *Sequence of events:*
> a) All brokers are up and running.
> b) Clients started running.
> c) Kill second replica of the topic.
> d) Kill the third replica. Now min isr will not be satisfied.
> e) Bring up third replica. Min isr will be satisfied.
>
> *Breakdown of step 'd':*
> # Just before the producer sends the next message, the third replica is
> killed with kill -9 (the leader takes ~5 sec to detect that the broker is
> down).
> # The producer sends a message to the leader.
> # Before the leader knows that the third replica is down, it accepts the
> message from the producer.
> # The leader forwards the message to the third replica.
> # Before receiving an ACK from the third replica, the leader delivers the
> message to the consumer.
> # The leader doesn't get an ACK from the third replica.
> # The leader now detects that the third replica is down and throws
> NOT_ENOUGH_REPLICAS_EXCEPTION.
> # The leader now stops accepting messages from the producer.
> # Since the producer didn't get an ACK from the leader, it throws a
> TIMEOUT_EXCEPTION after the timeout (2 min in our case).
> # So far, the producer believes the message was not received by the leader,
> whereas the consumer actually received it.
> # The producer now retries sending the same message (in our application it is
> the next integer we send).
> # When the second/third replica comes back up, the leader accepts the message
> and sends the same message to the consumer. *Thus sending duplicates.*
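The sequence above amounts to at-least-once delivery: a write that was actually appended but never acknowledged gets retried and appended a second time. A minimal sketch with a toy log (hypothetical names, not Kafka code):

```python
# Toy sketch of the duplicate: the broker appends the record, the ack is lost,
# and the producer's retry appends the same record again.
# Illustrative only - not Kafka's actual code.

class AckLost(Exception):
    """Stand-in for the producer-side TimeoutException."""

class FlakyBroker:
    def __init__(self):
        self.log = []
        self.fail_next_ack = False

    def append(self, record):
        self.log.append(record)   # the write itself succeeds...
        if self.fail_next_ack:
            self.fail_next_ack = False
            raise AckLost()       # ...but the ack never reaches the producer

def send_with_retry(broker, record, retries=1):
    for _ in range(retries + 1):
        try:
            broker.append(record)
            return
        except AckLost:
            continue              # producer assumes the write was lost

broker = FlakyBroker()
broker.fail_next_ack = True
send_with_retry(broker, "p229-4")
print(broker.log)                 # ['p229-4', 'p229-4'] - a duplicate
```

In real Kafka this is the documented behavior of a plain producer retry; the idempotent producer (`enable.idempotence=true`, available since 0.11) was added so the broker can deduplicate exactly this kind of retried batch.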
>
>
> *Logs:*
> # 2-3 seconds before the producer sends the next message, the third replica
> is killed with kill -9 (the leader takes ~5 sec to detect that the broker is
> down).
> {{{
> > kill -9 49596
> }}}
> # The producer sends a message to the leader.
> {{{
> [20190327 11:02:48.231 EDT (main-Terminating-1) Will send message:
> ProducerRecord(topic=t229, partition=null, headers=RecordHeaders(headers =
> [], isReadOnly = false), key=null, value=p229-4, timestamp=null)
> }}}
>
> # Before the leader knows that the third replica is down, it accepts the
> message from the producer.
> # The leader forwards the message to the third replica.
> # Before receiving an ACK from the third replica, the leader delivers the
> message to the consumer.
> {{{
> Received MessageRecord: ConsumerRecord(topic = t229, partition = 0,
> leaderEpoch = 1, offset = 3, CreateTime = 1553698968232, serialized key size
> = -1, serialized value size = 6, headers = RecordHeaders(headers = [],
> isReadOnly = false), key = null, value = p229-4)
> }}}
> # The leader doesn't get an ACK from the third replica.
> # The leader now detects that the third replica is down and throws
> NOT_ENOUGH_REPLICAS_EXCEPTION.
> {{{
> [2019-03-27 11:02:53,541] ERROR [ReplicaManager broker=0] Error processing
> append operation on partition t229-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the
> current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2
> for partition t229-0
> }}}
>
> # The leader now stops accepting messages from the producer.
> # Since the producer didn't get an ACK from the leader, it throws a
> TIMEOUT_EXCEPTION after the timeout (2 min in our case).
> {{{
> java.util.concurrent.ExecutionException:
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for
> t229-0:120000 ms has passed since batch creation
>     at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
>     at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
>     at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1
> record(s) for t229-0:120000 ms has passed since batch creation
> }}}
>
> # So far, the producer believes the message was not received by the leader,
> whereas the consumer actually received it.
> # The producer now retries sending the same message (in our application it is
> the next integer we send).
> # When the second/third replica comes back up, the leader accepts the message
> and sends the same message to the consumer. Thus sending duplicates.
>
> Ideally, in ACK mode ALL it is expected that the leader delivers a message to
> the consumer only after it receives acks from all other in-sync replicas. But
> this is not what we observe.
>
> +*Question*+
> 1) In the acks=all case, does the leader deliver a message to the consumer
> only after all in-sync followers have received the message, or does it
> deliver the message to the consumer first and then wait for the followers'
> acknowledgements?
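For context on what the expected answer looks like, a consumer fetch is bounded by the high watermark, so records not yet replicated to all in-sync followers should be invisible to consumers. A toy sketch (illustrative only, not Kafka's actual code):

```python
# Toy sketch: a consumer fetch returns only committed records, i.e. those
# below the high watermark. Illustrative only - not Kafka's actual code.

def consumer_fetch(log, high_watermark, from_offset):
    """Return the records in [from_offset, high_watermark)."""
    return log[from_offset:high_watermark]

log = ["a", "b", "c"]   # offset 2 ("c") not yet replicated to all in-sync followers
hw = 2                  # high watermark: only offsets 0 and 1 are committed

assert consumer_fetch(log, hw, 0) == ["a", "b"]   # "c" is not visible yet
```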
>
> +*Observation*+
> For a topic with replication factor > 1, we measured the round-trip time
> (client1 -> kafka -> client2 -> kafka -> client1) of messages with both
> acks=1 and acks=all, and observed the same latency in both cases. Is this
> expected?
--
This message was sent by Atlassian Jira
(v8.3.4#803005)