mimaison commented on code in PR #21873:
URL: https://github.com/apache/kafka/pull/21873#discussion_r3203137915
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/common/SocketServerMemoryPoolTest.java:
##########
@@ -50,42 +51,31 @@
})
public class SocketServerMemoryPoolTest {
@ClusterTest
- public void testProduceRequestWithUnsupportedVersion(ClusterInstance
clusterInstance) throws Exception {
- short unsupportedVersion = Short.MAX_VALUE;
- byte[] rawRequestBytes = buildRawRequest(
- ApiKeys.PRODUCE.id,
- unsupportedVersion,
- /* correlationId */ 1,
- /* clientId */ "test-unsupported-version",
- new byte[10000]
- );
+ public void testRequestWithUnsupportedVersion(ClusterInstance
clusterInstance) throws Exception {
+ RequestHeader header =
IntegrationTestUtils.nextRequestHeader(ApiKeys.PRODUCE, Short.MAX_VALUE);
+ ByteBuffer buffer = RequestUtils.serialize(header.data(),
header.headerVersion(), new ProduceRequestData(), header.apiVersion());
+ byte[] rawRequestBytes = buffer.array();
sendAndAssert(clusterInstance, rawRequestBytes);
}
@ClusterTest
- public void testProduceRequestWithCorruptBody(ClusterInstance
clusterInstance) throws Exception {
- short validVersion = 3;
- byte[] corruptBody = new byte[10000];
- for (int i = 0; i < corruptBody.length; i++) {
- corruptBody[i] = (byte) 0xFF; // The corrupt body (0xFF ... 0xFF)
makes Schema.read() throw SchemaException.
+ public void testRequestWithCorruptBody(ClusterInstance clusterInstance)
throws Exception {
+ RequestHeader header =
IntegrationTestUtils.nextRequestHeader(ApiKeys.PRODUCE,
ApiKeys.PRODUCE.latestVersion());
+ ByteBuffer buffer = RequestUtils.serialize(header.data(),
header.headerVersion(), new ProduceRequestData(), header.apiVersion());
+ byte[] rawRequestBytes = buffer.array();
+
+ // corrupt body but leave header valid
Review Comment:
Should we assert that `rawRequestBytes.length` is larger than
`header.size()` so that we effectively enter the `for` loop.
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/common/SocketServerMemoryPoolTest.java:
##########
@@ -96,48 +86,16 @@ private void sendAndAssert(ClusterInstance clusterInstance,
byte[] rawRequestByt
assertEquals(initialMemoryPoolAvailable, finalMemoryPoolAvailable);
}
- // This test uses reflection to read the SocketServer memoryPool
availableMemory.
- // The metric "MemoryPoolAvailable" from Yammer Metrics default registry
- // can be overwritten in a @ClusterTest as the registry is a singleton.
- long getMemoryPoolAvailable(ClusterInstance clusterInstance) throws
Exception {
- KafkaBroker broker =
clusterInstance.aliveBrokers().values().iterator().next();
- SocketServer socketServer = broker.socketServer();
- Field memoryPoolField =
socketServer.getClass().getDeclaredField("memoryPool");
- memoryPoolField.setAccessible(true);
- MemoryPool memoryPool = (MemoryPool) memoryPoolField.get(socketServer);
- return memoryPool.availableMemory();
+ private SocketServer getSocketServer(ClusterInstance clusterInstance) {
+ return
clusterInstance.brokers().get(TestKitDefaults.BROKER_ID_OFFSET).socketServer();
}
- /**
- * Builds a raw Kafka request excluding the frame length
- *
- * <p>Wire layout:
- * <pre>
- * 4 bytes – frame length (payload size, not including these 4 bytes)
- *
- * 2 bytes – api_key
- * 2 bytes – api_version
- * 4 bytes – correlation_id
- * 2 bytes – client_id string length
- * N bytes – client_id (UTF-8)
- * X bytes - request body
- * </pre>
- */
- private static byte[] buildRawRequest(short apiKey, short apiVersion, int
correlationId, String clientId, byte[] body) {
- byte[] clientIdBytes = clientId.getBytes(StandardCharsets.UTF_8);
-
- // Header: api_key(2) + api_version(2) + correlation_id(4) +
client_id_len(2) + client_id
- int headerSize = 2 + 2 + 4 + 2 + clientIdBytes.length;
- int payloadSize = headerSize + body.length;
+ private int getBrokerBoundPort(ClusterInstance clusterInstance) {
Review Comment:
If there's a good reason why we can't use `brokerBoundPorts()`, we have the
`boundPort()` method on `BrokerServer` to retrieve the port. That method uses
`socketServer` itself so we don't have to.
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/common/SocketServerMemoryPoolTest.java:
##########
@@ -50,42 +51,31 @@
})
public class SocketServerMemoryPoolTest {
@ClusterTest
- public void testProduceRequestWithUnsupportedVersion(ClusterInstance
clusterInstance) throws Exception {
- short unsupportedVersion = Short.MAX_VALUE;
- byte[] rawRequestBytes = buildRawRequest(
- ApiKeys.PRODUCE.id,
- unsupportedVersion,
- /* correlationId */ 1,
- /* clientId */ "test-unsupported-version",
- new byte[10000]
- );
+ public void testRequestWithUnsupportedVersion(ClusterInstance
clusterInstance) throws Exception {
+ RequestHeader header =
IntegrationTestUtils.nextRequestHeader(ApiKeys.PRODUCE, Short.MAX_VALUE);
+ ByteBuffer buffer = RequestUtils.serialize(header.data(),
header.headerVersion(), new ProduceRequestData(), header.apiVersion());
+ byte[] rawRequestBytes = buffer.array();
sendAndAssert(clusterInstance, rawRequestBytes);
}
@ClusterTest
- public void testProduceRequestWithCorruptBody(ClusterInstance
clusterInstance) throws Exception {
- short validVersion = 3;
- byte[] corruptBody = new byte[10000];
- for (int i = 0; i < corruptBody.length; i++) {
- corruptBody[i] = (byte) 0xFF; // The corrupt body (0xFF ... 0xFF)
makes Schema.read() throw SchemaException.
+ public void testRequestWithCorruptBody(ClusterInstance clusterInstance)
throws Exception {
+ RequestHeader header =
IntegrationTestUtils.nextRequestHeader(ApiKeys.PRODUCE,
ApiKeys.PRODUCE.latestVersion());
+ ByteBuffer buffer = RequestUtils.serialize(header.data(),
header.headerVersion(), new ProduceRequestData(), header.apiVersion());
+ byte[] rawRequestBytes = buffer.array();
+
+ // corrupt body but leave header valid
+ for (int i = header.size(); i < rawRequestBytes.length; i++) {
+ rawRequestBytes[i] = (byte) 0xFF;
}
-
- byte[] rawRequestBytes = buildRawRequest(
- ApiKeys.PRODUCE.id,
- validVersion,
- /* correlationId */ 2,
- /* clientId */ "test-corrupt-body",
- corruptBody
- );
-
sendAndAssert(clusterInstance, rawRequestBytes);
}
private void sendAndAssert(ClusterInstance clusterInstance, byte[]
rawRequestBytes) throws Exception {
long initialMemoryPoolAvailable =
getMemoryPoolAvailable(clusterInstance);
- try (Socket socket =
IntegrationTestUtils.connect(clusterInstance.brokerBoundPorts().get(0))) {
+ try (Socket socket =
IntegrationTestUtils.connect(getBrokerBoundPort(clusterInstance))) {
Review Comment:
Why can't we use `clusterInstance.brokerBoundPorts().get(0)` anymore?
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/common/SocketServerMemoryPoolTest.java:
##########
@@ -96,48 +86,16 @@ private void sendAndAssert(ClusterInstance clusterInstance,
byte[] rawRequestByt
assertEquals(initialMemoryPoolAvailable, finalMemoryPoolAvailable);
}
- // This test uses reflection to read the SocketServer memoryPool
availableMemory.
- // The metric "MemoryPoolAvailable" from Yammer Metrics default registry
- // can be overwritten in a @ClusterTest as the registry is a singleton.
- long getMemoryPoolAvailable(ClusterInstance clusterInstance) throws
Exception {
- KafkaBroker broker =
clusterInstance.aliveBrokers().values().iterator().next();
- SocketServer socketServer = broker.socketServer();
- Field memoryPoolField =
socketServer.getClass().getDeclaredField("memoryPool");
- memoryPoolField.setAccessible(true);
- MemoryPool memoryPool = (MemoryPool) memoryPoolField.get(socketServer);
- return memoryPool.availableMemory();
+ private SocketServer getSocketServer(ClusterInstance clusterInstance) {
+ return
clusterInstance.brokers().get(TestKitDefaults.BROKER_ID_OFFSET).socketServer();
}
- /**
- * Builds a raw Kafka request excluding the frame length
- *
- * <p>Wire layout:
- * <pre>
- * 4 bytes – frame length (payload size, not including these 4 bytes)
- *
- * 2 bytes – api_key
- * 2 bytes – api_version
- * 4 bytes – correlation_id
- * 2 bytes – client_id string length
- * N bytes – client_id (UTF-8)
- * X bytes - request body
- * </pre>
- */
- private static byte[] buildRawRequest(short apiKey, short apiVersion, int
correlationId, String clientId, byte[] body) {
- byte[] clientIdBytes = clientId.getBytes(StandardCharsets.UTF_8);
-
- // Header: api_key(2) + api_version(2) + correlation_id(4) +
client_id_len(2) + client_id
- int headerSize = 2 + 2 + 4 + 2 + clientIdBytes.length;
- int payloadSize = headerSize + body.length;
+ private int getBrokerBoundPort(ClusterInstance clusterInstance) {
+ return
getSocketServer(clusterInstance).boundPort(ListenerName.normalised(TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME));
Review Comment:
Again if we can't use `brokerBoundPorts()`, we should use
`clusterInstance.clientListener()` to retrieve the listener name.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]