Repository: kafka Updated Branches: refs/heads/trunk a64fe2ed8 -> 0cf770800
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 067fc27..cc65003 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -484,7 +484,7 @@ public class RequestResponseTest { @Test public void testControlledShutdownResponse() { ControlledShutdownResponse response = createControlledShutdownResponse(); - short version = ApiKeys.CONTROLLED_SHUTDOWN_KEY.latestVersion(); + short version = ApiKeys.CONTROLLED_SHUTDOWN.latestVersion(); Struct struct = response.toStruct(version); ByteBuffer buffer = toBuffer(struct); ControlledShutdownResponse deserialized = ControlledShutdownResponse.parse(buffer, version); http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index d41d61a..c59f2c9 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -60,6 +60,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import javax.security.auth.Subject; +import javax.security.auth.login.Configuration; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -74,9 +76,6 @@ import java.util.List; import java.util.Map; import java.util.Random; -import javax.security.auth.Subject; -import javax.security.auth.login.Configuration; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -953,7 +952,7 @@ public class SaslAuthenticatorTest { @Override protected ApiVersionsResponse apiVersionsResponse() { - List<ApiVersion> apiVersions = new ArrayList<>(ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersions()); + List<ApiVersion> apiVersions = new ArrayList<>(ApiVersionsResponse.defaultApiVersionsResponse().apiVersions()); for (Iterator<ApiVersion> it = apiVersions.iterator(); it.hasNext(); ) { ApiVersion apiVersion = it.next(); if (apiVersion.apiKey == ApiKeys.SASL_AUTHENTICATE.id) { http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 6976f7c..58e5543 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -242,7 +242,7 @@ class RequestSendThread(val controllerId: Int, if (clientResponse != null) { val requestHeader = clientResponse.requestHeader val api = requestHeader.apiKey - if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA_KEY) + if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA) throw new KafkaException(s"Unexpected apiKey received: $apiKey") val response = clientResponse.responseBody @@ -455,7 +455,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge } updateMetadataRequestBrokerSet.foreach { broker => - controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, updateMetadataRequest, null) + controller.sendRequest(broker, ApiKeys.UPDATE_METADATA, updateMetadataRequest, null) } updateMetadataRequestBrokerSet.clear() updateMetadataRequestPartitionInfoMap.clear() http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index c2ebbad..aba0249 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -36,7 +36,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.types.Type._ -import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct} +import org.apache.kafka.common.protocol.types._ import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.{IsolationLevel, OffsetFetchResponse} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala index efd315b..2c7178e 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala @@ -20,7 +20,7 @@ import kafka.common.{KafkaException, MessageFormatter} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.types.Type._ -import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct} +import org.apache.kafka.common.protocol.types._ import java.io.PrintStream import java.nio.ByteBuffer import java.nio.charset.StandardCharsets http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/main/scala/kafka/log/ProducerStateManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 4c3d1a1..69d4e36 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -17,7 +17,7 @@ package kafka.log import java.io._ -import java.nio.{BufferUnderflowException, ByteBuffer} +import java.nio.ByteBuffer import java.nio.file.Files import kafka.common.KafkaException http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 2d6370a..1128fd3 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -28,7 +28,7 @@ import kafka.server.QuotaId import kafka.utils.{Logging, NotNothing} import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.network.Send -import org.apache.kafka.common.protocol.{ApiKeys, Protocol} +import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.Time @@ -71,7 +71,7 @@ object RequestChannel extends Logging { //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done. - if (!Protocol.requiresDelayedDeallocation(header.apiKey.id)) { + if (!header.apiKey.requiresDelayedAllocation) { releaseBuffer() } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 62e8abf..e07e689 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -102,8 +102,8 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.METADATA => handleTopicMetadataRequest(request) case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request) case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request) - case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request) - case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request) + case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request) + case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request) case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request) case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request) case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request) @@ -1126,7 +1126,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListGroupsRequest(request: RequestChannel.Request) { if (!authorize(request.session, Describe, Resource.ClusterResource)) { sendResponseMaybeThrottle(request, requestThrottleMs => - ListGroupsResponse.fromError(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED)) + request.body[ListGroupsRequest].getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) } else { val (error, groups) = groupCoordinator.handleListGroups() val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index dcd2038..7aeffb5 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -117,7 +117,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.OFFSET_COMMIT -> classOf[requests.OffsetCommitResponse], ApiKeys.OFFSET_FETCH -> classOf[requests.OffsetFetchResponse], ApiKeys.FIND_COORDINATOR -> classOf[FindCoordinatorResponse], - ApiKeys.UPDATE_METADATA_KEY -> classOf[requests.UpdateMetadataResponse], + ApiKeys.UPDATE_METADATA -> classOf[requests.UpdateMetadataResponse], ApiKeys.JOIN_GROUP -> classOf[JoinGroupResponse], ApiKeys.SYNC_GROUP -> classOf[SyncGroupResponse], ApiKeys.DESCRIBE_GROUPS -> classOf[DescribeGroupsResponse], @@ -125,7 +125,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.LEAVE_GROUP -> classOf[LeaveGroupResponse], ApiKeys.LEADER_AND_ISR -> classOf[requests.LeaderAndIsrResponse], ApiKeys.STOP_REPLICA -> classOf[requests.StopReplicaResponse], - ApiKeys.CONTROLLED_SHUTDOWN_KEY -> classOf[requests.ControlledShutdownResponse], + ApiKeys.CONTROLLED_SHUTDOWN -> classOf[requests.ControlledShutdownResponse], ApiKeys.CREATE_TOPICS -> classOf[CreateTopicsResponse], ApiKeys.DELETE_TOPICS -> classOf[requests.DeleteTopicsResponse], ApiKeys.OFFSET_FOR_LEADER_EPOCH -> classOf[OffsetsForLeaderEpochResponse], @@ -152,7 +152,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => resp.responseData.asScala.find(_._1 == tp).get._2), ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => resp.error), ApiKeys.FIND_COORDINATOR -> ((resp: FindCoordinatorResponse) => resp.error), - ApiKeys.UPDATE_METADATA_KEY -> ((resp: requests.UpdateMetadataResponse) => resp.error), + ApiKeys.UPDATE_METADATA -> ((resp: requests.UpdateMetadataResponse) => resp.error), ApiKeys.JOIN_GROUP -> ((resp: JoinGroupResponse) => resp.error), ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => resp.error), ApiKeys.DESCRIBE_GROUPS -> ((resp: DescribeGroupsResponse) => resp.groups.get(group).error), @@ -160,7 +160,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error), ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses.asScala.find(_._1 == tp).get._2), ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses.asScala.find(_._1 == tp).get._2), - ApiKeys.CONTROLLED_SHUTDOWN_KEY -> ((resp: requests.ControlledShutdownResponse) => resp.error), + ApiKeys.CONTROLLED_SHUTDOWN -> ((resp: requests.ControlledShutdownResponse) => resp.error), ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => resp.errors.asScala.find(_._1 == createTopic).get._2.error), ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors.asScala.find(_._1 == deleteTopic).get._2), ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ((resp: OffsetsForLeaderEpochResponse) => resp.responses.get(tp).error), @@ -190,7 +190,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.OFFSET_COMMIT -> (topicReadAcl ++ groupReadAcl), ApiKeys.OFFSET_FETCH -> (topicReadAcl ++ groupDescribeAcl), ApiKeys.FIND_COORDINATOR -> (topicReadAcl ++ groupDescribeAcl ++ transactionalIdDescribeAcl), - ApiKeys.UPDATE_METADATA_KEY -> clusterAcl, + ApiKeys.UPDATE_METADATA -> clusterAcl, ApiKeys.JOIN_GROUP -> groupReadAcl, ApiKeys.SYNC_GROUP -> groupReadAcl, ApiKeys.DESCRIBE_GROUPS -> groupDescribeAcl, @@ -198,7 +198,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.LEAVE_GROUP -> groupReadAcl, ApiKeys.LEADER_AND_ISR -> clusterAcl, ApiKeys.STOP_REPLICA -> clusterAcl, - ApiKeys.CONTROLLED_SHUTDOWN_KEY -> clusterAcl, + ApiKeys.CONTROLLED_SHUTDOWN -> clusterAcl, ApiKeys.CREATE_TOPICS -> clusterCreateAcl, ApiKeys.DELETE_TOPICS -> topicDeleteAcl, ApiKeys.OFFSET_FOR_LEADER_EPOCH -> clusterAcl, @@ -293,7 +293,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId, Seq(new requests.UpdateMetadataRequest.EndPoint("localhost", 0, securityProtocol, ListenerName.forSecurityProtocol(securityProtocol))).asJava, null)).asJava - val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion + val version = ApiKeys.UPDATE_METADATA.latestVersion new requests.UpdateMetadataRequest.Builder(version, brokerId, Int.MaxValue, partitionState, brokers).build() } @@ -373,7 +373,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.LIST_OFFSETS -> createListOffsetsRequest, ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest, ApiKeys.FIND_COORDINATOR -> createFindCoordinatorRequest, - ApiKeys.UPDATE_METADATA_KEY -> createUpdateMetadataRequest, + ApiKeys.UPDATE_METADATA -> createUpdateMetadataRequest, ApiKeys.JOIN_GROUP -> createJoinGroupRequest, ApiKeys.SYNC_GROUP -> createSyncGroupRequest, ApiKeys.DESCRIBE_GROUPS -> createDescribeGroupsRequest, @@ -382,7 +382,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.LEAVE_GROUP -> leaveGroupRequest, ApiKeys.LEADER_AND_ISR -> leaderAndIsrRequest, ApiKeys.STOP_REPLICA -> stopReplicaRequest, - ApiKeys.CONTROLLED_SHUTDOWN_KEY -> controlledShutdownRequest, + ApiKeys.CONTROLLED_SHUTDOWN -> controlledShutdownRequest, ApiKeys.CREATE_TOPICS -> createTopicsRequest, ApiKeys.DELETE_TOPICS -> deleteTopicsRequest, ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest, http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala index c52020c..83f7111 100644 --- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala @@ -28,7 +28,7 @@ import scala.collection.JavaConverters._ object ApiVersionsRequestTest { def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse) { assertEquals("API keys in ApiVersionsResponse must match API keys supported by broker.", ApiKeys.values.length, apiVersionsResponse.apiVersions.size) - for (expectedApiVersion: ApiVersion <- ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersions.asScala) { + for (expectedApiVersion: ApiVersion <- ApiVersionsResponse.defaultApiVersionsResponse().apiVersions.asScala) { val actualApiVersion = apiVersionsResponse.apiVersion(expectedApiVersion.apiKey) assertNotNull(s"API key ${actualApiVersion.apiKey} is supported by broker, but not received in ApiVersionsResponse.", actualApiVersion) assertEquals("API key must be supported by the broker.", expectedApiVersion.apiKey, actualApiVersion.apiKey) http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala index 262686a..3ba7cd5 100644 --- a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala @@ -18,7 +18,7 @@ package kafka.server import org.apache.kafka.common.requests.ApiVersionsResponse -import org.apache.kafka.common.protocol.{ApiKeys, Protocol} +import org.apache.kafka.common.protocol.ApiKeys import org.junit.Assert._ import org.junit.Test @@ -26,25 +26,25 @@ class ApiVersionsTest { @Test def testApiVersions(): Unit = { - val apiVersions = ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersions + val apiVersions = ApiVersionsResponse.defaultApiVersionsResponse().apiVersions assertEquals("API versions for all API keys must be maintained.", apiVersions.size, ApiKeys.values().length) for (key <- ApiKeys.values) { - val version = ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersion(key.id) + val version = ApiVersionsResponse.defaultApiVersionsResponse().apiVersion(key.id) assertNotNull(s"Could not find ApiVersion for API ${key.name}", version) assertEquals(s"Incorrect min version for Api ${key.name}.", version.minVersion, key.oldestVersion) assertEquals(s"Incorrect max version for Api ${key.name}.", version.maxVersion, key.latestVersion) // Check if versions less than min version are indeed set as null, i.e., deprecated. for (i <- 0 until version.minVersion) { - assertNull(s"Request version $i for API ${version.apiKey} must be null.", Protocol.REQUESTS(version.apiKey)(i)) - assertNull(s"Response version $i for API ${version.apiKey} must be null.", Protocol.RESPONSES(version.apiKey)(i)) + assertNull(s"Request version $i for API ${version.apiKey} must be null.", key.requestSchemas(i)) + assertNull(s"Response version $i for API ${version.apiKey} must be null.", key.responseSchemas(i)) } // Check if versions between min and max versions are non null, i.e., valid. for (i <- version.minVersion.toInt to version.maxVersion) { - assertNotNull(s"Request version $i for API ${version.apiKey} must not be null.", Protocol.REQUESTS(version.apiKey)(i)) - assertNotNull(s"Response version $i for API ${version.apiKey} must not be null.", Protocol.RESPONSES(version.apiKey)(i)) + assertNotNull(s"Request version $i for API ${version.apiKey} must not be null.", key.requestSchemas(i)) + assertNotNull(s"Response version $i for API ${version.apiKey} must not be null.", key.responseSchemas(i)) } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index de3ccdb..6b3c6c0 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -69,7 +69,7 @@ class MetadataCacheTest { new TopicPartition(topic0, 1) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, 1, 1, asList(1, 0), zkVersion, asList(1, 2, 0, 4), asList()), new TopicPartition(topic1, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, 2, 2, asList(2, 1), zkVersion, asList(2, 1, 3), asList())) - val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion + val version = ApiKeys.UPDATE_METADATA.latestVersion val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava).build() cache.updateCache(15, updateMetadataRequest) @@ -127,7 +127,7 @@ class MetadataCacheTest { val partitionStates = Map( new TopicPartition(topic, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, leader, leaderEpoch, asList(0), zkVersion, asList(0), asList())) - val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion + val version = ApiKeys.UPDATE_METADATA.latestVersion val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava).build() cache.updateCache(15, updateMetadataRequest) @@ -171,7 +171,7 @@ class MetadataCacheTest { val partitionStates = Map( new TopicPartition(topic, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas, asList())) - val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion + val version = ApiKeys.UPDATE_METADATA.latestVersion val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava).build() cache.updateCache(15, updateMetadataRequest) @@ -231,7 +231,7 @@ class MetadataCacheTest { val partitionStates = Map( new TopicPartition(topic, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas, asList())) - val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion + val version = ApiKeys.UPDATE_METADATA.latestVersion val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava).build() cache.updateCache(15, updateMetadataRequest) @@ -283,7 +283,7 @@ class MetadataCacheTest { val isr = asList[Integer](0, 1) val partitionStates = Map( new TopicPartition(topic, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas, asList())) - val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion + val version = ApiKeys.UPDATE_METADATA.latestVersion val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, partitionStates.asJava, brokers.asJava).build() cache.updateCache(15, updateMetadataRequest) @@ -316,7 +316,7 @@ class MetadataCacheTest { val isr = asList[Integer](0, 1) val partitionStates = Map( new TopicPartition(topic, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas, asList())) - val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion + val version = ApiKeys.UPDATE_METADATA.latestVersion val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, partitionStates.asJava, brokers.asJava).build() cache.updateCache(15, updateMetadataRequest) http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index bb9f82e..1a3b5c2 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -187,16 +187,16 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.STOP_REPLICA => new StopReplicaRequest.Builder(brokerId, Int.MaxValue, true, Set(tp).asJava) - case ApiKeys.UPDATE_METADATA_KEY => + case ApiKeys.UPDATE_METADATA => val partitionState = Map(tp -> new UpdateMetadataRequest.PartitionState( Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava, Seq.empty[Integer].asJava)).asJava val securityProtocol = SecurityProtocol.PLAINTEXT val brokers = Set(new UpdateMetadataRequest.Broker(brokerId, Seq(new UpdateMetadataRequest.EndPoint("localhost", 0, securityProtocol, ListenerName.forSecurityProtocol(securityProtocol))).asJava, null)).asJava - new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA_KEY.latestVersion, brokerId, Int.MaxValue, partitionState, brokers) + new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, brokerId, Int.MaxValue, partitionState, brokers) - case ApiKeys.CONTROLLED_SHUTDOWN_KEY => + case ApiKeys.CONTROLLED_SHUTDOWN => new ControlledShutdownRequest.Builder(brokerId) case ApiKeys.OFFSET_COMMIT =>