Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]

2024-01-26 Thread via GitHub


lhotari commented on PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#issuecomment-1912038581

   LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]

2024-01-26 Thread via GitHub


lhotari merged PR #21798:
URL: https://github.com/apache/pulsar/pull/21798


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]

2024-01-26 Thread via GitHub


dao-jun commented on PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#issuecomment-1912023486

   @lhotari  does all the conversations resolved?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]

2024-01-25 Thread via GitHub


codecov-commenter commented on PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#issuecomment-1911527458

   ## 
[Codecov](https://app.codecov.io/gh/apache/pulsar/pull/21798?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 Report
   Attention: `1 lines` in your changes are missing coverage. Please review.
   > Comparison is base 
[(`b14fcb4`)](https://app.codecov.io/gh/apache/pulsar/commit/b14fcb47f20398bdf5b698e158346dce19f0edc6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 73.60% compared to head 
[(`963088d`)](https://app.codecov.io/gh/apache/pulsar/pull/21798?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 73.65%.
   > Report is 6 commits behind head on master.
   
   Additional details and impacted files
   
   
   [![Impacted file tree 
graph](https://app.codecov.io/gh/apache/pulsar/pull/21798/graphs/tree.svg?width=650&height=150&src=pr&token=acYqCpsK9J&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/21798?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   ```diff
   @@ Coverage Diff  @@
   ## master   #21798  +/-   ##
   
   + Coverage 73.60%   73.65%   +0.05% 
   - Complexity3241132456  +45 
   
 Files  1861 1861  
 Lines138674   138724  +50 
 Branches  1518215188   +6 
   
   + Hits 102070   102180 +110 
   + Misses2870228660  -42 
   + Partials   7902 7884  -18 
   ```
   
   | 
[Flag](https://app.codecov.io/gh/apache/pulsar/pull/21798/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | Coverage Δ | |
   |---|---|---|
   | 
[inttests](https://app.codecov.io/gh/apache/pulsar/pull/21798/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `24.10% <27.45%> (-0.10%)` | :arrow_down: |
   | 
[systests](https://app.codecov.io/gh/apache/pulsar/pull/21798/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `23.70% <23.52%> (-0.02%)` | :arrow_down: |
   | 
[unittests](https://app.codecov.io/gh/apache/pulsar/pull/21798/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `72.92% <98.03%> (+0.04%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click 
here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment)
 to find out more.
   
   | 
[Files](https://app.codecov.io/gh/apache/pulsar/pull/21798?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | Coverage Δ | |
   |---|---|---|
   | 
[...org/apache/pulsar/broker/ServiceConfiguration.java](https://app.codecov.io/gh/apache/pulsar/pull/21798?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3B1bHNhci9icm9rZXIvU2VydmljZUNvbmZpZ3VyYXRpb24uamF2YQ==)
 | `99.39% <100.00%> (+<0.01%)` | :arrow_up: |
   | 
[...pulsar/broker/admin/impl/PersistentTopicsBase.java](https://app.codecov.io/gh/apache/pulsar/pull/21798?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9hZG1pbi9pbXBsL1BlcnNpc3RlbnRUb3BpY3NCYXNlLmphdmE=)
 | `65.17% <100.00%> (-0.01%)` | :arrow_down: |
   | 
[...rg/apache/pulsar/broker/service/AbstractTopic.java](https://app.codecov.io/gh/apache/pulsar/pull/21798?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL0Fic3RyYWN0VG9waWMuamF2YQ==)
 | `87.98% <100.00%> (+0.10%)` | :arrow_up: |
   | 
[.../common/policies/data/DelayedDeliveryPolicies.java](https://app.codecov.io/gh/apache/pulsar/pull/21798?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWNsaWVudC1hZG1pbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3B1bHNhci9jb21tb24vcG9saWNpZXMvZGF0YS9EZWxheWVkRGVsaXZlcnlQb2xpY2llcy5qYXZh)
 | `100

Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]

2024-01-22 Thread via GitHub


KevinLiLu commented on PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#issuecomment-1904575390

   > @KevinLiLu Please resolve the conflicts
   
   @dao-jun Thanks for pointing this out. I have resolved the conflict.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]

2024-01-22 Thread via GitHub


dao-jun commented on PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#issuecomment-1903772280

   @KevinLiLu Please resolve the conflicts


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]

2024-01-18 Thread via GitHub


KevinLiLu commented on PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#issuecomment-1899818824

   Hi @lhotari, apologies for the churn but could you re-trigger the workflow? 
I missed the maxDelay parameter in a couple Cmd classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]

2024-01-17 Thread via GitHub


KevinLiLu commented on code in PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#discussion_r1457012662


##
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DelayedDeliveryPolicies.java:
##
@@ -26,10 +26,12 @@
 public interface DelayedDeliveryPolicies {
 long getTickTime();
 boolean isActive();
+long getMaxDeliveryDelayInMillis();
 
 interface Builder {
 Builder tickTime(long tickTime);
 Builder active(boolean active);
+Builder maxDeliveryDelayInMillis(long maxDeliveryDelayInMillis);

Review Comment:
   @lhotari I like this suggestion a lot although it might be better to pick it 
up as a separate PIP. I can see many other configurations benefiting from this 
change so it might be worth it to add support for all configurations at once.
   
   Just throwing out some thoughts here:
   1. We could maintain backwards compatibility by adding a new set of 
configurations (one per existing timed config) which excludes the time portion 
from the config key. Example new config: `delayedDeliveryMaxDelay=PT1H15M30S`. 
We would have to handle the case in which the user has provided both formats 
(either throw exception or just pick one).
   2. Some people were concerned about the overhead cost of converting time 
(ex. my original proposal used seconds instead of millis for readability) so we 
may want to have the underlying implementation store the millis value to avoid 
making extra calculations. I see that `Duration` only stores seconds/nanos so 
calling `Duration#toMillis()` would involve an extra calculation to convert 
second/nanos to millis.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]

2024-01-17 Thread via GitHub


KevinLiLu commented on code in PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#discussion_r1435884732


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java:
##
@@ -263,6 +263,29 @@ public boolean checkAndStartPublish(long producerId, long 
sequenceId, ByteBuf he
 }
 }
 
+if (topic.isPersistent()) {
+PersistentTopic pTopic = (PersistentTopic) topic;
+if (pTopic.isDelayedDeliveryEnabled()) {
+long maxDeliveryDelayInMs = 
pTopic.getDelayedDeliveryMaxDelayInMillis();
+if (maxDeliveryDelayInMs > 0) {
+headersAndPayload.markReaderIndex();
+MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
+headersAndPayload.resetReaderIndex();

Review Comment:
   A small optimization is to move the check to the `PersistentTopic` class 
(right before adding to the ledger) as delayed delivery is only enabled for 
persistent topics so we can skip the `if (topic.isPersistent())` check.
   
   We can add the check right after the maxMessageSize check:
   
https://github.com/apache/pulsar/blob/32f3577a735581096d85aa961d7df45b9ae9b6f9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3656-L3660
   
   ~~Is there another place you had in mind?~~ (see my next comment)



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java:
##
@@ -263,6 +263,29 @@ public boolean checkAndStartPublish(long producerId, long 
sequenceId, ByteBuf he
 }
 }
 
+if (topic.isPersistent()) {
+PersistentTopic pTopic = (PersistentTopic) topic;
+if (pTopic.isDelayedDeliveryEnabled()) {
+long maxDeliveryDelayInMs = 
pTopic.getDelayedDeliveryMaxDelayInMillis();
+if (maxDeliveryDelayInMs > 0) {
+headersAndPayload.markReaderIndex();
+MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
+headersAndPayload.resetReaderIndex();

Review Comment:
   @lhotari Bumping my above message!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]

2024-01-15 Thread via GitHub


Jason918 commented on code in PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#discussion_r1452883473


##
pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java:
##
@@ -1368,9 +1368,9 @@ public void topicPolicies() throws Exception {
 
 cmdTopics.run(split("get-delayed-delivery 
persistent://myprop/clust/ns1/ds1 -g"));
 
verify(mockGlobalTopicsPolicies).getDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1",
 false);
-cmdTopics.run(split("set-delayed-delivery 
persistent://myprop/clust/ns1/ds1 -t 10s --enable -g"));
+cmdTopics.run(split("set-delayed-delivery 
persistent://myprop/clust/ns1/ds1 -t 10s --enable -md 5s-g"));

Review Comment:
   ```suggestion
   cmdTopics.run(split("set-delayed-delivery 
persistent://myprop/clust/ns1/ds1 -t 10s --enable -md 5s -g"));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]

2024-01-11 Thread via GitHub


KevinLiLu commented on code in PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#discussion_r1435896433


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java:
##
@@ -263,6 +263,29 @@ public boolean checkAndStartPublish(long producerId, long 
sequenceId, ByteBuf he
 }
 }
 
+if (topic.isPersistent()) {
+PersistentTopic pTopic = (PersistentTopic) topic;
+if (pTopic.isDelayedDeliveryEnabled()) {
+long maxDeliveryDelayInMs = 
pTopic.getDelayedDeliveryMaxDelayInMillis();
+if (maxDeliveryDelayInMs > 0) {
+headersAndPayload.markReaderIndex();
+MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
+headersAndPayload.resetReaderIndex();

Review Comment:
   An alternative which completely avoids impacting the publish flow is to 
perform this check when dispatching the message to the consumers in 
`PersistentDispatcherMultipleConsumers`: 
https://github.com/apache/pulsar/blob/32f3577a735581096d85aa961d7df45b9ae9b6f9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L1061-L1062
   
   If the broker sees a `deliverAtTime` which exceeds the configured max, it 
can override the `deliverAtTime` to be the max possible value based on the 
config (so that it equals `publishTime + maxDeliveryDelayInMs`)
   
   The disadvantage with this approach is the producer will not get any 
indication that their proposed `deliverAtTime` was changed which can cause some 
confusion/churn. But maybe this is acceptable because the 
`delayedDeliveryEnabled` config works the same way (if delayed delivery is 
disabled, message is just delivered immediately and producer never receives an 
error).
   
   This approach does have some major advantages:
   1. Publish performance is not impacted
   2. Consumer using retry/dlq feature will never get stuck (see issue I 
mentioned 
[here](https://github.com/apache/pulsar/blob/master/pip/pip-315.md#consumer-impact))
   
   @lhotari what are your thoughts on this alternative implementation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]

2024-01-11 Thread via GitHub


lhotari commented on code in PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#discussion_r1449280741


##
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DelayedDeliveryPolicies.java:
##
@@ -26,10 +26,12 @@
 public interface DelayedDeliveryPolicies {
 long getTickTime();
 boolean isActive();
+long getMaxDeliveryDelayInMillis();
 
 interface Builder {
 Builder tickTime(long tickTime);
 Builder active(boolean active);
+Builder maxDeliveryDelayInMillis(long maxDeliveryDelayInMillis);

Review Comment:
   I should have made this comment already in the PIP phase, but I'll bring it 
up anyways. :) 
   
   Have you considered using `Duration` as the type here? I know that we have 
multiple ways of dealing with durations and long values are commonly used in 
Pulsar APIs, but perhaps we could improve on this. There's a benefit in using 
Duration in public APIs since it makes it clear what the unit is and there's 
less chances for mistakes. I think that internally it would be fine to store it 
in a long.
   
   btw. It would be nice to have support for parsing durations in configuration 
files so that there wouldn't be a need to calculate things in milliseconds when 
configuring `delayedDeliveryMaxDelayInMillis` in broker.conf, but that would 
first require adding support for configuring Duration in configuration files. 
It would be great if [ISO-8601 Duration format would be 
supported](https://en.wikipedia.org/wiki/ISO_8601#Durations). For example 
"PT1H15M30S" would mean 1 hour 15 minutes and 30 seconds. "PT0.001S" would mean 
1 millisecond. It should be fairly simple to add support for creating 
java.time.Duration instances to the configuration framework by adding converter 
methods to org.apache.pulsar.common.util.FieldParser class in pulsar-common 
module. java.time.Duration#parse supports parsing ISO-8601 duration format.
   
   The switch to use java.time.Duration as the primary type for durations in 
Pulsar would be something that I can bring up to discussion on the dev mailing 
list. This is something that I have planned to do earlier and now I noticed 
that it would be great to finally move towards this direction.
   
   I'm also fine in the current way if there's a need to get this completed 
asap as-is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]

2023-12-24 Thread via GitHub


KevinLiLu commented on code in PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#discussion_r1435896433


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java:
##
@@ -263,6 +263,29 @@ public boolean checkAndStartPublish(long producerId, long 
sequenceId, ByteBuf he
 }
 }
 
+if (topic.isPersistent()) {
+PersistentTopic pTopic = (PersistentTopic) topic;
+if (pTopic.isDelayedDeliveryEnabled()) {
+long maxDeliveryDelayInMs = 
pTopic.getDelayedDeliveryMaxDelayInMillis();
+if (maxDeliveryDelayInMs > 0) {
+headersAndPayload.markReaderIndex();
+MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
+headersAndPayload.resetReaderIndex();

Review Comment:
   An alternative which completely avoids impacting the publish flow is to 
perform this check when dispatching the message to the consumers in 
`PersistentDispatcherMultipleConsumers`: 
https://github.com/apache/pulsar/blob/32f3577a735581096d85aa961d7df45b9ae9b6f9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L1061-L1062
   
   If the broker sees a `deliverAtTime` which exceeds the configured max, it 
can override the `deliverAtTime` to be the max possible value based on the 
config (so that it equals `publishTime + maxDeliveryDelayInMs`)
   
   The disadvantage with this approach is the producer will not get any 
indication that their proposed `deliverAtTime` was changed which can cause some 
confusion/churn. But maybe this is acceptable because the 
`delayedDeliveryEnabled` config works the same way (if delayed delivery is 
disabled, message is just delivered immediately and producer never receives an 
error).
   
   This approach does have some major advantages:
   1. Publish performance is not impacted
   2. Consumer using retry/dlq feature will never get stuck (see issue I 
mentioned 
[here](https://github.com/apache/pulsar/blob/master/pip/pip-315.md#consumer-impact))
   
   @lhotari what are your thoughts on this proposed change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]

2023-12-24 Thread via GitHub


KevinLiLu commented on code in PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#discussion_r1435884732


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java:
##
@@ -263,6 +263,29 @@ public boolean checkAndStartPublish(long producerId, long 
sequenceId, ByteBuf he
 }
 }
 
+if (topic.isPersistent()) {
+PersistentTopic pTopic = (PersistentTopic) topic;
+if (pTopic.isDelayedDeliveryEnabled()) {
+long maxDeliveryDelayInMs = 
pTopic.getDelayedDeliveryMaxDelayInMillis();
+if (maxDeliveryDelayInMs > 0) {
+headersAndPayload.markReaderIndex();
+MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
+headersAndPayload.resetReaderIndex();

Review Comment:
   A small optimization is to move the check to the `PersistentTopic` class 
(right before adding to the ledger) as delayed delivery is only enabled for 
persistent topics so we can skip the `if (topic.isPersistent())` check.
   
   We can add the check right after the maxMessageSize check:
   
https://github.com/apache/pulsar/blob/32f3577a735581096d85aa961d7df45b9ae9b6f9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3656-L3660
   
   ~~Is there another place you had in mind?~~ (see my next comment)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]

2023-12-24 Thread via GitHub


KevinLiLu commented on code in PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#discussion_r1435896433


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java:
##
@@ -263,6 +263,29 @@ public boolean checkAndStartPublish(long producerId, long 
sequenceId, ByteBuf he
 }
 }
 
+if (topic.isPersistent()) {
+PersistentTopic pTopic = (PersistentTopic) topic;
+if (pTopic.isDelayedDeliveryEnabled()) {
+long maxDeliveryDelayInMs = 
pTopic.getDelayedDeliveryMaxDelayInMillis();
+if (maxDeliveryDelayInMs > 0) {
+headersAndPayload.markReaderIndex();
+MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
+headersAndPayload.resetReaderIndex();

Review Comment:
   An alternative which completely avoids impacting the publish flow is to 
perform this check when dispatching the message to the consumers in 
`PersistentDispatcherMultipleConsumers`: 
https://github.com/apache/pulsar/blob/32f3577a735581096d85aa961d7df45b9ae9b6f9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L1048-L1062
   
   If the broker sees a `deliverAtTime` which exceeds the configured max, it 
can override the `deliverAtTime` to be the max possible value based on the 
config (so that it equals `publishTime + maxDeliveryDelayInMs`)
   
   The disadvantage with this approach is the producer will not get any 
indication that their proposed `deliverAtTime` was changed which can cause some 
confusion/churn. But maybe this is acceptable because the 
`delayedDeliveryEnabled` config works the same way (if delayed delivery is 
disabled, message is just delivered immediately and producer never receives an 
error).
   
   This approach does have some major advantages:
   1. Publish performance is not impacted
   2. Consumer using retry/dlq feature will never get stuck (see issue I 
mentioned 
[here](https://github.com/apache/pulsar/blob/master/pip/pip-315.md#consumer-impact))
   
   @lhotari what are your thoughts on this proposed change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]

2023-12-24 Thread via GitHub


KevinLiLu commented on code in PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#discussion_r1435884732


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java:
##
@@ -263,6 +263,29 @@ public boolean checkAndStartPublish(long producerId, long 
sequenceId, ByteBuf he
 }
 }
 
+if (topic.isPersistent()) {
+PersistentTopic pTopic = (PersistentTopic) topic;
+if (pTopic.isDelayedDeliveryEnabled()) {
+long maxDeliveryDelayInMs = 
pTopic.getDelayedDeliveryMaxDelayInMillis();
+if (maxDeliveryDelayInMs > 0) {
+headersAndPayload.markReaderIndex();
+MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
+headersAndPayload.resetReaderIndex();

Review Comment:
   A small optimization is to move the check to the `PersistentTopic` class 
(right before adding to the ledger) as delayed delivery is only enabled for 
persistent topics so we can skip the `if (topic.isPersistent())` check.
   
   We can add the check right after the maxMessageSize check:
   
https://github.com/apache/pulsar/blob/32f3577a735581096d85aa961d7df45b9ae9b6f9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3656-L3660
   
   Is there another place you had in mind?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]

2023-12-24 Thread via GitHub


KevinLiLu commented on code in PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#discussion_r1435884732


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java:
##
@@ -263,6 +263,29 @@ public boolean checkAndStartPublish(long producerId, long 
sequenceId, ByteBuf he
 }
 }
 
+if (topic.isPersistent()) {
+PersistentTopic pTopic = (PersistentTopic) topic;
+if (pTopic.isDelayedDeliveryEnabled()) {
+long maxDeliveryDelayInMs = 
pTopic.getDelayedDeliveryMaxDelayInMillis();
+if (maxDeliveryDelayInMs > 0) {
+headersAndPayload.markReaderIndex();
+MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
+headersAndPayload.resetReaderIndex();

Review Comment:
   A small optimization is to move the check to the `PersistentTopic` class 
(right before adding to the ledger) as delayed delivery is only enabled for 
persistent topics so we can skip the `if (topic.isPersistent())` check.
   
   We can add the check right after the maxMessageSize check:
   
https://github.com/apache/pulsar/blob/32f3577a735581096d85aa961d7df45b9ae9b6f9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3656-L3660.
   
   Is there another place you had in mind?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]

2023-12-24 Thread via GitHub


KevinLiLu commented on code in PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#discussion_r1435873119


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java:
##
@@ -263,6 +263,29 @@ public boolean checkAndStartPublish(long producerId, long 
sequenceId, ByteBuf he
 }
 }
 
+if (topic.isPersistent()) {
+PersistentTopic pTopic = (PersistentTopic) topic;
+if (pTopic.isDelayedDeliveryEnabled()) {
+long maxDeliveryDelayInMs = 
pTopic.getDelayedDeliveryMaxDelayInMillis();
+if (maxDeliveryDelayInMs > 0) {
+headersAndPayload.markReaderIndex();
+MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
+headersAndPayload.resetReaderIndex();
+if (msgMetadata.hasDeliverAtTime()
+&& msgMetadata.getDeliverAtTime() - 
msgMetadata.getPublishTime() > maxDeliveryDelayInMs) {
+cnx.execute(() -> {
+cnx.getCommandSender().sendSendError(producerId, 
sequenceId,
+ServerError.NotAllowedError,
+String.format("Exceeds max allowed 
delivery delay of %s milliseconds",
+maxDeliveryDelayInMs));
+cnx.completedSendOperation(false, 
headersAndPayload.readableBytes());
+});
+return false;
+}

Review Comment:
   The other error checks in the `checkAndStartPublish` method (right above my 
change) are calling `cnx.getCommandSender().sendSendError(...)` and then 
`cnx.completedSendOperation(...)` immediately.
   
   For instance, the encryption error check (when encryption is required but 
the producer does not encrypt the message):
   
https://github.com/apache/pulsar/blob/32f3577a735581096d85aa961d7df45b9ae9b6f9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java#L254-L263



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]

2023-12-24 Thread via GitHub


KevinLiLu commented on code in PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#discussion_r1435873119


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java:
##
@@ -263,6 +263,29 @@ public boolean checkAndStartPublish(long producerId, long 
sequenceId, ByteBuf he
 }
 }
 
+if (topic.isPersistent()) {
+PersistentTopic pTopic = (PersistentTopic) topic;
+if (pTopic.isDelayedDeliveryEnabled()) {
+long maxDeliveryDelayInMs = 
pTopic.getDelayedDeliveryMaxDelayInMillis();
+if (maxDeliveryDelayInMs > 0) {
+headersAndPayload.markReaderIndex();
+MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
+headersAndPayload.resetReaderIndex();
+if (msgMetadata.hasDeliverAtTime()
+&& msgMetadata.getDeliverAtTime() - 
msgMetadata.getPublishTime() > maxDeliveryDelayInMs) {
+cnx.execute(() -> {
+cnx.getCommandSender().sendSendError(producerId, 
sequenceId,
+ServerError.NotAllowedError,
+String.format("Exceeds max allowed 
delivery delay of %s milliseconds",
+maxDeliveryDelayInMs));
+cnx.completedSendOperation(false, 
headersAndPayload.readableBytes());
+});
+return false;
+}

Review Comment:
   The other error checks in the `checkAndStartPublish` method (right above my 
change) are calling `cnx.getCommandSender().sendSendError(...)` and then 
`cnx.completedSendOperation(...)` immediately.
   
   For instance, the encryption error check (when encryption is required but 
the producer does not encrypt the message):
   
https://github.com/apache/pulsar/blob/32f3577a735581096d85aa961d7df45b9ae9b6f9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java#L248-L264



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]

2023-12-24 Thread via GitHub


lhotari commented on code in PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#discussion_r1435807296


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java:
##
@@ -263,6 +263,29 @@ public boolean checkAndStartPublish(long producerId, long 
sequenceId, ByteBuf he
 }
 }
 
+if (topic.isPersistent()) {
+PersistentTopic pTopic = (PersistentTopic) topic;
+if (pTopic.isDelayedDeliveryEnabled()) {
+long maxDeliveryDelayInMs = 
pTopic.getDelayedDeliveryMaxDelayInMillis();
+if (maxDeliveryDelayInMs > 0) {
+headersAndPayload.markReaderIndex();
+MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
+headersAndPayload.resetReaderIndex();
+if (msgMetadata.hasDeliverAtTime()
+&& msgMetadata.getDeliverAtTime() - 
msgMetadata.getPublishTime() > maxDeliveryDelayInMs) {
+cnx.execute(() -> {
+cnx.getCommandSender().sendSendError(producerId, 
sequenceId,
+ServerError.NotAllowedError,
+String.format("Exceeds max allowed 
delivery delay of %s milliseconds",
+maxDeliveryDelayInMs));
+cnx.completedSendOperation(false, 
headersAndPayload.readableBytes());
+});
+return false;
+}

Review Comment:
   I'm not sure about it, but there's a chance that completedSendOperation 
should only be called if the message was accepted for sending by returning true 
from this method. 



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java:
##
@@ -263,6 +263,29 @@ public boolean checkAndStartPublish(long producerId, long 
sequenceId, ByteBuf he
 }
 }
 
+if (topic.isPersistent()) {
+PersistentTopic pTopic = (PersistentTopic) topic;
+if (pTopic.isDelayedDeliveryEnabled()) {
+long maxDeliveryDelayInMs = 
pTopic.getDelayedDeliveryMaxDelayInMillis();
+if (maxDeliveryDelayInMs > 0) {
+headersAndPayload.markReaderIndex();
+MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
+headersAndPayload.resetReaderIndex();

Review Comment:
   This has a cost so it would be good to have this check later in the send 
processing so that there wouldn't be an extra parsing step. Did you check if it 
would be possible to put this logic is later phase of sending?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]

2023-12-24 Thread via GitHub


github-actions[bot] commented on PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#issuecomment-1868467862

   @KevinLiLu Please add the following content to your PR description and 
select a checkbox:
   ```
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [ ] `doc-not-needed` 
   - [ ] `doc-complete` 
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org