[ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-353:
-----------------------------

    Attachment: kafka-353_v1.patch

Patch is not too big, but a bit tricky. To help with review, here is an
overview:

- DelayedProduce:
  - Contains the logic to determine whether the request can be unblocked.
    The partitionStatus map is needed to keep track of the local log's
    offset, the error status, and whether acks are still pending for each
    partition (see the sketch after this list). The comments in the code
    should make the remaining logic clear.
  - Handling delayed producer requests brings in dependencies on the fetch
    request purgatory, the replica manager and KafkaZookeeper - which is
    why I had to pass kafkaApis into the DelayedProduce, which is a bit
    ugly.
- KafkaApis:
  - The existing code for handling produce requests would respond when the
    leader persists the data to disk. We still do that if requiredAcks is 0
    or 1. For other values, we now create a DelayedProduce request which
    will be satisfied when requiredAcks followers are caught up.  If the
    |ISR| < requiredAcks then the request will time out.
  - handleFetchRequest: if a request comes from a follower replica, check
    the produceRequestPurgatory and see if DelayedProduce requests can be
    unblocked.
- ReplicaManager:
  - The only change is to try to unblock DelayedProduce requests to
    partitions for which the leader changed.
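
To make the DelayedProduce bookkeeping concrete, here is a rough standalone
sketch of the per-partition status tracking and the satisfaction check. All
names and types below are made up for illustration; they are not the actual
identifiers in the patch:

object DelayedProduceSketch {

  // Tracks the local log's offset for the appended messages, the error
  // status, and whether acks are still pending for one partition. If e.g.
  // the leader changes while the request is pending, errorCode would be
  // set and acksPending cleared for that partition.
  case class PartitionStatus(requiredOffset: Long,
                             var errorCode: Int = 0,  // 0 = NoError (assumed)
                             var acksPending: Boolean = true)

  class DelayedProduce(val requiredAcks: Int,
                       val partitionStatus: Map[(String, Int), PartitionStatus]) {

    // A partition stops being pending once requiredAcks replicas
    // (including the leader) have caught up to its requiredOffset.
    def checkPartition(topicPartition: (String, Int), caughtUpReplicas: Int): Unit = {
      val status = partitionStatus(topicPartition)
      if (status.acksPending && caughtUpReplicas >= requiredAcks)
        status.acksPending = false
    }

    // The whole request is satisfied when no partition has acks pending.
    def isSatisfied: Boolean = partitionStatus.values.forall(!_.acksPending)
  }

  def main(args: Array[String]): Unit = {
    val dp = new DelayedProduce(
      requiredAcks = 2,
      partitionStatus = Map(("test", 0) -> PartitionStatus(requiredOffset = 100L)))
    dp.checkPartition(("test", 0), caughtUpReplicas = 2)
    println("satisfied = " + dp.isSatisfied)  // prints: satisfied = true
  }
}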

Note that even if a request times out, some of its partitions may have been
successfully acked - so the global error code is NoError even when one or
more partitions time out. The receiver must check the errors array to
determine whether there were any failures. I think this suggests the need
for a "PartialSuccess" global error code in the ProducerResponse. Thoughts
on this?
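
As an illustration of what the receiver has to do today, something along
these lines (the field names here are made up, not the actual
ProducerResponse wire format):

// Hypothetical response shape: a global error code plus parallel
// per-partition errors/offsets arrays.
case class ProducerResponseSketch(globalErrorCode: Int,
                                  errors: Seq[Int],
                                  offsets: Seq[Long])

object ResponseCheckSketch {
  val NoError = 0  // assumed value of the "no error" code

  // Even when globalErrorCode == NoError, individual partitions may have
  // failed (e.g. timed out), so the receiver has to scan the errors array.
  def failedIndexes(resp: ProducerResponseSketch): Seq[Int] =
    resp.errors.zipWithIndex.collect { case (err, idx) if err != NoError => idx }

  def main(args: Array[String]): Unit = {
    val resp = ProducerResponseSketch(NoError, Seq(0, 7, 0), Seq(10L, -1L, 42L))
    println("partitions with errors: " + failedIndexes(resp))  // List(1)
  }
}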

I think there was a bug in checking satisfied DelayedFetchRequests: the
checkSatisfied method would take a produceRequest's TopicData and see if
the total number of bytes in that produce request could satisfy the
remaining bytes to be filled in the DelayedFetchRequest. However, that
could count data for partitions that were not part of the
DelayedFetchRequest. This patch fixes that issue as well: the
FetchRequestPurgatory key is changed from TopicData to PartitionData, and
satisfaction is checked on a per-partition basis.
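
The per-partition accounting is roughly along these lines (a standalone
sketch with made-up names, not the actual purgatory code): only bytes
produced to partitions that the delayed fetch actually covers count towards
its outstanding bytes.

object DelayedFetchSketch {

  // watched: the (topic, partition) pairs the delayed fetch is waiting on.
  class DelayedFetch(val watched: Set[(String, Int)], val minBytes: Int) {
    private var accumulated = 0

    // Called when a produce request appends `bytes` to `topicPartition`;
    // returns true once enough watched-partition bytes have arrived.
    def recordProduce(topicPartition: (String, Int), bytes: Int): Boolean = {
      if (watched.contains(topicPartition))
        accumulated += bytes
      accumulated >= minBytes
    }
  }

  def main(args: Array[String]): Unit = {
    val fetch = new DelayedFetch(watched = Set(("test", 0)), minBytes = 100)
    // Data for a partition the fetch is not waiting on does not count.
    println(fetch.recordProduce(("test", 1), 500))  // false
    println(fetch.recordProduce(("test", 0), 120))  // true
  }
}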

Another potential issue is that the DelayedFetchRequest satisfaction check
counts bytes using MessageSet's sizeInBytes, which could include incomplete
messages - as opposed to iterating over the valid messages and summing
their sizes. I left that as is; I think it is debatable which approach is
correct in this case.
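
A toy comparison of the two counting approaches (made-up types, not the
real MessageSet API):

object ByteCountSketch {
  // A fake message set: a raw buffer size plus the messages it contains,
  // where the last one may be incomplete.
  case class Msg(sizeInBytes: Int, complete: Boolean)
  case class FakeMessageSet(bufferSizeInBytes: Int, messages: Seq[Msg])

  // What the patch does today: count the raw buffer size, which may
  // include a trailing partial message.
  def rawSize(ms: FakeMessageSet): Int = ms.bufferSizeInBytes

  // Alternative: iterate and count only the complete (valid) messages.
  def validSize(ms: FakeMessageSet): Int =
    ms.messages.filter(_.complete).map(_.sizeInBytes).sum

  def main(args: Array[String]): Unit = {
    val ms = FakeMessageSet(150, Seq(Msg(100, complete = true), Msg(50, complete = false)))
    println("raw = " + rawSize(ms) + ", valid = " + validSize(ms))  // raw = 150, valid = 100
  }
}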

I added some trace logs at the individual request level - e.g., "Produce
request to topic unblocked n delayedfetchrequests". These would be more
useful if we added a uuid to each produce request - I think this idea was
tossed around on the mailing list some time ago. It doesn't even have to be
a uuid or part of the produceRequest's wire format - even an atomicLong
counter (internal to the broker) may be helpful. Thoughts?
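
The broker-internal counter would be trivial, e.g. something like this
sketch (names made up):

import java.util.concurrent.atomic.AtomicLong

object ProduceRequestIdSketch {
  // A strictly increasing id tagged onto each produce request as it
  // arrives, used only for correlating trace log lines; never put on the
  // wire.
  private val nextId = new AtomicLong(0L)

  def nextRequestId(): Long = nextId.getAndIncrement()

  def main(args: Array[String]): Unit = {
    val requestId = nextRequestId()
    // The id would then show up in the trace logs, e.g.:
    println("Produce request " + requestId +
      " to topic test unblocked 3 delayed fetch requests")
  }
}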

There is this corner case that I think is handled correctly but want to make
sure:
- leader receives a producer request and adds it to the
  ProduceRequestPurgatory.
- leadership changes while it is pending, so the error code for that
  partition is set to NotLeaderErrorForPartitionCode
- leadership changes back to this broker while the DelayedProduce is
  pending.
- In this scenario, the partition remains in the error state.
- I think it is correct because the leader would have become a follower
  (before it became a leader again), and would have truncated its log to the
  intermediate leader's HW.

If requiredAcks == |ISR| and the |ISR| shrinks while the DelayedProduce is
pending, the request may time out. However, if the |ISR| expands back to
its original size while the request is still pending, it will get
satisfied.
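
In other words, the check is re-evaluated against the current ISR, roughly
along these lines (illustrative only, not the actual check in the patch):

object IsrCheckSketch {
  // Satisfied once at least requiredAcks replicas that are currently in
  // the ISR have caught up to the required offset.
  def satisfied(currentIsr: Set[String], caughtUp: Set[String], requiredAcks: Int): Boolean =
    (currentIsr intersect caughtUp).size >= requiredAcks

  def main(args: Array[String]): Unit = {
    val caughtUp = Set("broker-1", "broker-2", "broker-3")
    // ISR shrank to 2 while requiredAcks is 3: cannot be satisfied yet.
    println(satisfied(Set("broker-1", "broker-2"), caughtUp, 3))                // false
    // ISR expanded back to 3 before the timeout: satisfied.
    println(satisfied(Set("broker-1", "broker-2", "broker-3"), caughtUp, 3))    // true
  }
}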

Let me know if you can think of other corner cases that need to be
considered - I wouldn't be surprised if there are quite a few.

I only did limited testing with the ProducerTest.

I think this opens up the following future work (separate jiras):
- Enhance the system test to cover all corner cases (leader changing while
  a request is pending, ISR shrinking while a request is pending, etc.).
- ProducerResponse currently uses two separate arrays for errors and
  offsets, and ProducerRequest uses an array of TopicData, each of which
  contains an array of PartitionData. It may be a good idea to change these
  classes to use maps (or something else), as I had to resort to using find
  and indexOf to locate partition-level data in the original request (see
  the sketch after this list).
- We should add some mbeans for request purgatory stats - avg. hold time,
  outstanding requests, etc.
- We should try to get rid of sleeps and fix all intermittent test
  failures.
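
Regarding the ProducerRequest/ProducerResponse item above: keying the
partition-level data by (topic, partition) would avoid the find/indexOf
scans, e.g. (made-up types, not the actual request classes):

object PartitionLookupSketch {
  case class PartitionData(partition: Int, numMessages: Int)

  def main(args: Array[String]): Unit = {
    val topic = "test"
    val partitions = Array(PartitionData(0, 10), PartitionData(1, 20))

    // Parallel-array style: locating a partition needs a linear scan.
    val idx = partitions.indexWhere(_.partition == 1)
    println("scan found " + partitions(idx) + " for topic " + topic)

    // Map style: direct lookup by (topic, partition).
    val byPartition: Map[(String, Int), PartitionData] =
      partitions.map(pd => ((topic, pd.partition), pd)).toMap
    println("map lookup found " + byPartition((topic, 1)))
  }
}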

If the above list sounds good I'll file the jiras.

> tie producer-side ack with high watermark and progress of replicas
> ------------------------------------------------------------------
>
>                 Key: KAFKA-353
>                 URL: https://issues.apache.org/jira/browse/KAFKA-353
>             Project: Kafka
>          Issue Type: Sub-task
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Joel Koshy
>         Attachments: kafka-353_v1.patch
>
>
