2018-11-07 10:55:05 UTC - Rodrigo Malacarne: Hello all, is there any difference 
between these two commands?
`./bin/pulsar-admin topics compact 
<persistent://my-tenant/my-namespace/my-topic>`
and
`./bin/pulsar compact-topic --topic 
<persistent://my-tenant/my-namespace/my-topic>`
----
2018-11-07 10:58:53 UTC - Rodrigo Malacarne: Another question... is there any 
way to automatically create a compacted topic? What I observed is, in order to 
create a compact topic, I have not only to set the compaction at the namespace 
level, but also run a manual compaction at least once. Is this procedure 
correct? Or is there any way, as said, to automatically create a compacted 
topic?
----
2018-11-07 11:34:25 UTC - jia zhai: Hi @Rodrigo Malacarne Hope this page 
contains all the information that you need: 
<http://pulsar.apache.org/docs/en/cookbooks-compaction>

&gt; ./bin/pulsar-admin topics compact  &amp; ./bin/pulsar compact-topic
The pulsar-admin tool runs compaction via the Pulsar REST API. To run 
compaction in its own dedicated process, i.e. not through the REST API, you can 
use the pulsar compact-topic command.

Besides the manually trigger use your commands, You could also set the auto 
trigger in namespace level:
```
Tenant administrators can configure a policy for compaction at the namespace 
level. The policy specifies how large the topic backlog can grow before 
compaction is triggered.

For example, to trigger compaction when the backlog reaches 100MB:

$ bin/pulsar-admin namespaces set-compaction-threshold \
  --threshold 100M my-tenant/my-namespace

Copy
Configuring the compaction threshold on a namespace will apply to all topics 
within that namespace.
```
----
2018-11-07 11:38:12 UTC - Rodrigo Malacarne: @jia zhai Thank you very mush... I 
have set the compaction at the namespace level, however the compaction somehow 
didn't work for the topics I've created inside the namespace. That's exactly my 
point ...
----
2018-11-07 11:39:37 UTC - jia zhai: Oh, what is the size in your setting
----
2018-11-07 11:39:47 UTC - Rodrigo Malacarne: 1K
----
2018-11-07 11:40:48 UTC - jia zhai: Would you please help provide more 
information about your whole steps?
----
2018-11-07 11:41:16 UTC - Rodrigo Malacarne: What I did was:
1) `./bin/pulsar-admin namespaces set-compaction-threshold --threshold 1K 
my-tenant/my-namespace`
After that, topics created inside this namespace were NOT automatically 
compacted...
Then I had to manually run
2) `./bin/pulsar-admin topics compact 
<persistent://my-tenant/my-namespace/my-topic>`
----
2018-11-07 11:41:54 UTC - Rodrigo Malacarne: I had to run step 2 for all the 
topics (currently around 70 topics)
----
2018-11-07 11:42:00 UTC - jia zhai: Pulsar consumers and readers need to be 
configured to read from compacted topics.
----
2018-11-07 11:42:08 UTC - jia zhai: Consumer&lt;byte[]&gt; 
compactedTopicConsumer = client.newConsumer()
        .topic(“some-compacted-topic”)
        .readCompacted(true)
        .subscribe();
