mimaison commented on code in PR #21873:
URL: https://github.com/apache/kafka/pull/21873#discussion_r3203137915


##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/common/SocketServerMemoryPoolTest.java:
##########
@@ -50,42 +51,31 @@
 })
 public class SocketServerMemoryPoolTest {
     @ClusterTest
-    public void testProduceRequestWithUnsupportedVersion(ClusterInstance 
clusterInstance) throws Exception {
-        short unsupportedVersion = Short.MAX_VALUE;
-        byte[] rawRequestBytes = buildRawRequest(
-                ApiKeys.PRODUCE.id,
-                unsupportedVersion,
-                /* correlationId */ 1,
-                /* clientId     */ "test-unsupported-version",
-                new byte[10000]
-        );
+    public void testRequestWithUnsupportedVersion(ClusterInstance 
clusterInstance) throws Exception {
+        RequestHeader header = 
IntegrationTestUtils.nextRequestHeader(ApiKeys.PRODUCE, Short.MAX_VALUE);
+        ByteBuffer buffer = RequestUtils.serialize(header.data(), 
header.headerVersion(), new ProduceRequestData(), header.apiVersion());
+        byte[] rawRequestBytes = buffer.array();
 
         sendAndAssert(clusterInstance, rawRequestBytes);
     }
 
     @ClusterTest
-    public void testProduceRequestWithCorruptBody(ClusterInstance 
clusterInstance) throws Exception {
-        short validVersion = 3;
-        byte[] corruptBody = new byte[10000];
-        for (int i = 0; i < corruptBody.length; i++) {
-            corruptBody[i] = (byte) 0xFF; // The corrupt body (0xFF ... 0xFF) 
makes Schema.read() throw SchemaException.
+    public void testRequestWithCorruptBody(ClusterInstance clusterInstance) 
throws Exception {
+        RequestHeader header = 
IntegrationTestUtils.nextRequestHeader(ApiKeys.PRODUCE, 
ApiKeys.PRODUCE.latestVersion());
+        ByteBuffer buffer = RequestUtils.serialize(header.data(), 
header.headerVersion(), new ProduceRequestData(), header.apiVersion());
+        byte[] rawRequestBytes = buffer.array();
+
+        // corrupt body but leave header valid

Review Comment:
   Should we assert that `rawRequestBytes.length` is larger than 
`header.size()` so that we effectively enter the `for` loop.



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/common/SocketServerMemoryPoolTest.java:
##########
@@ -96,48 +86,16 @@ private void sendAndAssert(ClusterInstance clusterInstance, 
byte[] rawRequestByt
         assertEquals(initialMemoryPoolAvailable, finalMemoryPoolAvailable);
     }
 
-    // This test uses reflection to read the SocketServer memoryPool 
availableMemory.
-    // The metric "MemoryPoolAvailable" from Yammer Metrics default registry
-    // can be overwritten in a @ClusterTest as the registry is a singleton.
-    long getMemoryPoolAvailable(ClusterInstance clusterInstance) throws 
Exception {
-        KafkaBroker broker = 
clusterInstance.aliveBrokers().values().iterator().next();
-        SocketServer socketServer = broker.socketServer();
-        Field memoryPoolField = 
socketServer.getClass().getDeclaredField("memoryPool");
-        memoryPoolField.setAccessible(true);
-        MemoryPool memoryPool = (MemoryPool) memoryPoolField.get(socketServer);
-        return memoryPool.availableMemory();
+    private SocketServer getSocketServer(ClusterInstance clusterInstance) {
+        return 
clusterInstance.brokers().get(TestKitDefaults.BROKER_ID_OFFSET).socketServer();
     }
 
-    /**
-     * Builds a raw Kafka request excluding the frame length
-     *
-     * <p>Wire layout:
-     * <pre>
-     *   4 bytes – frame length (payload size, not including these 4 bytes)
-     *
-     *   2 bytes – api_key
-     *   2 bytes – api_version
-     *   4 bytes – correlation_id
-     *   2 bytes – client_id string length
-     *   N bytes – client_id (UTF-8)
-     *   X bytes - request body
-     * </pre>
-     */
-    private static byte[] buildRawRequest(short apiKey, short apiVersion, int 
correlationId, String clientId, byte[] body) {
-        byte[] clientIdBytes = clientId.getBytes(StandardCharsets.UTF_8);
-
-        // Header: api_key(2) + api_version(2) + correlation_id(4) + 
client_id_len(2) + client_id
-        int headerSize = 2 + 2 + 4 + 2 + clientIdBytes.length;
-        int payloadSize = headerSize + body.length;
+    private int getBrokerBoundPort(ClusterInstance clusterInstance) {

Review Comment:
   If there's a good reason why we can't use `brokerBoundPorts()`, we have the 
`boundPort()` method on `BrokerServer` to retrieve the port. That method uses 
`socketServer` itself so we don't have to.



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/common/SocketServerMemoryPoolTest.java:
##########
@@ -50,42 +51,31 @@
 })
 public class SocketServerMemoryPoolTest {
     @ClusterTest
-    public void testProduceRequestWithUnsupportedVersion(ClusterInstance 
clusterInstance) throws Exception {
-        short unsupportedVersion = Short.MAX_VALUE;
-        byte[] rawRequestBytes = buildRawRequest(
-                ApiKeys.PRODUCE.id,
-                unsupportedVersion,
-                /* correlationId */ 1,
-                /* clientId     */ "test-unsupported-version",
-                new byte[10000]
-        );
+    public void testRequestWithUnsupportedVersion(ClusterInstance 
clusterInstance) throws Exception {
+        RequestHeader header = 
IntegrationTestUtils.nextRequestHeader(ApiKeys.PRODUCE, Short.MAX_VALUE);
+        ByteBuffer buffer = RequestUtils.serialize(header.data(), 
header.headerVersion(), new ProduceRequestData(), header.apiVersion());
+        byte[] rawRequestBytes = buffer.array();
 
         sendAndAssert(clusterInstance, rawRequestBytes);
     }
 
     @ClusterTest
-    public void testProduceRequestWithCorruptBody(ClusterInstance 
clusterInstance) throws Exception {
-        short validVersion = 3;
-        byte[] corruptBody = new byte[10000];
-        for (int i = 0; i < corruptBody.length; i++) {
-            corruptBody[i] = (byte) 0xFF; // The corrupt body (0xFF ... 0xFF) 
makes Schema.read() throw SchemaException.
+    public void testRequestWithCorruptBody(ClusterInstance clusterInstance) 
throws Exception {
+        RequestHeader header = 
IntegrationTestUtils.nextRequestHeader(ApiKeys.PRODUCE, 
ApiKeys.PRODUCE.latestVersion());
+        ByteBuffer buffer = RequestUtils.serialize(header.data(), 
header.headerVersion(), new ProduceRequestData(), header.apiVersion());
+        byte[] rawRequestBytes = buffer.array();
+
+        // corrupt body but leave header valid
+        for (int i = header.size(); i < rawRequestBytes.length; i++) {
+            rawRequestBytes[i] = (byte) 0xFF;
         }
-
-        byte[] rawRequestBytes = buildRawRequest(
-                ApiKeys.PRODUCE.id,
-                validVersion,
-                /* correlationId */ 2,
-                /* clientId     */ "test-corrupt-body",
-                corruptBody
-        );
-
         sendAndAssert(clusterInstance, rawRequestBytes);
     }
 
     private void sendAndAssert(ClusterInstance clusterInstance, byte[] 
rawRequestBytes) throws Exception {
         long initialMemoryPoolAvailable = 
getMemoryPoolAvailable(clusterInstance);
 
-        try (Socket socket = 
IntegrationTestUtils.connect(clusterInstance.brokerBoundPorts().get(0))) {
+        try (Socket socket = 
IntegrationTestUtils.connect(getBrokerBoundPort(clusterInstance))) {

Review Comment:
   Why can't we use `clusterInstance.brokerBoundPorts().get(0)` anymore?



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/common/SocketServerMemoryPoolTest.java:
##########
@@ -96,48 +86,16 @@ private void sendAndAssert(ClusterInstance clusterInstance, 
byte[] rawRequestByt
         assertEquals(initialMemoryPoolAvailable, finalMemoryPoolAvailable);
     }
 
-    // This test uses reflection to read the SocketServer memoryPool 
availableMemory.
-    // The metric "MemoryPoolAvailable" from Yammer Metrics default registry
-    // can be overwritten in a @ClusterTest as the registry is a singleton.
-    long getMemoryPoolAvailable(ClusterInstance clusterInstance) throws 
Exception {
-        KafkaBroker broker = 
clusterInstance.aliveBrokers().values().iterator().next();
-        SocketServer socketServer = broker.socketServer();
-        Field memoryPoolField = 
socketServer.getClass().getDeclaredField("memoryPool");
-        memoryPoolField.setAccessible(true);
-        MemoryPool memoryPool = (MemoryPool) memoryPoolField.get(socketServer);
-        return memoryPool.availableMemory();
+    private SocketServer getSocketServer(ClusterInstance clusterInstance) {
+        return 
clusterInstance.brokers().get(TestKitDefaults.BROKER_ID_OFFSET).socketServer();
     }
 
-    /**
-     * Builds a raw Kafka request excluding the frame length
-     *
-     * <p>Wire layout:
-     * <pre>
-     *   4 bytes – frame length (payload size, not including these 4 bytes)
-     *
-     *   2 bytes – api_key
-     *   2 bytes – api_version
-     *   4 bytes – correlation_id
-     *   2 bytes – client_id string length
-     *   N bytes – client_id (UTF-8)
-     *   X bytes - request body
-     * </pre>
-     */
-    private static byte[] buildRawRequest(short apiKey, short apiVersion, int 
correlationId, String clientId, byte[] body) {
-        byte[] clientIdBytes = clientId.getBytes(StandardCharsets.UTF_8);
-
-        // Header: api_key(2) + api_version(2) + correlation_id(4) + 
client_id_len(2) + client_id
-        int headerSize = 2 + 2 + 4 + 2 + clientIdBytes.length;
-        int payloadSize = headerSize + body.length;
+    private int getBrokerBoundPort(ClusterInstance clusterInstance) {
+        return 
getSocketServer(clusterInstance).boundPort(ListenerName.normalised(TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME));

Review Comment:
   Again if we can't use `brokerBoundPorts()`, we should use 
`clusterInstance.clientListener()` to retrieve the listener name. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to