mjsax commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1446041813
##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##########
@@ -175,16 +182,42 @@ public static boolean validateRequiredResourceLabels(Map<String, String> metadat
}
public static CompressionType preferredCompressionType(List<CompressionType> acceptedCompressionTypes) {
- // TODO: Support compression in client telemetry.
+ if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) {
+ // Broker is providing the compression types in order of preference. Grab the
+ // first one.
+ return acceptedCompressionTypes.get(0);
+ }
return CompressionType.NONE;
}
public static ByteBuffer compress(byte[] raw, CompressionType compressionType) {
- // TODO: Support compression in client telemetry.
- if (compressionType == CompressionType.NONE) {
- return ByteBuffer.wrap(raw);
- } else {
- throw new UnsupportedOperationException("Compression is not supported");
+ try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) {
+ try (OutputStream out = compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) {
+ out.write(raw);
+ out.flush();
+ }
+ compressedOut.buffer().flip();
+ return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer()));
+ } catch (IOException e) {
+ throw new KafkaException("Failed to compress metrics data", e);
Review Comment:
Is it intentional to crash in this case? Or should we fall back to sending the
data uncompressed if anything goes wrong?
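
To make the fallback option concrete, here is a rough, self-contained sketch of what "send uncompressed on failure" could look like. It is not the PR's code: `Codec`, `Result`, and `compressOrFallback` are hypothetical names, `java.util.zip.GZIPOutputStream` stands in for Kafka's `CompressionType.wrapForOutput`, and `BROKEN` exists only to simulate a codec failure. The point it tries to show is that a fallback would also have to report which codec was actually applied, so the caller does not label raw bytes as compressed.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class CompressionFallbackSketch {

    /** Hypothetical stand-in for Kafka's CompressionType; not the real enum. */
    enum Codec {
        NONE {
            @Override
            OutputStream wrap(OutputStream out) {
                return out; // pass-through, no compression
            }
        },
        GZIP {
            @Override
            OutputStream wrap(OutputStream out) throws IOException {
                return new GZIPOutputStream(out);
            }
        },
        BROKEN {
            @Override
            OutputStream wrap(OutputStream out) throws IOException {
                // Assumption for illustration: simulates a codec that fails to initialize.
                throw new IOException("simulated codec failure");
            }
        };

        abstract OutputStream wrap(OutputStream out) throws IOException;
    }

    /** The payload together with the codec that was actually applied to it. */
    static final class Result {
        final ByteBuffer data;
        final Codec codec;

        Result(ByteBuffer data, Codec codec) {
            this.data = data;
            this.codec = codec;
        }
    }

    /**
     * Tries the requested codec; on any IOException, returns the raw bytes
     * tagged as NONE instead of throwing.
     */
    static Result compressOrFallback(byte[] raw, Codec requested) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream(512);
        try (OutputStream out = requested.wrap(sink)) {
            out.write(raw);
        } catch (IOException e) {
            // Fallback path: discard any partial output and send the data uncompressed.
            return new Result(ByteBuffer.wrap(raw), Codec.NONE);
        }
        // The stream is closed here, so the compressed trailer has been written.
        return new Result(ByteBuffer.wrap(sink.toByteArray()), requested);
    }

    public static void main(String[] args) {
        byte[] raw = "metrics payload".getBytes(StandardCharsets.UTF_8);
        Result ok = compressOrFallback(raw, Codec.GZIP);
        Result fallback = compressOrFallback(raw, Codec.BROKEN);
        System.out.println("requested GZIP, applied " + ok.codec);
        System.out.println("requested BROKEN, applied " + fallback.codec);
    }
}
```

The trade-off is that a silent fallback hides codec problems, so some logging would probably be wanted on the fallback path; throwing, as the PR does now, surfaces the failure but drops that push.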