Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]

2025-01-03 Thread via GitHub


lhotari closed pull request #23525: [improve][client][PIP-389] Add a producer 
config to improve compression performance
URL: https://github.com/apache/pulsar/pull/23525


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]

2024-11-28 Thread via GitHub


liangyepianzhou commented on code in PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#discussion_r1862155795


##
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java:
##
@@ -169,9 +169,20 @@ protected ByteBuf getCompressedBatchMetadataAndPayload() {
 }
 
 int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
-ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
-batchedMessageMetadataAndPayload.release();
-if (compressionType != CompressionType.NONE) {
+ByteBuf compressedPayload;
+boolean isCompressed = false;
+if (!isBrokerTwoPhaseCompactor && producer != null){
+if (uncompressedSize > producer.conf.getCompressMinMsgBodySize()) {
+compressedPayload = producer.applyCompression(batchedMessageMetadataAndPayload);
+isCompressed = true;
+} else {
+compressedPayload = batchedMessageMetadataAndPayload;
+}
+} else {
+compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
+batchedMessageMetadataAndPayload.release();
+}
+if (compressionType != CompressionType.NONE && isCompressed) {
 messageMetadata.setCompression(compressionType);
 messageMetadata.setUncompressedSize(uncompressedSize);

Review Comment:
   done



##
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java:
##
@@ -141,7 +141,7 @@ public boolean add(MessageImpl msg, SendCallback callback) {
 return isBatchFull();
 }
 
-protected ByteBuf getCompressedBatchMetadataAndPayload() {
+protected ByteBuf getCompressedBatchMetadataAndPayload(boolean isBrokerTwoPhaseCompactor) {

Review Comment:
   done






Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]

2024-11-28 Thread via GitHub


liangyepianzhou commented on code in PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#discussion_r1862156169


##
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java:
##
@@ -252,7 +263,8 @@ public OpSendMsg createOpSendMsg() throws IOException {
 if (messages.size() == 1) {
 messageMetadata.clear();
 messageMetadata.copyFrom(messages.get(0).getMessageBuilder());
-ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
+ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata,
+getCompressedBatchMetadataAndPayload(false));

Review Comment:
   done



##
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java:
##
@@ -283,7 +295,8 @@ public OpSendMsg createOpSendMsg() throws IOException {
 lowestSequenceId = -1L;
 return op;
 }
-ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
+ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata,
+getCompressedBatchMetadataAndPayload(false));

Review Comment:
   done






Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]

2024-11-28 Thread via GitHub


liangyepianzhou commented on code in PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#discussion_r1862155395


##
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##
@@ -505,22 +506,27 @@ public void sendAsync(Message message, SendCallback callback) {
 boolean compressed = false;
 // Batch will be compressed when closed
 // If a message has a delayed delivery time, we'll always send it individually
-if (!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) {
-compressedPayload = applyCompression(payload);
-compressed = true;
+if (((!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()))) {
+if (payload.readableBytes() < conf.getCompressMinMsgBodySize()) {
 
-// validate msg-size (For batching this will be check at the batch completion size)
-int compressedSize = compressedPayload.readableBytes();
-if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
-compressedPayload.release();
-String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : "";
-PulsarClientException.InvalidMessageException invalidMessageException =
-new PulsarClientException.InvalidMessageException(
-format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds"
-+ " %d bytes",
-producerName, topic, compressedStr, compressedSize, getMaxMessageSize()));
-completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
-return;
+} else {
+compressedPayload = applyCompression(payload);
+compressed = true;
+
+// validate msg-size (For batching this will be check at the batch completion size)
+int compressedSize = compressedPayload.readableBytes();
+if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
+compressedPayload.release();
+String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : "";

Review Comment:
   done



##
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java:
##
@@ -169,9 +169,20 @@ protected ByteBuf getCompressedBatchMetadataAndPayload() {
 }
 
 int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
-ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
-batchedMessageMetadataAndPayload.release();
-if (compressionType != CompressionType.NONE) {
+ByteBuf compressedPayload;
+boolean isCompressed = false;
+if (!isBrokerTwoPhaseCompactor && producer != null){
+if (uncompressedSize > producer.conf.getCompressMinMsgBodySize()) {

Review Comment:
   done






Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]

2024-11-28 Thread via GitHub


lhotari commented on code in PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#discussion_r1862065209


##
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##
@@ -505,22 +506,27 @@ public void sendAsync(Message message, SendCallback callback) {
 boolean compressed = false;
 // Batch will be compressed when closed
 // If a message has a delayed delivery time, we'll always send it individually
-if (!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) {
-compressedPayload = applyCompression(payload);
-compressed = true;
+if (((!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()))) {
+if (payload.readableBytes() < conf.getCompressMinMsgBodySize()) {
 
-// validate msg-size (For batching this will be check at the batch completion size)
-int compressedSize = compressedPayload.readableBytes();
-if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
-compressedPayload.release();
-String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : "";
-PulsarClientException.InvalidMessageException invalidMessageException =
-new PulsarClientException.InvalidMessageException(
-format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds"
-+ " %d bytes",
-producerName, topic, compressedStr, compressedSize, getMaxMessageSize()));
-completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
-return;
+} else {
+compressedPayload = applyCompression(payload);
+compressed = true;
+
+// validate msg-size (For batching this will be check at the batch completion size)
+int compressedSize = compressedPayload.readableBytes();
+if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
+compressedPayload.release();
+String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : "";

Review Comment:
   Would it be useful to also show the compression type?
   ```suggestion
   String compressedStr = conf.getCompressionType() != CompressionType.NONE ? ("compressed (" + conf.getCompressionType() + ")") : "uncompressed";
   ```



##
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java:
##
@@ -169,9 +169,20 @@ protected ByteBuf getCompressedBatchMetadataAndPayload() {
 }
 
 int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
-ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
-batchedMessageMetadataAndPayload.release();
-if (compressionType != CompressionType.NONE) {
+ByteBuf compressedPayload;
+boolean isCompressed = false;
+if (!isBrokerTwoPhaseCompactor && producer != null){
+if (uncompressedSize > producer.conf.getCompressMinMsgBodySize()) {
+compressedPayload = producer.applyCompression(batchedMessageMetadataAndPayload);
+isCompressed = true;
+} else {
+compressedPayload = batchedMessageMetadataAndPayload;
+}
+} else {
+compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
+batchedMessageMetadataAndPayload.release();
+}
+if (compressionType != CompressionType.NONE && isCompressed) {
 messageMetadata.setCompression(compressionType);
 messageMetadata.setUncompressedSize(uncompressedSize);

Review Comment:
   Move these lines after the code where compression is applied 
(`producer.applyCompression`).
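
The placement suggested above can be sketched as follows. This is a minimal, JDK-only illustration and not the PR's code: `Metadata` is a hypothetical stand-in for the two metadata fields touched in the diff, the compressor is a plain function, and the class and method names are assumptions. The point is only that recording the compression metadata in the same branch that compresses removes the need for a separate `isCompressed` flag.

```java
import java.util.function.UnaryOperator;

// Hypothetical illustration class; not part of Pulsar.
final class MetadataPlacementSketch {

    /** Hypothetical stand-in for the metadata fields set in the diff. */
    static final class Metadata {
        String compression = "NONE";
        int uncompressedSize;
    }

    static byte[] encode(byte[] batch, int minSize, String compressionType,
                         UnaryOperator<byte[]> compressor, Metadata metadata) {
        if ("NONE".equals(compressionType) || batch.length <= minSize) {
            // Payload goes out unchanged, so the metadata keeps its defaults.
            return batch;
        }
        byte[] compressed = compressor.apply(batch);
        // Metadata is recorded right where compression is applied.
        metadata.compression = compressionType;
        metadata.uncompressedSize = batch.length;
        return compressed;
    }
}
```

With this shape a reader sees the metadata update and the compression call together, which is the locality the review comment asks for.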



##
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java:
##
@@ -141,7 +141,7 @@ public boolean add(MessageImpl msg, SendCallback callback) {
 return isBatchFull();
 }
 
-protected ByteBuf getCompressedBatchMetadataAndPayload() {
+protected ByteBuf getCompressedBatchMetadataAndPayload(boolean isBrokerTwoPhaseCompactor) {

Review Comment:
   Leaking the implementation detail that compaction doesn't compress messages 
is not ideal. It would be better to rename `isBrokerTwoPhaseCompactor` to 
`allowCompression` and invert the value. Add an overload that passes `true` as 
the default value for `allowCompression`.
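
A minimal sketch of the overload pattern described above, assuming nothing about the final PR code: the class is a hypothetical stand-in that works on `byte[]` instead of `ByteBuf`, the compressor is a plain function, and only the parameter name `allowCompression` and the method name come from the review.

```java
import java.util.function.UnaryOperator;

// Hypothetical illustration class; not part of Pulsar.
final class AllowCompressionSketch {
    private final UnaryOperator<byte[]> compressor; // stand-in for the real codec
    private final int compressMinMsgBodySize;

    AllowCompressionSketch(UnaryOperator<byte[]> compressor, int compressMinMsgBodySize) {
        this.compressor = compressor;
        this.compressMinMsgBodySize = compressMinMsgBodySize;
    }

    // Overload for ordinary callers: compression is allowed by default.
    byte[] getCompressedBatchMetadataAndPayload(byte[] batch) {
        return getCompressedBatchMetadataAndPayload(batch, true);
    }

    // Callers that need the payload left untouched pass allowCompression = false,
    // without the method having to know who they are.
    byte[] getCompressedBatchMetadataAndPayload(byte[] batch, boolean allowCompression) {
        if (allowCompression && batch.length > compressMinMsgBodySize) {
            return compressor.apply(batch);
        }
        return batch;
    }
}
```

Existing call sites keep using the one-argument form, so only the caller with special requirements has to mention the flag.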



##
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java:
##
@@ -252,7 +263,8 @@ public OpSendMsg createOpSendMsg() throws IOException {
 if (messages.size() == 

Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]

2024-11-28 Thread via GitHub


lhotari commented on code in PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#discussion_r1862056976


##
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java:
##
@@ -38,11 +38,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import lombok.Getter;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.MessageIdAdv;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SchemaSerializationException;

Review Comment:
   This change is against the Pulsar code style. IDE instructions: 
https://pulsar.apache.org/contribute/setup-ide/#configure-code-style



##
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java:
##
@@ -18,24 +18,21 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.*;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.ServerCnx;
-import org.apache.pulsar.client.api.BatcherBuilder;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.SubscriptionType;

Review Comment:
   This change is against the Pulsar code style. IDE instructions: 
https://pulsar.apache.org/contribute/setup-ide/#configure-code-style






Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]

2024-11-28 Thread via GitHub


liangyepianzhou commented on PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#issuecomment-2505915718

   @lhotari Do you have time to review the code changes for 
[pip-389](https://lists.apache.org/thread/xv7x3vmycxzsrhbdo7vmssh8lxxzyxd5) 
that have been approved on the mailing list?
   Thank you!





Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]

2024-11-11 Thread via GitHub


lhotari commented on PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#issuecomment-2467599901

   > @lhotari I tried to optimize it yesterday, but I found that Pulsar does not 
use `CompositeByteBuf` when sending messages. Single sends use 
`io.netty.buffer.UnpooledHeapByteBuf`, while batch sends use 
`io.netty.buffer.PooledUnsafeDirectByteBuf`. Both expose a memory address or a 
backing array, so the current implementation already uses zero copy. It seems that 
there is no need to optimize?
   > And the example you gave, `CompressionCodecSnappyJNI.java`, is a 
compression class used in testing. It seems that there is no need to compress 
it.
   
   @liangyepianzhou I created #23586 to clarify the possible optimization.





Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]

2024-11-11 Thread via GitHub


liangyepianzhou commented on PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#issuecomment-2467552702

   > @liangyepianzhou Regarding performance optimizations for compression in 
Pulsar, there's also work that should be done. For example for gzip 
compression/decompression this is very inefficient:
   > 
   > 
https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZLib.java#L60-L85
   > 
   > .
   > Another detail is that the current implementation isn't using "zero copy" 
approaches that are available.
   > For example in Snappy:
   > 
https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressionCodecSnappyJNI.java#L34-L64
   > 
   > 
   > In BookKeeper, I added zero-copy for calculating checksums in 
[apache/bookkeeper#4196](https://github.com/apache/bookkeeper/pull/4196). The 
ByteBufVisitor approach could be used to avoid copying source buffers to an 
extra nio buffer.
   > Calling Netty's io.netty.buffer.CompositeByteBuf#nioBuffer will allocate a 
new nio ByteBuffer in the heap and copy the content there. That's not great 
from a performance perspective, especially when we want to reduce 
allocations and garbage. With the ByteBufVisitor approach it's possible to read 
the source direct byte buffers without extra copies.
   > **Have you considered addressing this performance issue in the Pulsar 
message compression solution?**
   
   @lhotari I tried to optimize it yesterday, but I found that Pulsar does not 
use `CompositeByteBuf` when sending messages. Single sends use 
`io.netty.buffer.UnpooledHeapByteBuf`, while batch sends use 
`io.netty.buffer.PooledUnsafeDirectByteBuf`. Both expose a memory address or a 
backing array, so the current implementation already uses zero copy. It seems that 
there is no need to optimize?
   And the example you gave, `CompressionCodecSnappyJNI.java`, is a compression 
class used in testing. It seems that there is no need to compress it.





Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]

2024-10-30 Thread via GitHub


liangyepianzhou commented on PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#issuecomment-2447421884

   > > > Have you considered addressing this performance issue in the Pulsar 
message compression solution?
   > > 
   > > 
   > > Sounds good, maybe I can try optimizing it in other PRs
   > 
   > +1, In the Pulsar code base, we have a special module called `microbench` 
for microbenchmarks with JMH, 
https://github.com/apache/pulsar/tree/master/microbench . Testing performance 
with JMH could be useful for such improvements.
   
   Thanks for the reminder.





Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]

2024-10-30 Thread via GitHub


lhotari commented on PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#issuecomment-2447327865

   > > Have you considered addressing this performance issue in the Pulsar 
message compression solution?
   > 
   > Sounds good, maybe I can try optimizing it in other PRs
   
   +1, In the Pulsar code base, we have a special module called `microbench` 
for microbenchmarks with JMH, 
https://github.com/apache/pulsar/tree/master/microbench . Testing performance 
with JMH could be useful for such improvements.





Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]

2024-10-30 Thread via GitHub


liangyepianzhou commented on PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#issuecomment-2447309463

   > Have you considered addressing this performance issue in the Pulsar 
message compression solution?
   
   Sounds good, maybe I can try optimizing it in other PRs





Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]

2024-10-30 Thread via GitHub


lhotari commented on PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#issuecomment-2447010383

   @liangyepianzhou Regarding performance optimizations for compression in 
Pulsar, there's also work that should be done.
   For example for gzip compression/decompression this is very inefficient: 
https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZLib.java#L60-L85
 . 
   Another detail is that the current implementation isn't using "zero copy" 
approaches that are available.
   For example in Snappy:
   
https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressionCodecSnappyJNI.java#L34-L64
   In BookKeeper, I added zero-copy for calculating checksums in 
https://github.com/apache/bookkeeper/pull/4196. The ByteBufVisitor approach 
could be used to avoid copying source buffers to an extra nio buffer.
   Calling Netty's io.netty.buffer.CompositeByteBuf#nioBuffer will allocate a 
new nio ByteBuffer in the heap and copy the content there. That's not great 
from a performance perspective, especially when we want to reduce 
allocations and garbage. With the ByteBufVisitor approach it's possible to read 
the source direct byte buffers without extra copies.
   **Have you considered addressing this performance issue in the Pulsar 
message compression solution?**
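
To make the copy-avoidance idea above concrete, here is a minimal JDK-only sketch. It is not the `ByteBufVisitor` approach from BookKeeper and not Pulsar's codec code. It swaps in `java.util.zip.Deflater` and `Inflater`, whose `ByteBuffer` overloads (available since Java 11) let the codec read from and write to NIO buffers directly, so the caller does not stage the data in an intermediate `byte[]`. The class name, method names, and output buffer sizing are assumptions made for illustration.

```java
import java.nio.ByteBuffer;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical illustration class; not part of Pulsar or BookKeeper.
final class DirectBufferDeflateSketch {

    // Compresses the readable bytes of source into a new direct buffer.
    // Note: this advances the position of source.
    static ByteBuffer compress(ByteBuffer source) {
        int length = source.remaining();
        // Generous output bound for default deflate settings (an assumption).
        ByteBuffer target = ByteBuffer.allocateDirect(length + (length >> 3) + 64);
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(source); // the buffer itself is the input, no byte[] copy
            deflater.finish();
            while (!deflater.finished()) {
                deflater.deflate(target);
                if (!target.hasRemaining() && !deflater.finished()) {
                    throw new IllegalStateException("output buffer too small");
                }
            }
        } finally {
            deflater.end(); // release native zlib state
        }
        target.flip();
        return target;
    }

    // Decompresses into a direct buffer of the known uncompressed size.
    static ByteBuffer decompress(ByteBuffer compressed, int uncompressedSize) {
        ByteBuffer target = ByteBuffer.allocateDirect(uncompressedSize);
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(compressed);
            while (target.hasRemaining() && !inflater.finished()) {
                if (inflater.inflate(target) == 0 && inflater.needsInput()) {
                    throw new IllegalStateException("truncated input");
                }
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException("corrupt input", e);
        } finally {
            inflater.end();
        }
        target.flip();
        return target;
    }
}
```

A caller would pass a direct buffer holding the payload to `compress` and later hand the result plus the recorded uncompressed size to `decompress`. Whether a comparable path is practical for each Pulsar codec is a separate question that this sketch does not answer.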
   
   
   
   





Re: [PR] [improve][client][PIP-389] Add a producer config to improve compression performance [pulsar]

2024-10-30 Thread via GitHub


lhotari commented on code in PR #23525:
URL: https://github.com/apache/pulsar/pull/23525#discussion_r1822521896


##
pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java:
##
@@ -189,6 +189,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
 )
 private CompressionType compressionType = CompressionType.NONE;
 
+private int compressMinMsgBodySize = 4 * 1024; // 4kb

Review Comment:
   Thanks for sharing the reasoning.
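
For readers skimming the thread, the effect of a minimum-size threshold like the one in the diff above can be sketched as follows. This is a JDK-only illustration under stated assumptions, not the PR's implementation: it uses `java.util.zip.Deflater` in place of Pulsar's codecs, and the class, the `Encoded` holder, and the choice to compress payloads exactly at the threshold are hypothetical. Only the `4 * 1024` default comes from the diff.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;

// Hypothetical illustration class; not part of Pulsar.
final class CompressionThresholdSketch {
    // Mirrors the default shown in the diff: 4 * 1024 bytes.
    static final int COMPRESS_MIN_MSG_BODY_SIZE = 4 * 1024;

    /** Hypothetical result holder: the bytes to send and whether they were compressed. */
    static final class Encoded {
        final byte[] bytes;
        final boolean compressed;

        Encoded(byte[] bytes, boolean compressed) {
            this.bytes = bytes;
            this.compressed = compressed;
        }
    }

    static Encoded encode(byte[] payload, int minSize) {
        if (payload.length < minSize) {
            // Small payloads skip the codec entirely and are sent as-is.
            return new Encoded(payload, false);
        }
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(payload);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[1024];
            while (!deflater.finished()) {
                int n = deflater.deflate(chunk);
                out.write(chunk, 0, n);
            }
            return new Encoded(out.toByteArray(), true);
        } finally {
            deflater.end(); // release native zlib state
        }
    }
}
```

Calling `encode(new byte[100], COMPRESS_MIN_MSG_BODY_SIZE)` returns the input untouched, while an 8 KB payload goes through the codec. The receiver needs the `compressed` flag (or equivalent metadata) to know which case applies.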


