apoorvmittal10 commented on code in PR #20144:
URL: https://github.com/apache/kafka/pull/20144#discussion_r2237671415


##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##########
@@ -181,13 +182,24 @@ public static boolean validateRequiredResourceLabels(Map<String, String> metadat
         return validateResourceLabel(metadata, MetricsContext.NAMESPACE);
     }

-    public static CompressionType preferredCompressionType(List<CompressionType> acceptedCompressionTypes) {
-        if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) {
-            // Broker is providing the compression types in order of preference. Grab the
-            // first one.
-            return acceptedCompressionTypes.get(0);
-        }
-        return CompressionType.NONE;
+    /**
+     * Determines the preferred compression type from broker-accepted types, avoiding unsupported ones.
+     *
+     * @param acceptedCompressionTypes the list of compression types accepted by the broker in order
+     *                                 of preference (must not be null, use empty list if no compression is accepted)
+     * @param unsupportedCompressionTypes the set of compression types that should be avoided due to
+     *                                    missing libraries or previous failures (must not be null)
+     * @return the preferred compression type to use, or {@link CompressionType#NONE} if no acceptable
+     *         compression type is available
+     * @throws NullPointerException if acceptedCompressionTypes or unsupportedCompressionTypes is null

Review Comment:
   Documenting the NPE under @throws seems a bit inappropriate. The @param descriptions already state that non-null values are expected for these params.

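
   For reference, the selection that the Javadoc in this hunk describes can be sketched roughly as below. This is only an illustration, not the PR's implementation: `SketchCompression` and `PreferredCompressionSketch` are hypothetical stand-ins (the real method works on `CompressionType`), and the sketch assumes both arguments are non-null, as the @param text requires.

```java
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for Kafka's CompressionType, used only in this sketch.
enum SketchCompression { NONE, GZIP, SNAPPY, LZ4, ZSTD }

final class PreferredCompressionSketch {
    private PreferredCompressionSketch() { }

    // Returns the first broker-accepted type that is not marked unsupported.
    // The accepted list is assumed to be in the broker's order of preference.
    // Falls back to NONE when every accepted type is unsupported or the list is empty.
    static SketchCompression preferred(List<SketchCompression> accepted,
                                       Set<SketchCompression> unsupported) {
        for (SketchCompression type : accepted) {
            if (!unsupported.contains(type)) {
                return type;
            }
        }
        return SketchCompression.NONE;
    }
}
```

   With an accepted list of ZSTD then GZIP and ZSTD marked unsupported, the sketch picks GZIP. With an empty accepted list it returns NONE.
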


##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##########
@@ -713,14 +716,20 @@ private Optional<Builder<?>> createPushRequest(ClientTelemetrySubscription local
                 return Optional.empty();
             }

-            CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
+            CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes(), unsupportedCompressionTypes);
             ByteBuffer compressedPayload;
             try {
                 compressedPayload = ClientTelemetryUtils.compress(payload, compressionType);
-            } catch (Throwable e) {
-                log.debug("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType);
-                compressedPayload = ByteBuffer.wrap(payload.toByteArray());
-                compressionType = CompressionType.NONE;
+            } catch (Throwable e) {
+                if (e instanceof IOException || e instanceof NoClassDefFoundError ||

Review Comment:
   For my understanding, why do we check `e.getCause()` only for `NoClassDefFoundError` and not also `e.getCause() instanceof IOException`? And where do we expect the `NoClassDefFoundError` to be wrapped in some other exception?



##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##########
@@ -713,14 +716,23 @@ private Optional<Builder<?>> createPushRequest(ClientTelemetrySubscription local
                 return Optional.empty();
             }

-            CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
+            CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes(), unsupportedCompressionTypes);
             ByteBuffer compressedPayload;
             try {
                 compressedPayload = ClientTelemetryUtils.compress(payload, compressionType);
-            } catch (Throwable e) {
-                log.debug("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType);
+            } catch (IOException e) {
+                log.debug("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType, e);
                 compressedPayload = ByteBuffer.wrap(payload.toByteArray());
                 compressionType = CompressionType.NONE;
+            } catch (Throwable e) {
+                if (e instanceof NoClassDefFoundError || e.getCause() instanceof NoClassDefFoundError) {
+                    log.debug("Compression library {} not found, sending uncompressed data", compressionType, e);
+                    unsupportedCompressionTypes.add(compressionType);

Review Comment:
   Then why isn't the type also marked unsupported for other exception types? If an exception other than IOException or NoClassDefFoundError is thrown, it might recur on the next push. Or are we expecting that the next push might compress successfully with the compression type that failed last time?

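
   To make the check under discussion concrete, here is a minimal sketch of a helper that decides whether a failure points to a missing compression library. It is an assumption for illustration only: `MissingLibraryCheckSketch` and `causedByMissingClass` are hypothetical names, and unlike the hunk above, which inspects only `e` and `e.getCause()`, this variation walks the whole cause chain with a depth bound.

```java
// Hypothetical helper, not part of the PR: reports whether a Throwable or any
// of its causes is a NoClassDefFoundError.
final class MissingLibraryCheckSketch {
    // Assumed upper bound on chain length, to guard against cyclic cause chains.
    private static final int MAX_DEPTH = 16;

    private MissingLibraryCheckSketch() { }

    static boolean causedByMissingClass(Throwable error) {
        Throwable current = error;
        for (int depth = 0; current != null && depth < MAX_DEPTH; depth++) {
            if (current instanceof NoClassDefFoundError) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }
}
```

   A caller could add the compression type to `unsupportedCompressionTypes` only when this returns true. Whether other failure kinds should also mark the type as unsupported is the open question in this thread, and the sketch does not settle it.
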


##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##########
@@ -181,13 +182,24 @@ public static boolean validateRequiredResourceLabels(Map<String, String> metadat
         return validateResourceLabel(metadata, MetricsContext.NAMESPACE);
     }

-    public static CompressionType preferredCompressionType(List<CompressionType> acceptedCompressionTypes) {
-        if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) {
-            // Broker is providing the compression types in order of preference. Grab the
-            // first one.
-            return acceptedCompressionTypes.get(0);
-        }
-        return CompressionType.NONE;
+    /**
+     * Determines the preferred compression type from broker-accepted types, avoiding unsupported ones.
+     *
+     * @param acceptedCompressionTypes the list of compression types accepted by the broker in order
+     *                                 of preference (must not be null, use empty list if no compression is accepted)
+     * @param unsupportedCompressionTypes the set of compression types that should be avoided due to
+     *                                    missing libraries or previous failures (must not be null)
+     * @return the preferred compression type to use, or {@link CompressionType#NONE} if no acceptable
+     *         compression type is available
+     * @throws NullPointerException if acceptedCompressionTypes or unsupportedCompressionTypes is null
+     */
+    public static CompressionType preferredCompressionType(List<CompressionType> acceptedCompressionTypes, Set<CompressionType> unsupportedCompressionTypes) {
+        // Broker is providing the compression types in order of preference. Grab the
+        // first one that's not unsupported.

Review Comment:
   ```suggestion
           // first one that's supported.
   ```


##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java:
##########
@@ -713,14 +716,20 @@ private Optional<Builder<?>> createPushRequest(ClientTelemetrySubscription local
                 return Optional.empty();
             }

-            CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
+            CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes(), unsupportedCompressionTypes);
             ByteBuffer compressedPayload;
             try {
                 compressedPayload = ClientTelemetryUtils.compress(payload, compressionType);
-            } catch (Throwable e) {
-                log.debug("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType);
-                compressedPayload = ByteBuffer.wrap(payload.toByteArray());
-                compressionType = CompressionType.NONE;
+            } catch (Throwable e) {
+                if (e instanceof IOException || e instanceof NoClassDefFoundError ||
+                    e.getCause() instanceof NoClassDefFoundError) {
+                    log.debug("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType);
+                    unsupportedCompressionTypes.add(compressionType);
+                    compressedPayload = ByteBuffer.wrap(payload.toByteArray());
+                    compressionType = CompressionType.NONE;
+                } else {
+                    throw new RuntimeException("Unexpected compression error", e);

Review Comment:
   What will happen to the producer thread when this exception is thrown?