Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]
lhotari closed pull request #23525: [improve][client][PIP-389] Add a producer config to improve compression performance URL: https://github.com/apache/pulsar/pull/23525 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]
liangyepianzhou commented on code in PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#discussion_r1862155795

## pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java: ##
@@ -169,9 +169,20 @@ protected ByteBuf getCompressedBatchMetadataAndPayload() {
         }
         int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
-        ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
-        batchedMessageMetadataAndPayload.release();
-        if (compressionType != CompressionType.NONE) {
+        ByteBuf compressedPayload;
+        boolean isCompressed = false;
+        if (!isBrokerTwoPhaseCompactor && producer != null){
+            if (uncompressedSize > producer.conf.getCompressMinMsgBodySize()) {
+                compressedPayload = producer.applyCompression(batchedMessageMetadataAndPayload);
+                isCompressed = true;
+            } else {
+                compressedPayload = batchedMessageMetadataAndPayload;
+            }
+        } else {
+            compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
+            batchedMessageMetadataAndPayload.release();
+        }
+        if (compressionType != CompressionType.NONE && isCompressed) {
             messageMetadata.setCompression(compressionType);
             messageMetadata.setUncompressedSize(uncompressedSize);

Review Comment:
done

## pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java: ##
@@ -141,7 +141,7 @@ public boolean add(MessageImpl msg, SendCallback callback) {
         return isBatchFull();
     }
-    protected ByteBuf getCompressedBatchMetadataAndPayload() {
+    protected ByteBuf getCompressedBatchMetadataAndPayload(boolean isBrokerTwoPhaseCompactor) {

Review Comment:
done
Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]
liangyepianzhou commented on code in PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#discussion_r1862156169

## pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java: ##
@@ -252,7 +263,8 @@ public OpSendMsg createOpSendMsg() throws IOException {
         if (messages.size() == 1) {
             messageMetadata.clear();
             messageMetadata.copyFrom(messages.get(0).getMessageBuilder());
-            ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
+            ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata,
+                    getCompressedBatchMetadataAndPayload(false));

Review Comment:
done

## pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java: ##
@@ -283,7 +295,8 @@ public OpSendMsg createOpSendMsg() throws IOException {
             lowestSequenceId = -1L;
             return op;
         }
-        ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
+        ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata,
+                getCompressedBatchMetadataAndPayload(false));

Review Comment:
done
Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]
liangyepianzhou commented on code in PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#discussion_r1862155395

## pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java: ##
@@ -505,22 +506,27 @@ public void sendAsync(Message message, SendCallback callback) {
         boolean compressed = false;
         // Batch will be compressed when closed
         // If a message has a delayed delivery time, we'll always send it individually
-        if (!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) {
-            compressedPayload = applyCompression(payload);
-            compressed = true;
+        if (((!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()))) {
+            if (payload.readableBytes() < conf.getCompressMinMsgBodySize()) {
-            // validate msg-size (For batching this will be check at the batch completion size)
-            int compressedSize = compressedPayload.readableBytes();
-            if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
-                compressedPayload.release();
-                String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : "";
-                PulsarClientException.InvalidMessageException invalidMessageException =
-                        new PulsarClientException.InvalidMessageException(
-                                format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds"
-                                                + " %d bytes",
-                                        producerName, topic, compressedStr, compressedSize, getMaxMessageSize()));
-                completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
-                return;
+            } else {
+                compressedPayload = applyCompression(payload);
+                compressed = true;
+
+                // validate msg-size (For batching this will be check at the batch completion size)
+                int compressedSize = compressedPayload.readableBytes();
+                if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
+                    compressedPayload.release();
+                    String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : "";

Review Comment:
done

## pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java: ##
@@ -169,9 +169,20 @@ protected ByteBuf getCompressedBatchMetadataAndPayload() {
         }
         int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
-        ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
-        batchedMessageMetadataAndPayload.release();
-        if (compressionType != CompressionType.NONE) {
+        ByteBuf compressedPayload;
+        boolean isCompressed = false;
+        if (!isBrokerTwoPhaseCompactor && producer != null){
+            if (uncompressedSize > producer.conf.getCompressMinMsgBodySize()) {

Review Comment:
done
Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]
lhotari commented on code in PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#discussion_r1862065209

## pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java: ##
@@ -505,22 +506,27 @@ public void sendAsync(Message message, SendCallback callback) {
         boolean compressed = false;
         // Batch will be compressed when closed
         // If a message has a delayed delivery time, we'll always send it individually
-        if (!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) {
-            compressedPayload = applyCompression(payload);
-            compressed = true;
+        if (((!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()))) {
+            if (payload.readableBytes() < conf.getCompressMinMsgBodySize()) {
-            // validate msg-size (For batching this will be check at the batch completion size)
-            int compressedSize = compressedPayload.readableBytes();
-            if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
-                compressedPayload.release();
-                String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : "";
-                PulsarClientException.InvalidMessageException invalidMessageException =
-                        new PulsarClientException.InvalidMessageException(
-                                format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds"
-                                                + " %d bytes",
-                                        producerName, topic, compressedStr, compressedSize, getMaxMessageSize()));
-                completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
-                return;
+            } else {
+                compressedPayload = applyCompression(payload);
+                compressed = true;
+
+                // validate msg-size (For batching this will be check at the batch completion size)
+                int compressedSize = compressedPayload.readableBytes();
+                if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
+                    compressedPayload.release();
+                    String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : "";

Review Comment:
Would it be useful to also show the compression type?
```suggestion
String compressedStr = conf.getCompressionType() != CompressionType.NONE ? ("compressed (" + conf.getCompressionType() + ")") : "uncompressed";
```

## pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java: ##
@@ -169,9 +169,20 @@ protected ByteBuf getCompressedBatchMetadataAndPayload() {
         }
         int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
-        ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
-        batchedMessageMetadataAndPayload.release();
-        if (compressionType != CompressionType.NONE) {
+        ByteBuf compressedPayload;
+        boolean isCompressed = false;
+        if (!isBrokerTwoPhaseCompactor && producer != null){
+            if (uncompressedSize > producer.conf.getCompressMinMsgBodySize()) {
+                compressedPayload = producer.applyCompression(batchedMessageMetadataAndPayload);
+                isCompressed = true;
+            } else {
+                compressedPayload = batchedMessageMetadataAndPayload;
+            }
+        } else {
+            compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
+            batchedMessageMetadataAndPayload.release();
+        }
+        if (compressionType != CompressionType.NONE && isCompressed) {
             messageMetadata.setCompression(compressionType);
             messageMetadata.setUncompressedSize(uncompressedSize);

Review Comment:
Move these lines after the code where compression is applied (`producer.applyCompression`).

## pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java: ##
@@ -141,7 +141,7 @@ public boolean add(MessageImpl msg, SendCallback callback) {
         return isBatchFull();
     }
-    protected ByteBuf getCompressedBatchMetadataAndPayload() {
+    protected ByteBuf getCompressedBatchMetadataAndPayload(boolean isBrokerTwoPhaseCompactor) {

Review Comment:
Leaking the implementation detail that compaction doesn't compress messages is not great. It would be better to rename `isBrokerTwoPhaseCompactor` to `allowCompression` and invert the value. Add an overload that passes `true` as the default value for `allowCompression`.

## pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java: ##
@@ -252,7 +263,8 @@ public OpSendMsg createOpSendMsg() throws IOException {
         if (messages.size() ==
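The two API-shape suggestions in the review above (an `allowCompression` flag with a defaulting overload, and an error label that names the compression type) can be sketched roughly as follows. This is only an illustration under stated assumptions: `AllowCompressionSketch`, `shouldApplyCompression` and `compressionLabel` are hypothetical names, the enum is a trimmed stand-in for Pulsar's `CompressionType`, and what the real method should do when the flag is `false` is left to the PR.

```java
// Hypothetical sketch of the reviewer's suggestions; not Pulsar code.
final class AllowCompressionSketch {
    /** Trimmed stand-in for Pulsar's CompressionType enum. */
    enum CompressionType { NONE, LZ4, ZLIB }

    private final CompressionType compressionType;
    private final int compressMinMsgBodySize;

    AllowCompressionSketch(CompressionType compressionType, int compressMinMsgBodySize) {
        this.compressionType = compressionType;
        this.compressMinMsgBodySize = compressMinMsgBodySize;
    }

    /** Overload for ordinary callers: compression is allowed by default. */
    boolean shouldApplyCompression(int uncompressedSize) {
        return shouldApplyCompression(uncompressedSize, true);
    }

    /** Callers that must not compress pass allowCompression = false. */
    boolean shouldApplyCompression(int uncompressedSize, boolean allowCompression) {
        return allowCompression
                && compressionType != CompressionType.NONE
                && uncompressedSize > compressMinMsgBodySize;
    }

    /** Label taken from the review suggestion, for the size-limit error message. */
    String compressionLabel() {
        return compressionType != CompressionType.NONE
                ? ("compressed (" + compressionType + ")")
                : "uncompressed";
    }

    public static void main(String[] args) {
        AllowCompressionSketch lz4 = new AllowCompressionSketch(CompressionType.LZ4, 4 * 1024);
        System.out.println(lz4.shouldApplyCompression(8192));        // true
        System.out.println(lz4.shouldApplyCompression(8192, false)); // false
        System.out.println(lz4.compressionLabel());                  // compressed (LZ4)
    }
}
```

With the overload in place, existing call sites can stay unchanged and only the caller that needs different behavior passes the flag explicitly.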
Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]
lhotari commented on code in PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#discussion_r1862056976

## pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java: ##
@@ -38,11 +38,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import lombok.Getter;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.MessageIdAdv;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SchemaSerializationException;

Review Comment:
This change is against the Pulsar code style. IDE instructions: https://pulsar.apache.org/contribute/setup-ide/#configure-code-style

## pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java: ##
@@ -18,24 +18,21 @@
  */
 package org.apache.pulsar.client.impl;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.ServerCnx;
-import org.apache.pulsar.client.api.BatcherBuilder;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.SubscriptionType;

Review Comment:
This change is against the Pulsar code style. IDE instructions: https://pulsar.apache.org/contribute/setup-ide/#configure-code-style
Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]
liangyepianzhou commented on PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#issuecomment-2505915718

@lhotari Do you have time to review the code changes for [pip-389](https://lists.apache.org/thread/xv7x3vmycxzsrhbdo7vmssh8lxxzyxd5), which has been approved on the mailing list? Thank you!
Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]
lhotari commented on PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#issuecomment-2467599901

> @lhotari I tried to optimize it yesterday, but I found that Pulsar does not use `CompositeByteBuf` when sending messages. Single sending uses `io.netty.buffer.UnpooledHeapByteBuf` memory, while batch sending uses `io.netty.buffer.PooledUnsafeDirectByteBuf` memory. They all have a memory address or a backing array, so the current implementation already uses zero copy. It seems that there is no need to optimize?
> And the example you gave, `CompressionCodecSnappyJNI.java`, is a compression class used in testing. It seems that there is no need to compress it.

@liangyepianzhou I created #23586 to clarify the possible optimization.
Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]
liangyepianzhou commented on PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#issuecomment-2467552702

> @liangyepianzhou Regarding performance optimizations for compression in Pulsar, there's also work that should be done. For example, gzip compression/decompression is very inefficient:
>
> https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZLib.java#L60-L85
>
> Another detail is that the current implementation isn't using the "zero copy" approaches that are available.
> For example in Snappy:
> https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressionCodecSnappyJNI.java#L34-L64
>
> In BookKeeper, I added zero-copy for calculating checksums in [apache/bookkeeper#4196](https://github.com/apache/bookkeeper/pull/4196). The ByteBufVisitor approach could be used to avoid copying source buffers to an extra nio buffer.
> Calling Netty's io.netty.buffer.CompositeByteBuf#nioBuffer will allocate a new nio ByteBuffer in the heap and copy the content there. That's not great from a performance perspective, especially when we want to reduce allocations and garbage. With the ByteBufVisitor approach it's possible to read the source direct byte buffers without extra copies.
> **Have you considered addressing this performance issue in the Pulsar message compression solution?**

@lhotari I tried to optimize it yesterday, but I found that Pulsar does not use `CompositeByteBuf` when sending messages. Single sending uses `io.netty.buffer.UnpooledHeapByteBuf` memory, while batch sending uses `io.netty.buffer.PooledUnsafeDirectByteBuf` memory. They all have a memory address or a backing array, so the current implementation already uses zero copy. It seems that there is no need to optimize?
And the example you gave, `CompressionCodecSnappyJNI.java`, is a compression class used in testing. It seems that there is no need to compress it.
Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]
liangyepianzhou commented on PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#issuecomment-2447421884

> > > Have you considered addressing this performance issue in the Pulsar message compression solution?
> >
> > Sounds good, maybe I can try optimizing it in other PRs
>
> +1, In the Pulsar code base, we have a special module called `microbench` for microbenchmarks with JMH, https://github.com/apache/pulsar/tree/master/microbench . Testing performance with JMH could be useful for such improvements.

Thanks for the reminder.
Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]
lhotari commented on PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#issuecomment-2447327865

> > Have you considered addressing this performance issue in the Pulsar message compression solution?
>
> Sounds good, maybe I can try optimizing it in other PRs

+1, In the Pulsar code base, we have a special module called `microbench` for microbenchmarks with JMH, https://github.com/apache/pulsar/tree/master/microbench . Testing performance with JMH could be useful for such improvements.
Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]
liangyepianzhou commented on PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#issuecomment-2447309463

> Have you considered addressing this performance issue in the Pulsar message compression solution?

Sounds good, maybe I can try optimizing it in other PRs
Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]
lhotari commented on PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#issuecomment-2447010383

@liangyepianzhou Regarding performance optimizations for compression in Pulsar, there's also work that should be done. For example, gzip compression/decompression is very inefficient:
https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZLib.java#L60-L85

Another detail is that the current implementation isn't using the "zero copy" approaches that are available. For example in Snappy:
https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressionCodecSnappyJNI.java#L34-L64

In BookKeeper, I added zero-copy for calculating checksums in https://github.com/apache/bookkeeper/pull/4196. The ByteBufVisitor approach could be used to avoid copying source buffers to an extra nio buffer.
Calling Netty's io.netty.buffer.CompositeByteBuf#nioBuffer will allocate a new nio ByteBuffer in the heap and copy the content there. That's not great from a performance perspective, especially when we want to reduce allocations and garbage. With the ByteBufVisitor approach it's possible to read the source direct byte buffers without extra copies.
**Have you considered addressing this performance issue in the Pulsar message compression solution?**
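The extra-copy concern raised in the comment above can be illustrated with JDK classes alone. The sketch below is not the ByteBufVisitor approach and not Pulsar's codec; it only shows the general idea of handing a direct buffer straight to the compressor instead of first copying it into a heap array. It assumes Java 11 or newer, where `Deflater.setInput(ByteBuffer)` and `Deflater.deflate(ByteBuffer)` are available; the class name `DirectBufferDeflateSketch` and the output-size bound are illustrative choices.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical sketch: zlib round trip on direct ByteBuffers, no intermediate byte[].
final class DirectBufferDeflateSketch {

    /** Compresses source (position..limit) into a new direct buffer. */
    static ByteBuffer compress(ByteBuffer source) {
        // Generous bound for incompressible input; chosen for the sketch, not tuned.
        int bound = source.remaining() + (source.remaining() >> 10) + 64;
        ByteBuffer target = ByteBuffer.allocateDirect(bound);
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(source); // the caller does not copy into a byte[] first
            deflater.finish();
            while (!deflater.finished() && target.hasRemaining()) {
                deflater.deflate(target);
            }
            if (!deflater.finished()) {
                throw new IllegalStateException("output bound too small");
            }
        } finally {
            deflater.end();
        }
        target.flip();
        return target;
    }

    /** Decompresses into a new direct buffer of the known uncompressed size. */
    static ByteBuffer decompress(ByteBuffer compressed, int uncompressedSize) {
        ByteBuffer target = ByteBuffer.allocateDirect(uncompressedSize);
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(compressed);
            while (!inflater.finished() && target.hasRemaining()) {
                inflater.inflate(target);
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException("corrupt input", e);
        } finally {
            inflater.end();
        }
        target.flip();
        return target;
    }

    public static void main(String[] args) {
        byte[] raw = "pulsar message payload ".repeat(100).getBytes(StandardCharsets.UTF_8);
        ByteBuffer source = ByteBuffer.allocateDirect(raw.length);
        source.put(raw);
        source.flip();

        ByteBuffer packed = compress(source);
        int packedSize = packed.remaining();
        ByteBuffer restored = decompress(packed, raw.length);
        byte[] out = new byte[restored.remaining()];
        restored.get(out);
        System.out.println(raw.length + " -> " + packedSize + " bytes, round trip ok: "
                + Arrays.equals(raw, out));
    }
}
```

Whether a given Netty buffer can expose such a view without copying depends on the buffer type, which is the point debated in the replies; a JMH benchmark would be the way to confirm any gain.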
Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]
lhotari commented on code in PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#discussion_r1822521896

## pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java: ##
@@ -189,6 +189,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
     )
     private CompressionType compressionType = CompressionType.NONE;
+    private int compressMinMsgBodySize = 4 * 1024; // 4kb

Review Comment:
Thanks for sharing the reasoning.
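For readers following the thread, the effect of a minimum-size setting like `compressMinMsgBodySize` can be sketched with the JDK's `Deflater` standing in for the producer's codec. Everything except the 4 * 1024 default is an assumption made for illustration: the class `CompressionThresholdSketch`, its `Result` holder, and the choice to compress a payload that is exactly at the threshold.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;

// Hypothetical sketch of threshold-gated compression; not the PR's implementation.
final class CompressionThresholdSketch {
    /** Mirrors the default proposed in the diff; the constant name is made up. */
    static final int DEFAULT_COMPRESS_MIN_MSG_BODY_SIZE = 4 * 1024;

    /** Payload plus a flag recording whether compression was actually applied. */
    static final class Result {
        final byte[] payload;
        final boolean compressed;

        Result(byte[] payload, boolean compressed) {
            this.payload = payload;
            this.compressed = compressed;
        }
    }

    /** Assumption: payloads at or above the threshold are compressed. */
    static boolean shouldCompress(int payloadSize, int minSize) {
        return payloadSize >= minSize;
    }

    static Result maybeCompress(byte[] payload, int minSize) {
        if (!shouldCompress(payload.length, minSize)) {
            // Small payloads skip the codec and are sent as-is.
            return new Result(payload, false);
        }
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(payload);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[1024];
            while (!deflater.finished()) {
                int n = deflater.deflate(chunk);
                out.write(chunk, 0, n);
            }
            return new Result(out.toByteArray(), true);
        } finally {
            deflater.end();
        }
    }

    public static void main(String[] args) {
        Result small = maybeCompress(new byte[100], DEFAULT_COMPRESS_MIN_MSG_BODY_SIZE);
        Result large = maybeCompress(new byte[8192], DEFAULT_COMPRESS_MIN_MSG_BODY_SIZE);
        System.out.println("small compressed: " + small.compressed); // false
        System.out.println("large compressed: " + large.compressed); // true
    }
}
```

The `compressed` flag matters because, as the batch-container hunk in this thread shows, compression metadata should only be set when compression was really applied. The hunks in this thread also differ at the boundary: the batch path compresses when the size is strictly greater than the threshold, while the single-message path skips compression only when the size is strictly smaller.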