[ https://issues.apache.org/jira/browse/KAFKA-3427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16375968#comment-16375968 ]
ASF GitHub Bot commented on KAFKA-3427: --------------------------------------- hachikuji closed pull request #1128: KAFKA-3427 - Broker should return correct version of FetchResponse on exception URL: https://github.com/apache/kafka/pull/1128 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index 04ca157717b..191c6274929 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -148,7 +148,9 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, case (topicAndPartition, data) => (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, MessageSet.Empty)) } - val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData) + + val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] + val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData, fetchRequest.versionId) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, errorResponse))) } diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index ce78f78168d..0c7510e82a7 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -128,7 +128,8 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, } override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { - if(request.requestObj.asInstanceOf[ProducerRequest].requiredAcks == 0) { + val produceRequest = request.requestObj.asInstanceOf[ProducerRequest] + if(produceRequest.requiredAcks == 0) { requestChannel.closeConnection(request.processor, request) } else { @@ -136,7 +137,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, case (topicAndPartition, data) => (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l)) } - val errorResponse = ProducerResponse(correlationId, producerResponseStatus) + val errorResponse = ProducerResponse(correlationId, producerResponseStatus, produceRequest.versionId) requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > broker can return incorrect version of fetch response when the broker hits an > unknown exception > ----------------------------------------------------------------------------------------------- > > Key: KAFKA-3427 > URL: https://issues.apache.org/jira/browse/KAFKA-3427 > Project: Kafka > Issue Type: Bug > Affects Versions: 0.9.0.1, 0.10.0.0 > Reporter: Jun Rao > Assignee: Jun Rao > Priority: Blocker > Fix For: 0.10.0.0 > > > In FetchResponse.handleError(), we generate FetchResponse like the following, > which always defaults to version 0 of the response. > FetchResponse(correlationId, fetchResponsePartitionData) -- This message was sent by Atlassian JIRA (v7.6.3#76005)