apoorvmittal10 commented on code in PR #20144:
URL: https://github.com/apache/kafka/pull/20144#discussion_r2237671415


##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##########
@@ -181,13 +182,24 @@ public static boolean 
validateRequiredResourceLabels(Map<String, String> metadat
         return validateResourceLabel(metadata, MetricsContext.NAMESPACE);
     }
 
-    public static CompressionType 
preferredCompressionType(List<CompressionType> acceptedCompressionTypes) {
-        if (acceptedCompressionTypes != null && 
!acceptedCompressionTypes.isEmpty()) {
-            // Broker is providing the compression types in order of 
preference. Grab the
-            // first one.
-            return acceptedCompressionTypes.get(0);
-        }
-        return CompressionType.NONE;
+    /**
+     * Determines the preferred compression type from broker-accepted types, 
avoiding unsupported ones.
+     * 
+     * @param acceptedCompressionTypes the list of compression types accepted 
by the broker in order 
+     *                                of preference (must not be null, use 
empty list if no compression is accepted)
+     * @param unsupportedCompressionTypes the set of compression types that 
should be avoided due to 
+     *                                   missing libraries or previous 
failures (must not be null)
+     * @return the preferred compression type to use, or {@link 
CompressionType#NONE} if no acceptable
+     *         compression type is available
+     * @throws NullPointerException if acceptedCompressionTypes or 
unsupportedCompressionTypes is null

Review Comment:
   Having NPE in @throws and being documented seem a bit inappropriate. You 
have already mentioned that non-null values are expected for these params.



##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##########
@@ -713,14 +716,20 @@ private Optional<Builder<?>> 
createPushRequest(ClientTelemetrySubscription local
                 return Optional.empty();
             }
 
-            CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
+            CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes(),
 unsupportedCompressionTypes);
             ByteBuffer compressedPayload;
             try {
                 compressedPayload = ClientTelemetryUtils.compress(payload, 
compressionType);
-            } catch (Throwable e) {
-                log.debug("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);
-                compressedPayload = ByteBuffer.wrap(payload.toByteArray());
-                compressionType = CompressionType.NONE;
+            } catch (Throwable e) { 
+                if (e instanceof IOException || e instanceof 
NoClassDefFoundError || 

Review Comment:
   For my understanding, why we do not have `e.getCause() instanceof 
IOException` and only for NoClassDefFoundError? And from where do we expect the 
`NoClassDefFoundError` will be wrapped in some other exception?



##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##########
@@ -713,14 +716,23 @@ private Optional<Builder<?>> 
createPushRequest(ClientTelemetrySubscription local
                 return Optional.empty();
             }
 
-            CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
+            CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes(),
 unsupportedCompressionTypes);
             ByteBuffer compressedPayload;
             try {
                 compressedPayload = ClientTelemetryUtils.compress(payload, 
compressionType);
-            } catch (Throwable e) {
-                log.debug("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);
+            } catch (IOException e) {
+                log.debug("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType, e);
                 compressedPayload = ByteBuffer.wrap(payload.toByteArray());
                 compressionType = CompressionType.NONE;
+            } catch (Throwable e) { 
+                if (e instanceof NoClassDefFoundError || e.getCause() 
instanceof NoClassDefFoundError) {
+                    log.debug("Compression library {} not found, sending 
uncompressed data", compressionType, e);
+                    unsupportedCompressionTypes.add(compressionType);

Review Comment:
   Then why it's not for other exception types i.e. if there is some other 
exception other thant IOException and NoClassDefFoundError then it might be the 
exception re-occurs next time. Or are we expecting that the next push might 
successfully compress for the compression type it failed last time?



##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##########
@@ -181,13 +182,24 @@ public static boolean 
validateRequiredResourceLabels(Map<String, String> metadat
         return validateResourceLabel(metadata, MetricsContext.NAMESPACE);
     }
 
-    public static CompressionType 
preferredCompressionType(List<CompressionType> acceptedCompressionTypes) {
-        if (acceptedCompressionTypes != null && 
!acceptedCompressionTypes.isEmpty()) {
-            // Broker is providing the compression types in order of 
preference. Grab the
-            // first one.
-            return acceptedCompressionTypes.get(0);
-        }
-        return CompressionType.NONE;
+    /**
+     * Determines the preferred compression type from broker-accepted types, 
avoiding unsupported ones.
+     * 
+     * @param acceptedCompressionTypes the list of compression types accepted 
by the broker in order 
+     *                                of preference (must not be null, use 
empty list if no compression is accepted)
+     * @param unsupportedCompressionTypes the set of compression types that 
should be avoided due to 
+     *                                   missing libraries or previous 
failures (must not be null)
+     * @return the preferred compression type to use, or {@link 
CompressionType#NONE} if no acceptable
+     *         compression type is available
+     * @throws NullPointerException if acceptedCompressionTypes or 
unsupportedCompressionTypes is null
+     */
+    public static CompressionType 
preferredCompressionType(List<CompressionType> acceptedCompressionTypes, 
Set<CompressionType> unsupportedCompressionTypes) {
+        // Broker is providing the compression types in order of preference. 
Grab the
+        // first one that's not unsupported.

Review Comment:
   ```suggestion
           // first one that's supported.
   ```



##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##########
@@ -713,14 +716,20 @@ private Optional<Builder<?>> 
createPushRequest(ClientTelemetrySubscription local
                 return Optional.empty();
             }
 
-            CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
+            CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes(),
 unsupportedCompressionTypes);
             ByteBuffer compressedPayload;
             try {
                 compressedPayload = ClientTelemetryUtils.compress(payload, 
compressionType);
-            } catch (Throwable e) {
-                log.debug("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);
-                compressedPayload = ByteBuffer.wrap(payload.toByteArray());
-                compressionType = CompressionType.NONE;
+            } catch (Throwable e) { 
+                if (e instanceof IOException || e instanceof 
NoClassDefFoundError || 
+                        e.getCause() instanceof NoClassDefFoundError) {
+                    log.debug("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);
+                    unsupportedCompressionTypes.add(compressionType);
+                    compressedPayload = ByteBuffer.wrap(payload.toByteArray());
+                    compressionType = CompressionType.NONE;
+                } else {
+                    throw new RuntimeException("Unexpected compression error", 
e);

Review Comment:
   What will happen with the producer thread when this exception is thrown?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to