----
2018-11-07 11:42:22 UTC - Rodrigo Malacarne: All consumers and readers are 
configured for compaction ...
----
2018-11-07 11:43:39 UTC - Rodrigo Malacarne: After step 1, even though 
consumers and readers are correctly configured, the compaction doesn't happen 
somehow.
----
2018-11-07 11:44:05 UTC - Rodrigo Malacarne: Then I have to manually run step 2 
for all topics...
----
2018-11-07 11:44:22 UTC - jia zhai: pulsar-admin namespaces 
get-compaction-threshold tenant/namespace
----
2018-11-07 11:44:33 UTC - jia zhai: Will this command get the right setting 
value?
----
2018-11-07 11:44:39 UTC - jia zhai: 1K
----
2018-11-07 11:44:44 UTC - jia zhai: as in your setting
----
2018-11-07 11:46:21 UTC - jia zhai: 
<https://github.com/apache/pulsar/blob/master/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java>
----
2018-11-07 11:46:59 UTC - jia zhai: Here is the UT for this feature, It is 
running in every PR, Seems recently it is not reporting error.
----
2018-11-07 11:47:35 UTC - jia zhai: line-305 contains a auto compaction test 
case.
----
2018-11-07 11:49:27 UTC - Rodrigo Malacarne: @jia zhai, yes:
----
2018-11-07 11:51:32 UTC - Rodrigo Malacarne: @jia zhai Thank you again ... I'll 
check the Java example.
----
2018-11-07 11:51:58 UTC - jia zhai: It is strange.:joy:
----
2018-11-07 11:52:39 UTC - Rodrigo Malacarne: @jia zhai I know.... :sleepy:
----
2018-11-07 11:55:21 UTC - Rodrigo Malacarne: My question is exactly that ... 
how to automatically create compacted topics. According to the docs (the 
compaction cookbook you linked) the only thing needed to trigger compaction is 
to set the compaction at the namespace level. And that's not what I've 
observed...
----
2018-11-07 12:00:02 UTC - jia zhai: seems the only different is you are using 
1024, while in the tests, it uses 1.
----
2018-11-07 12:07:36 UTC - Rodrigo Malacarne: @jia zhai, You mean instead of 
1024 it should return 1?
----
2018-11-07 12:08:18 UTC - jia zhai: ./bin/pulsar-admin namespaces 
set-compaction-threshold --threshold 1K my-tenant/my-namespace
----
2018-11-07 12:08:28 UTC - jia zhai: you are setting 1k = 1024
----
2018-11-07 12:08:36 UTC - Rodrigo Malacarne: Exactly ...
----
2018-11-07 12:08:40 UTC - Rodrigo Malacarne: I know that ..
----
2018-11-07 12:08:41 UTC - jia zhai: in the test, it sets 1
----
2018-11-07 12:09:06 UTC - Rodrigo Malacarne: And in this case 1 means "1K" ?
----
2018-11-07 12:09:17 UTC - jia zhai: 1 is easier to trigger than 1K
----
2018-11-07 12:09:37 UTC - jia zhai: 1k == 1024
----
2018-11-07 12:09:39 UTC - Rodrigo Malacarne: sure ...
----
2018-11-07 12:10:00 UTC - Rodrigo Malacarne: But my question is: if I set 1, 
then Pulsar knows it's one byte?
----
2018-11-07 12:10:12 UTC - Rodrigo Malacarne: Or should I use "1B" for instance?
----
2018-11-07 12:10:13 UTC - jia zhai: yes
----
2018-11-07 12:10:22 UTC - Rodrigo Malacarne: @jia zhai great!
----
2018-11-07 12:10:27 UTC - jia zhai: 1 == 1byte
----
2018-11-07 12:10:34 UTC - jia zhai: 1k == 1024 bytes
----
2018-11-07 12:10:39 UTC - Rodrigo Malacarne: Amazing!!!!
----
2018-11-07 12:10:43 UTC - Rodrigo Malacarne: Will check now !
----
2018-11-07 12:11:02 UTC - Rodrigo Malacarne: :+1:
----
2018-11-07 12:11:11 UTC - jia zhai: hope it will work
----
2018-11-07 12:11:14 UTC - jia zhai: :joy:
----
2018-11-07 12:11:44 UTC - Rodrigo Malacarne: :grin:
----
2018-11-07 12:42:10 UTC - David Tinker: Is it possible to limit the number of 
un-acknowledged messages Pulsar will deliver to a consumer i.e. stop delivery 
of new messages until some are acknowledged?
----
2018-11-07 12:57:50 UTC - David Tinker: I have tried setting 
maxUnackedMessagesPerConsumer=1000 in broker.conf and 
ConsumerBuilder.receiverQueueSize(1000) and neither seems to have any effect.
----
2018-11-07 13:37:55 UTC - jia zhai: &gt; maxUnackedMessagesPerConsumer
This is the setting
----
2018-11-07 13:41:42 UTC - jia zhai: 
<https://github.com/apache/pulsar/blob/master/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java#L1058>
@David Tinker FYI. Here is a UT contains the steps to verify this. 
testConsumerBlockingWithUnAckedMessages
----
2018-11-08 06:18:23 UTC - David Tinker: It would be nice to be able to control 
message flow from the client but still use MessageListener to be notified when 
new messages arrive. I know I can use Consumer.receive(..) and 
receiverQueueSize instead but then I have to poll thousands of consumers at 
regular intervals. Not as nice as using MessageListener. What about adding 
Consumer.pause(boolean) to suspend sending flow permits to the broker? I think 
this could be easily implemented in ConsumerImpl.increaseAvailablePermits(). I 
could do a PR?
----
2018-11-08 06:20:21 UTC - Sijie Guo: Sounds an interesting idea. You are 
welcome to send a PR. We can discuss the approaches in a PR @David Tinker 
----
2018-11-08 06:20:49 UTC - David Tinker: Will do. Tx.
----
2018-11-08 06:23:13 UTC - Matteo Merli: &gt; It would be nice to be able to 
control message flow from the client but still use MessageListener to be 
notified when new messages arrive

The listener has the same underlying implementation used for receive(). The 
idea is that the flow control is tied to how fast application can call 
receive(). For listener, it’s based on how fast the thread is able to process 
messages.
----
2018-11-08 06:25:12 UTC - Matteo Merli: regarding pausing the listener for a 
specific consumer.. It was actually implemented in C++ client and not in Java 
:slightly_smiling_face: 
<http://pulsar.apache.org/api/cpp/classpulsar_1_1_consumer.html>
----
2018-11-08 06:25:32 UTC - Matteo Merli: `pauseMessageListener()` and 
`resumeMessageListener()`
----
2018-11-08 06:29:25 UTC - David Tinker: I see. So adding the same methods to 
Java Consumer would be the way to go?  I don't want to block in 
MessageListener.receive(..) as that will impact other topics.
----
2018-11-08 06:30:51 UTC - Matteo Merli: Ok, then it would make sense to have 
the same option in Java lib as well
----

Reply via email to