Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]
lhotari commented on PR #21798: URL: https://github.com/apache/pulsar/pull/21798#issuecomment-1912038581 LGTM -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]
lhotari merged PR #21798: URL: https://github.com/apache/pulsar/pull/21798
Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]
dao-jun commented on PR #21798: URL: https://github.com/apache/pulsar/pull/21798#issuecomment-1912023486 @lhotari are all the conversations resolved?
Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]
codecov-commenter commented on PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#issuecomment-1911527458

## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/21798?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report

Attention: `1 lines` in your changes are missing coverage. Please review.

> Comparison is base [(`b14fcb4`)](https://app.codecov.io/gh/apache/pulsar/commit/b14fcb47f20398bdf5b698e158346dce19f0edc6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 73.60% compared to head [(`963088d`)](https://app.codecov.io/gh/apache/pulsar/pull/21798?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 73.65%.
> Report is 6 commits behind head on master.

Additional details and impacted files

```diff
@@             Coverage Diff              @@
##             master    #21798     +/-   ##
+ Coverage     73.60%    73.65%   +0.05%
- Complexity    32411     32456     +45
  Files          1861      1861
  Lines        138674    138724     +50
  Branches      15182     15188      +6
+ Hits         102070    102180    +110
+ Misses        28702     28660     -42
+ Partials       7902      7884     -18
```

| [Flag](https://app.codecov.io/gh/apache/pulsar/pull/21798/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [inttests](https://app.codecov.io/gh/apache/pulsar/pull/21798/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `24.10% <27.45%> (-0.10%)` | :arrow_down: |
| [systests](https://app.codecov.io/gh/apache/pulsar/pull/21798/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `23.70% <23.52%> (-0.02%)` | :arrow_down: |
| [unittests](https://app.codecov.io/gh/apache/pulsar/pull/21798/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `72.92% <98.03%> (+0.04%)` | :arrow_up: |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.

| [Files](https://app.codecov.io/gh/apache/pulsar/pull/21798?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [...org/apache/pulsar/broker/ServiceConfiguration.java](https://app.codecov.io/gh/apache/pulsar/pull/21798?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3B1bHNhci9icm9rZXIvU2VydmljZUNvbmZpZ3VyYXRpb24uamF2YQ==) | `99.39% <100.00%> (+<0.01%)` | :arrow_up: |
| [...pulsar/broker/admin/impl/PersistentTopicsBase.java](https://app.codecov.io/gh/apache/pulsar/pull/21798?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9hZG1pbi9pbXBsL1BlcnNpc3RlbnRUb3BpY3NCYXNlLmphdmE=) | `65.17% <100.00%> (-0.01%)` | :arrow_down: |
| [...rg/apache/pulsar/broker/service/AbstractTopic.java](https://app.codecov.io/gh/apache/pulsar/pull/21798?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL0Fic3RyYWN0VG9waWMuamF2YQ==) | `87.98% <100.00%> (+0.10%)` | :arrow_up: |
| [.../common/policies/data/DelayedDeliveryPolicies.java](https://app.codecov.io/gh/apache/pulsar/pull/21798?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWNsaWVudC1hZG1pbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3B1bHNhci9jb21tb24vcG9saWNpZXMvZGF0YS9EZWxheWVkRGVsaXZlcnlQb2xpY2llcy5qYXZh) | `100
Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]
KevinLiLu commented on PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#issuecomment-1904575390

> @KevinLiLu Please resolve the conflicts

@dao-jun Thanks for pointing this out. I have resolved the conflict.
Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]
dao-jun commented on PR #21798: URL: https://github.com/apache/pulsar/pull/21798#issuecomment-1903772280 @KevinLiLu Please resolve the conflicts
Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]
KevinLiLu commented on PR #21798: URL: https://github.com/apache/pulsar/pull/21798#issuecomment-1899818824 Hi @lhotari, apologies for the churn, but could you re-trigger the workflow? I missed the maxDelay parameter in a couple of Cmd classes.
Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]
KevinLiLu commented on code in PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#discussion_r1457012662

## pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DelayedDeliveryPolicies.java: ##
@@ -26,10 +26,12 @@ public interface DelayedDeliveryPolicies {
     long getTickTime();
     boolean isActive();
+    long getMaxDeliveryDelayInMillis();
     interface Builder {
         Builder tickTime(long tickTime);
         Builder active(boolean active);
+        Builder maxDeliveryDelayInMillis(long maxDeliveryDelayInMillis);

Review Comment:
@lhotari I like this suggestion a lot, although it might be better to pick it up as a separate PIP. I can see many other configurations benefiting from this change, so it might be worth adding support for all configurations at once.

Just throwing out some thoughts here:

1. We could maintain backwards compatibility by adding a new set of configurations (one per existing timed config) that omit the time unit from the config key. Example new config: `delayedDeliveryMaxDelay=PT1H15M30S`. We would have to handle the case in which the user has provided both formats (either throw an exception or just pick one).
2. Some people were concerned about the overhead of converting time units (e.g. my original proposal used seconds instead of millis for readability), so we may want the underlying implementation to store the millis value to avoid extra calculations. I see that `Duration` only stores seconds/nanos, so calling `Duration#toMillis()` would involve an extra calculation to convert seconds/nanos to millis.
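To make the two points above concrete, here is a minimal sketch of how a configuration holder might accept either key and convert only once. It is not code from the PR: the class `MaxDelayConfigSketch` and its constructor parameters are hypothetical, the choice to reject input when both formats are set is just one of the two options mentioned, and treating a missing value as `0` (no limit) is an assumption based on the `maxDeliveryDelayInMs > 0` guard seen elsewhere in this review.

```java
import java.time.Duration;

// Hypothetical holder, not part of Pulsar: accepts either the existing
// millisecond value or an ISO-8601 duration string, and stores millis once.
final class MaxDelayConfigSketch {
    // Converted a single time at construction, so readers never call
    // Duration#toMillis() again.
    private final long maxDelayMillis;

    MaxDelayConfigSketch(Long legacyMillis, String isoDuration) {
        if (legacyMillis != null && isoDuration != null) {
            // One of the two options discussed: fail fast when both formats are given.
            throw new IllegalArgumentException(
                    "Set either the millisecond value or the ISO-8601 duration, not both");
        }
        if (isoDuration != null) {
            this.maxDelayMillis = Duration.parse(isoDuration).toMillis();
        } else {
            // Assumption: an absent value means "no limit", represented as 0.
            this.maxDelayMillis = legacyMillis != null ? legacyMillis : 0L;
        }
    }

    long getMaxDelayMillis() {
        return maxDelayMillis;
    }
}
```

With this shape, `new MaxDelayConfigSketch(null, "PT1H15M30S")` and `new MaxDelayConfigSketch(4530000L, null)` describe the same limit, and callers only ever read a `long`.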
Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]
KevinLiLu commented on code in PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#discussion_r1435884732

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java: ##
@@ -263,6 +263,29 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he
             }
         }
+        if (topic.isPersistent()) {
+            PersistentTopic pTopic = (PersistentTopic) topic;
+            if (pTopic.isDelayedDeliveryEnabled()) {
+                long maxDeliveryDelayInMs = pTopic.getDelayedDeliveryMaxDelayInMillis();
+                if (maxDeliveryDelayInMs > 0) {
+                    headersAndPayload.markReaderIndex();
+                    MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
+                    headersAndPayload.resetReaderIndex();

Review Comment:
A small optimization is to move the check to the `PersistentTopic` class (right before adding to the ledger) as delayed delivery is only enabled for persistent topics so we can skip the `if (topic.isPersistent())` check. We can add the check right after the maxMessageSize check:

https://github.com/apache/pulsar/blob/32f3577a735581096d85aa961d7df45b9ae9b6f9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3656-L3660

~~Is there another place you had in mind?~~ (see my next comment)

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java: ##
@@ -263,6 +263,29 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he
             }
         }
+        if (topic.isPersistent()) {
+            PersistentTopic pTopic = (PersistentTopic) topic;
+            if (pTopic.isDelayedDeliveryEnabled()) {
+                long maxDeliveryDelayInMs = pTopic.getDelayedDeliveryMaxDelayInMillis();
+                if (maxDeliveryDelayInMs > 0) {
+                    headersAndPayload.markReaderIndex();
+                    MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
+                    headersAndPayload.resetReaderIndex();

Review Comment:
@lhotari Bumping my above message!
Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]
Jason918 commented on code in PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#discussion_r1452883473

## pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java: ##
@@ -1368,9 +1368,9 @@ public void topicPolicies() throws Exception {
         cmdTopics.run(split("get-delayed-delivery persistent://myprop/clust/ns1/ds1 -g"));
         verify(mockGlobalTopicsPolicies).getDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1", false);
-        cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s --enable -g"));
+        cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s --enable -md 5s-g"));

Review Comment:
```suggestion
        cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s --enable -md 5s -g"));
```
Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]
KevinLiLu commented on code in PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#discussion_r1435896433

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java: ##
@@ -263,6 +263,29 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he
             }
         }
+        if (topic.isPersistent()) {
+            PersistentTopic pTopic = (PersistentTopic) topic;
+            if (pTopic.isDelayedDeliveryEnabled()) {
+                long maxDeliveryDelayInMs = pTopic.getDelayedDeliveryMaxDelayInMillis();
+                if (maxDeliveryDelayInMs > 0) {
+                    headersAndPayload.markReaderIndex();
+                    MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
+                    headersAndPayload.resetReaderIndex();

Review Comment:
An alternative which completely avoids impacting the publish flow is to perform this check when dispatching the message to the consumers in `PersistentDispatcherMultipleConsumers`:

https://github.com/apache/pulsar/blob/32f3577a735581096d85aa961d7df45b9ae9b6f9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L1061-L1062

If the broker sees a `deliverAtTime` which exceeds the configured max, it can override the `deliverAtTime` to be the max possible value based on the config (so that it equals `publishTime + maxDeliveryDelayInMs`).

The disadvantage of this approach is that the producer will not get any indication that its proposed `deliverAtTime` was changed, which can cause some confusion/churn. But maybe this is acceptable because the `delayedDeliveryEnabled` config works the same way (if delayed delivery is disabled, the message is just delivered immediately and the producer never receives an error).

This approach does have some major advantages:

1. Publish performance is not impacted
2. Consumers using the retry/DLQ feature will never get stuck (see the issue I mentioned [here](https://github.com/apache/pulsar/blob/master/pip/pip-315.md#consumer-impact))

@lhotari what are your thoughts on this alternative implementation?
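The override described in this comment amounts to clamping the requested delivery time. A rough sketch under stated assumptions follows: the class `DeliverAtClampSketch` and the method `effectiveDeliverAt` are hypothetical names, not anything in `PersistentDispatcherMultipleConsumers`, and a non-positive limit is assumed to mean "no limit", mirroring the `maxDeliveryDelayInMs > 0` guard in the diff.

```java
// Hypothetical helper, not Pulsar code: computes the delivery time the
// dispatcher would honor under the "clamp at dispatch" alternative.
final class DeliverAtClampSketch {
    private DeliverAtClampSketch() {}

    static long effectiveDeliverAt(long publishTime, long deliverAtTime, long maxDelayMs) {
        if (maxDelayMs <= 0) {
            // Assumption: a non-positive limit disables the check.
            return deliverAtTime;
        }
        // Latest timestamp permitted by the configured limit.
        long latestAllowed = publishTime + maxDelayMs;
        // Requests within the limit are unchanged; longer ones are capped.
        return Math.min(deliverAtTime, latestAllowed);
    }
}
```

Because the producer is never told about the cap, a request such as `effectiveDeliverAt(1_000, 10_000, 5_000)` silently becomes `6_000`, which is the trade-off the comment raises.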
Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]
lhotari commented on code in PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#discussion_r1449280741

## pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DelayedDeliveryPolicies.java: ##
@@ -26,10 +26,12 @@ public interface DelayedDeliveryPolicies {
     long getTickTime();
     boolean isActive();
+    long getMaxDeliveryDelayInMillis();
     interface Builder {
         Builder tickTime(long tickTime);
         Builder active(boolean active);
+        Builder maxDeliveryDelayInMillis(long maxDeliveryDelayInMillis);

Review Comment:
I should have made this comment already in the PIP phase, but I'll bring it up anyway. :)

Have you considered using `Duration` as the type here? I know that we have multiple ways of dealing with durations and long values are commonly used in Pulsar APIs, but perhaps we could improve on this. There's a benefit in using Duration in public APIs since it makes it clear what the unit is and there are fewer chances for mistakes. I think that internally it would be fine to store it in a long.

By the way, it would be nice to have support for parsing durations in configuration files so that there wouldn't be a need to calculate things in milliseconds when configuring `delayedDeliveryMaxDelayInMillis` in broker.conf, but that would first require adding support for configuring Duration in configuration files. It would be great if [ISO-8601 Duration format would be supported](https://en.wikipedia.org/wiki/ISO_8601#Durations). For example "PT1H15M30S" would mean 1 hour 15 minutes and 30 seconds. "PT0.001S" would mean 1 millisecond. It should be fairly simple to add support for creating java.time.Duration instances to the configuration framework by adding converter methods to org.apache.pulsar.common.util.FieldParser class in pulsar-common module. java.time.Duration#parse supports parsing ISO-8601 duration format.

The switch to use java.time.Duration as the primary type for durations in Pulsar would be something that I can bring up for discussion on the dev mailing list. This is something that I have planned to do earlier, and now I noticed that it would be great to finally move in this direction.

I'm also fine with the current approach if there's a need to get this completed asap as-is.
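The two example strings from this comment can be checked directly against `java.time.Duration#parse`. The snippet below is only an illustration of that JDK call; the class `DurationConfigSketch` and its `toMillis` helper are hypothetical and say nothing about how a converter in `FieldParser` would actually be written.

```java
import java.time.Duration;

// Hypothetical helper, not part of Pulsar: turns an ISO-8601 duration string,
// as it might appear in a config file, into a millisecond value.
final class DurationConfigSketch {
    private DurationConfigSketch() {}

    static long toMillis(String iso8601) {
        // Duration.parse throws DateTimeParseException on malformed input.
        return Duration.parse(iso8601).toMillis();
    }

    public static void main(String[] args) {
        System.out.println(toMillis("PT1H15M30S")); // 1 hour 15 minutes 30 seconds
        System.out.println(toMillis("PT0.001S"));   // 1 millisecond
    }
}
```

Keeping the parsed value as a `long` matches the suggestion that internal storage can stay in milliseconds while the public or configuration surface uses `Duration`.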
Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]
KevinLiLu commented on code in PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#discussion_r1435873119

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java: ##
@@ -263,6 +263,29 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he
             }
         }
+        if (topic.isPersistent()) {
+            PersistentTopic pTopic = (PersistentTopic) topic;
+            if (pTopic.isDelayedDeliveryEnabled()) {
+                long maxDeliveryDelayInMs = pTopic.getDelayedDeliveryMaxDelayInMillis();
+                if (maxDeliveryDelayInMs > 0) {
+                    headersAndPayload.markReaderIndex();
+                    MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
+                    headersAndPayload.resetReaderIndex();
+                    if (msgMetadata.hasDeliverAtTime()
+                            && msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMs) {
+                        cnx.execute(() -> {
+                            cnx.getCommandSender().sendSendError(producerId, sequenceId,
+                                    ServerError.NotAllowedError,
+                                    String.format("Exceeds max allowed delivery delay of %s milliseconds",
+                                            maxDeliveryDelayInMs));
+                            cnx.completedSendOperation(false, headersAndPayload.readableBytes());
+                        });
+                        return false;
+                    }

Review Comment:
The other error checks in the `checkAndStartPublish` method (right above my change) are calling `cnx.getCommandSender().sendSendError(...)` and then `cnx.completedSendOperation(...)` immediately. For instance, the encryption error check (when encryption is required but the producer does not encrypt the message):

https://github.com/apache/pulsar/blob/32f3577a735581096d85aa961d7df45b9ae9b6f9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java#L254-L263
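Stripped of the broker plumbing, the condition in the diff above is a comparison of the requested delay against the limit. The sketch below isolates that predicate so it can be read and tested on its own; `MaxDelayCheckSketch` and its method names are hypothetical, and the real change operates on `MessageMetadata` and `cnx` rather than plain `long` values.

```java
// Hypothetical standalone version of the publish-time check, not Pulsar code.
final class MaxDelayCheckSketch {
    private MaxDelayCheckSketch() {}

    // True when the message should be rejected: a limit is configured, the
    // message carries a deliverAtTime, and the requested delay exceeds the limit.
    static boolean exceedsMaxDelay(boolean hasDeliverAtTime, long deliverAtTime,
                                   long publishTime, long maxDelayMs) {
        return maxDelayMs > 0
                && hasDeliverAtTime
                && deliverAtTime - publishTime > maxDelayMs;
    }

    // Same message text as the String.format call in the diff.
    static String errorMessage(long maxDelayMs) {
        return String.format("Exceeds max allowed delivery delay of %s milliseconds", maxDelayMs);
    }
}
```

A delay exactly equal to the limit passes, since the comparison is strict (`>`), which matches the diff.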
Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]
lhotari commented on code in PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#discussion_r1435807296

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java: ##
@@ -263,6 +263,29 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he
             }
         }
+        if (topic.isPersistent()) {
+            PersistentTopic pTopic = (PersistentTopic) topic;
+            if (pTopic.isDelayedDeliveryEnabled()) {
+                long maxDeliveryDelayInMs = pTopic.getDelayedDeliveryMaxDelayInMillis();
+                if (maxDeliveryDelayInMs > 0) {
+                    headersAndPayload.markReaderIndex();
+                    MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
+                    headersAndPayload.resetReaderIndex();
+                    if (msgMetadata.hasDeliverAtTime()
+                            && msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMs) {
+                        cnx.execute(() -> {
+                            cnx.getCommandSender().sendSendError(producerId, sequenceId,
+                                    ServerError.NotAllowedError,
+                                    String.format("Exceeds max allowed delivery delay of %s milliseconds",
+                                            maxDeliveryDelayInMs));
+                            cnx.completedSendOperation(false, headersAndPayload.readableBytes());
+                        });
+                        return false;
+                    }

Review Comment:
I'm not sure about it, but there's a chance that completedSendOperation should only be called if the message was accepted for sending by returning true from this method.

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java: ##
@@ -263,6 +263,29 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he
             }
         }
+        if (topic.isPersistent()) {
+            PersistentTopic pTopic = (PersistentTopic) topic;
+            if (pTopic.isDelayedDeliveryEnabled()) {
+                long maxDeliveryDelayInMs = pTopic.getDelayedDeliveryMaxDelayInMillis();
+                if (maxDeliveryDelayInMs > 0) {
+                    headersAndPayload.markReaderIndex();
+                    MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
+                    headersAndPayload.resetReaderIndex();

Review Comment:
This has a cost, so it would be good to have this check later in the send processing so that there wouldn't be an extra parsing step. Did you check whether it would be possible to put this logic in a later phase of sending?
Re: [PR] [improve][broker] PIP-315: Configurable max delay limit for delayed delivery [pulsar]
github-actions[bot] commented on PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#issuecomment-1868467862

@KevinLiLu Please add the following content to your PR description and select a checkbox:

```
- [ ] `doc`
- [ ] `doc-required`
- [ ] `doc-not-needed`
- [ ] `doc-complete`
```