mimaison commented on code in PR #20672: URL: https://github.com/apache/kafka/pull/20672#discussion_r2429320859
########## clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryContext.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.telemetry; + +import org.apache.kafka.server.authorizer.AuthorizableRequestContext; + +/** + * {@code ClientTelemetryContext} provides context information for client telemetry requests, + * including the push interval and authorization details. + */ +public interface ClientTelemetryContext { + + /** + * Returns the interval at which the client pushes telemetry metrics to the broker. + * This value can be used by metrics exporters to determine when metrics should be + * considered stale or expired. + * + * @return the push interval in milliseconds + */ + int pushIntervalMs(); + + /** + * Returns the authorization context for the client request. + * + * @return the client request context for the corresponding {@code PushTelemetryRequest} API call + */ + AuthorizableRequestContext authorizableRequestContext(); +} Review Comment: Let's have a new line. 
Same in the other files ########## clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetry.java: ########## @@ -22,7 +22,12 @@ /** * A {@link MetricsReporter} may implement this interface to indicate support for collecting client * telemetry on the server side. + * + * @deprecated Since 4.1.0, use {@link ClientTelemetryExporterProvider} instead. This interface will be Review Comment: 4.1.0 is out already, let's try to get this in 4.2.0 ########## clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryContext.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.telemetry; + +import org.apache.kafka.server.authorizer.AuthorizableRequestContext; + +/** + * {@code ClientTelemetryContext} provides context information for client telemetry requests, + * including the push interval and authorization details. Review Comment: `authorization details` is kind of confusing and not quite correct. AuthorizableRequestContext is a generic request context. 
########## clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetry.java: ########## @@ -22,7 +22,12 @@ /** * A {@link MetricsReporter} may implement this interface to indicate support for collecting client * telemetry on the server side. + * + * @deprecated Since 4.1.0, use {@link ClientTelemetryExporterProvider} instead. This interface will be + * removed in Kafka 5.0.0. The new interface provides a {@link ClientTelemetryExporter} + * which includes additional context such as the push interval. */ +@Deprecated Review Comment: We can use `@Deprecated(since = "4.2", forRemoval = true)` ########## clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryContext.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.telemetry; + +import org.apache.kafka.server.authorizer.AuthorizableRequestContext; + +/** + * {@code ClientTelemetryContext} provides context information for client telemetry requests, + * including the push interval and authorization details. + */ +public interface ClientTelemetryContext { + + /** + * Returns the interval at which the client pushes telemetry metrics to the broker. 
Review Comment: Also, like in the KIP, I'd mention that this is the interval from the subscription. ########## clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryContext.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.telemetry; + +import org.apache.kafka.server.authorizer.AuthorizableRequestContext; + +/** + * {@code ClientTelemetryContext} provides context information for client telemetry requests, + * including the push interval and authorization details. + */ +public interface ClientTelemetryContext { + + /** + * Returns the interval at which the client pushes telemetry metrics to the broker. Review Comment: Maybe we should mention that, for the initial metric push and for pushes following a subscription update and/or an error, a jitter (between 0.5x and 1.5x) is applied to the interval. ########## server/src/test/java/org/apache/kafka/server/metrics/ClientTelemetryInterfaceCompatibilityTest.java: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.metrics; + +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.server.metrics.ClientMetricsTestUtils.TestClientMetricsReceiver; +import org.apache.kafka.server.metrics.ClientMetricsTestUtils.TestClientTelemetryExporter; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientTelemetryInterfaceCompatibilityTest { + + private ClientTelemetryPlugin clientTelemetryPlugin; + + @BeforeEach + public void setUp() { + clientTelemetryPlugin = new ClientTelemetryPlugin(); + } + + @Test + public void testDeprecatedClientTelemetryReceiverInterface() throws UnknownHostException { + // Test that the deprecated ClientTelemetryReceiver interface still works + TestClientMetricsReceiver receiver = new TestClientMetricsReceiver(); + + assertTrue(clientTelemetryPlugin.isEmpty()); + clientTelemetryPlugin.add(receiver); + assertFalse(clientTelemetryPlugin.isEmpty()); + + assertEquals(0, receiver.exportMetricsInvokedCount); + 
assertTrue(receiver.metricsData.isEmpty()); + + byte[] metrics = "test-metrics-deprecated".getBytes(StandardCharsets.UTF_8); + clientTelemetryPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(), + new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), 5000); + + // Verify deprecated receiver was called + assertEquals(1, receiver.exportMetricsInvokedCount); + assertEquals(1, receiver.metricsData.size()); + assertEquals(ByteBuffer.wrap(metrics), receiver.metricsData.get(0)); + } + + @Test + public void testNewClientTelemetryExporterInterface() throws UnknownHostException { + // Test that the new ClientTelemetryExporter interface works + TestClientTelemetryExporter exporter = new TestClientTelemetryExporter(); + + assertTrue(clientTelemetryPlugin.isEmpty()); + clientTelemetryPlugin.add(exporter); + assertFalse(clientTelemetryPlugin.isEmpty()); + + assertEquals(0, exporter.exportMetricsInvokedCount); + assertTrue(exporter.metricsData.isEmpty()); + assertTrue(exporter.pushIntervals.isEmpty()); + + byte[] metrics = "test-metrics-new".getBytes(StandardCharsets.UTF_8); + int pushIntervalMs = 10000; + clientTelemetryPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(), + new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), pushIntervalMs); + + // Verify new exporter was called with push interval + assertEquals(1, exporter.exportMetricsInvokedCount); + assertEquals(1, exporter.metricsData.size()); + assertEquals(ByteBuffer.wrap(metrics), exporter.metricsData.get(0)); + assertEquals(1, exporter.pushIntervals.size()); + assertEquals(pushIntervalMs, exporter.pushIntervals.get(0)); + } + + @Test + public void testBothInterfacesCoexist() throws UnknownHostException { + // Test that both deprecated and new interfaces can coexist + TestClientMetricsReceiver receiver = new TestClientMetricsReceiver(); + TestClientTelemetryExporter exporter = 
new TestClientTelemetryExporter(); + + clientTelemetryPlugin.add(receiver); + clientTelemetryPlugin.add(exporter); + assertFalse(clientTelemetryPlugin.isEmpty()); + + byte[] metrics = "test-metrics-both".getBytes(StandardCharsets.UTF_8); + int pushIntervalMs = 15000; + clientTelemetryPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(), + new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), pushIntervalMs); + + // Verify both were called + assertEquals(1, receiver.exportMetricsInvokedCount); + assertEquals(1, receiver.metricsData.size()); + assertEquals(ByteBuffer.wrap(metrics), receiver.metricsData.get(0)); + + assertEquals(1, exporter.exportMetricsInvokedCount); + assertEquals(1, exporter.metricsData.size()); + assertEquals(ByteBuffer.wrap(metrics), exporter.metricsData.get(0)); + assertEquals(pushIntervalMs, exporter.pushIntervals.get(0)); + } + + @Test + public void testMultipleDeprecatedReceivers() throws UnknownHostException { + // Test that multiple deprecated receivers can be registered + TestClientMetricsReceiver receiver1 = new TestClientMetricsReceiver(); + TestClientMetricsReceiver receiver2 = new TestClientMetricsReceiver(); + + clientTelemetryPlugin.add(receiver1); + clientTelemetryPlugin.add(receiver2); + + byte[] metrics = "test-metrics-multiple".getBytes(StandardCharsets.UTF_8); + clientTelemetryPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(), + new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), 5000); + + // Verify both receivers were called + assertEquals(1, receiver1.exportMetricsInvokedCount); + assertEquals(1, receiver2.exportMetricsInvokedCount); + assertEquals(ByteBuffer.wrap(metrics), receiver1.metricsData.get(0)); + assertEquals(ByteBuffer.wrap(metrics), receiver2.metricsData.get(0)); + } + + @Test + public void testMultipleNewExporters() throws UnknownHostException { + // Test that multiple 
new exporters can be registered + TestClientTelemetryExporter exporter1 = new TestClientTelemetryExporter(); + TestClientTelemetryExporter exporter2 = new TestClientTelemetryExporter(); + + clientTelemetryPlugin.add(exporter1); + clientTelemetryPlugin.add(exporter2); + + byte[] metrics = "test-metrics-multiple-new".getBytes(StandardCharsets.UTF_8); + int pushIntervalMs = 20000; + clientTelemetryPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(), + new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), pushIntervalMs); + + // Verify both exporters were called + assertEquals(1, exporter1.exportMetricsInvokedCount); + assertEquals(1, exporter2.exportMetricsInvokedCount); + assertEquals(ByteBuffer.wrap(metrics), exporter1.metricsData.get(0)); + assertEquals(ByteBuffer.wrap(metrics), exporter2.metricsData.get(0)); + assertEquals(pushIntervalMs, exporter1.pushIntervals.get(0)); + assertEquals(pushIntervalMs, exporter2.pushIntervals.get(0)); + } + + @Test + public void testNullAndEmptyMetricsPayload() throws UnknownHostException { + // Test that null and empty metrics are passed through to exporters + // (ClientMetricsManager is responsible for filtering these out before calling the plugin) Review Comment: As the comment hints this is not really testing ClientMetricsReceiver nor ClientTelemetryExporter logic, so I'm not sure this test is relevant. ########## server/src/test/java/org/apache/kafka/server/metrics/ClientTelemetryInterfaceCompatibilityTest.java: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.metrics; + +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.server.metrics.ClientMetricsTestUtils.TestClientMetricsReceiver; +import org.apache.kafka.server.metrics.ClientMetricsTestUtils.TestClientTelemetryExporter; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientTelemetryInterfaceCompatibilityTest { + + private ClientTelemetryPlugin clientTelemetryPlugin; + + @BeforeEach + public void setUp() { + clientTelemetryPlugin = new ClientTelemetryPlugin(); + } + + @Test + public void testDeprecatedClientTelemetryReceiverInterface() throws UnknownHostException { + // Test that the deprecated ClientTelemetryReceiver interface still works + TestClientMetricsReceiver receiver = new TestClientMetricsReceiver(); + + assertTrue(clientTelemetryPlugin.isEmpty()); + clientTelemetryPlugin.add(receiver); + assertFalse(clientTelemetryPlugin.isEmpty()); + + assertEquals(0, receiver.exportMetricsInvokedCount); + assertTrue(receiver.metricsData.isEmpty()); + + byte[] metrics = "test-metrics-deprecated".getBytes(StandardCharsets.UTF_8); + 
clientTelemetryPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(), + new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), 5000); + + // Verify deprecated receiver was called + assertEquals(1, receiver.exportMetricsInvokedCount); + assertEquals(1, receiver.metricsData.size()); + assertEquals(ByteBuffer.wrap(metrics), receiver.metricsData.get(0)); + } + + @Test + public void testNewClientTelemetryExporterInterface() throws UnknownHostException { + // Test that the new ClientTelemetryExporter interface works + TestClientTelemetryExporter exporter = new TestClientTelemetryExporter(); + + assertTrue(clientTelemetryPlugin.isEmpty()); + clientTelemetryPlugin.add(exporter); + assertFalse(clientTelemetryPlugin.isEmpty()); + + assertEquals(0, exporter.exportMetricsInvokedCount); + assertTrue(exporter.metricsData.isEmpty()); + assertTrue(exporter.pushIntervals.isEmpty()); + + byte[] metrics = "test-metrics-new".getBytes(StandardCharsets.UTF_8); + int pushIntervalMs = 10000; + clientTelemetryPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(), + new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), pushIntervalMs); + + // Verify new exporter was called with push interval + assertEquals(1, exporter.exportMetricsInvokedCount); + assertEquals(1, exporter.metricsData.size()); + assertEquals(ByteBuffer.wrap(metrics), exporter.metricsData.get(0)); + assertEquals(1, exporter.pushIntervals.size()); + assertEquals(pushIntervalMs, exporter.pushIntervals.get(0)); + } + + @Test + public void testBothInterfacesCoexist() throws UnknownHostException { + // Test that both deprecated and new interfaces can coexist + TestClientMetricsReceiver receiver = new TestClientMetricsReceiver(); + TestClientTelemetryExporter exporter = new TestClientTelemetryExporter(); + + clientTelemetryPlugin.add(receiver); + clientTelemetryPlugin.add(exporter); + 
assertFalse(clientTelemetryPlugin.isEmpty()); + + byte[] metrics = "test-metrics-both".getBytes(StandardCharsets.UTF_8); + int pushIntervalMs = 15000; + clientTelemetryPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(), + new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), pushIntervalMs); + + // Verify both were called + assertEquals(1, receiver.exportMetricsInvokedCount); + assertEquals(1, receiver.metricsData.size()); + assertEquals(ByteBuffer.wrap(metrics), receiver.metricsData.get(0)); + + assertEquals(1, exporter.exportMetricsInvokedCount); + assertEquals(1, exporter.metricsData.size()); + assertEquals(ByteBuffer.wrap(metrics), exporter.metricsData.get(0)); + assertEquals(pushIntervalMs, exporter.pushIntervals.get(0)); + } + + @Test + public void testMultipleDeprecatedReceivers() throws UnknownHostException { + // Test that multiple deprecated receivers can be registered + TestClientMetricsReceiver receiver1 = new TestClientMetricsReceiver(); + TestClientMetricsReceiver receiver2 = new TestClientMetricsReceiver(); + + clientTelemetryPlugin.add(receiver1); + clientTelemetryPlugin.add(receiver2); + + byte[] metrics = "test-metrics-multiple".getBytes(StandardCharsets.UTF_8); + clientTelemetryPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(), + new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), 5000); + + // Verify both receivers were called + assertEquals(1, receiver1.exportMetricsInvokedCount); + assertEquals(1, receiver2.exportMetricsInvokedCount); + assertEquals(ByteBuffer.wrap(metrics), receiver1.metricsData.get(0)); + assertEquals(ByteBuffer.wrap(metrics), receiver2.metricsData.get(0)); + } + + @Test + public void testMultipleNewExporters() throws UnknownHostException { + // Test that multiple new exporters can be registered + TestClientTelemetryExporter exporter1 = new TestClientTelemetryExporter(); + 
TestClientTelemetryExporter exporter2 = new TestClientTelemetryExporter(); + + clientTelemetryPlugin.add(exporter1); + clientTelemetryPlugin.add(exporter2); + + byte[] metrics = "test-metrics-multiple-new".getBytes(StandardCharsets.UTF_8); + int pushIntervalMs = 20000; + clientTelemetryPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(), + new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), pushIntervalMs); + + // Verify both exporters were called + assertEquals(1, exporter1.exportMetricsInvokedCount); + assertEquals(1, exporter2.exportMetricsInvokedCount); + assertEquals(ByteBuffer.wrap(metrics), exporter1.metricsData.get(0)); + assertEquals(ByteBuffer.wrap(metrics), exporter2.metricsData.get(0)); + assertEquals(pushIntervalMs, exporter1.pushIntervals.get(0)); + assertEquals(pushIntervalMs, exporter2.pushIntervals.get(0)); + } + + @Test + public void testNullAndEmptyMetricsPayload() throws UnknownHostException { + // Test that null and empty metrics are passed through to exporters + // (ClientMetricsManager is responsible for filtering these out before calling the plugin) + TestClientMetricsReceiver receiver = new TestClientMetricsReceiver(); + TestClientTelemetryExporter exporter = new TestClientTelemetryExporter(); + + clientTelemetryPlugin.add(receiver); + clientTelemetryPlugin.add(exporter); + + // Export with null metrics - exporters are still called + clientTelemetryPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(), + new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setMetrics(null), true).build(), 5000); + + // Verify both receiver and exporter were called (plugin doesn't filter) + assertEquals(1, receiver.exportMetricsInvokedCount); + assertEquals(1, exporter.exportMetricsInvokedCount); + + // Export with empty ByteBuffer (0 bytes) + clientTelemetryPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(), + new PushTelemetryRequest.Builder(new 
PushTelemetryRequestData().setMetrics(ByteBuffer.allocate(0)), true).build(), 5000); + + // Verify exporters were called again with empty buffer + assertEquals(2, receiver.exportMetricsInvokedCount); + assertEquals(2, exporter.exportMetricsInvokedCount); + assertEquals(0, receiver.metricsData.get(1).remaining()); + assertEquals(0, exporter.metricsData.get(1).remaining()); + } + + @Test + public void testPushIntervalPropagation() throws UnknownHostException { Review Comment: Again I'm unconvinced this test adds much value. Other tests already check the interval is passed. ########## server/src/test/java/org/apache/kafka/server/metrics/ClientTelemetryInterfaceCompatibilityTest.java: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.metrics; + +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.server.metrics.ClientMetricsTestUtils.TestClientMetricsReceiver; +import org.apache.kafka.server.metrics.ClientMetricsTestUtils.TestClientTelemetryExporter; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientTelemetryInterfaceCompatibilityTest { Review Comment: These tests look very different to the ones in `ClientMetricsTelemetryPluginTest`. Can you clarify how they differ? ########## server/src/test/java/org/apache/kafka/server/metrics/ClientTelemetryInterfaceCompatibilityTest.java: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.metrics; + +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.server.metrics.ClientMetricsTestUtils.TestClientMetricsReceiver; +import org.apache.kafka.server.metrics.ClientMetricsTestUtils.TestClientTelemetryExporter; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientTelemetryInterfaceCompatibilityTest { Review Comment: I'm not sure we need `Compatibility` in the class name -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
