Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1851850338

## pip/pip-385.md: ##
@@ -0,0 +1,398 @@
+# PIP-385: Add rate limit semantics to pulsar protocol and Java client
+
+
+ Table of Contents
+
+- [Background knowledge](#background-knowledge)
+  * [Challenges with the current approach](#challenges-with-the-current-approach)
+- [Motivation](#motivation)
+- [Goals](#goals)
+  * [In Scope](#in-scope)
+  * [Out of Scope](#out-of-scope)
+- [High Level Design](#high-level-design)
+  * [New binary protocol commands](#new-binary-protocol-commands)
+  * [Java client changes](#java-client-changes)
+- [Detailed Design](#detailed-design)
+  * [High-level Implementation Details](#high-level-implementation-details)
+    + [Broker Changes](#broker-changes)
+    + [Determining the throttling duration for clients](#determining-the-throttling-duration-for-clients)
+    + [Java Client Changes](#java-client-changes-1)
+    + [Blocking messages to be sent during throttling](#blocking-messages-to-be-sent-during-throttling)
+    + [Client side rate limit exception](#client-side-rate-limit-exception)
+  * [Public-facing Changes](#public-facing-changes)
+    + [Binary Protocol](#binary-protocol)
+    + [Java Client](#java-client)
+    + [Configuration](#configuration)
+    + [Metrics](#metrics)
+- [Backward & Forward Compatibility](#backward-forward-compatibility)
+  * [Upgrade / Downgrade / Rollback](#upgrade-downgrade-rollback)
+  * [Lower Protocol Client](#lower-protocol-client)
+  * [Lower Protocol Server](#lower-protocol-server)
+- [Alternatives](#alternatives)
+- [Links](#links)
+
+
+
+# Background knowledge
+
+Being a multi tenant system, pulsar supports quality of service constructs like topic quotas in bytes per second and
+qps. On top of this, the fact that one broker has only certain limited resources, it has to additionally implement some
+other controls to limit the resources usage, like how much message buffer it has, etc.
+
+As such, pulsar induces throttling at multiple levels. Just looking at publish level throttling, here are the various
+levers that we can configure in pulsar which enables us to rate limit a producer, topic or an entire connection from a
+client:
+
+* At the core of it, we can set topic level publish rate in bytes and/or messages per second.
+* We can create a resource group (combination of one or more namespaces or tenants) and set a publish-rate for that
+  resource group.
+* We can set a broker config to throttle based on pending messages at a connection level.
+  See [maxPendingPublishRequestsPerConnection](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L750)
+* We can set a broker config to throttle based on message buffer size at a thread level.
+  See [maxMessagePublishBufferSizeInMB](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1431C17-L1431C49)
+* We can set a broker level maximum publish rate per broker in bytes and/or messages.
+
+Currently, the way pulsar uses these levers and enforces these limits is by pausing reading further messages from an
+established connection for a topic. This is transparent to the clients, and they continue to publish further messages
+with an increased observed latency. Once the publish-rates are within the limits, broker resumes reading from the
+connection.
+
+Here is a small illustration to demonstrate the situation:
+
+```mermaid
+%%{init: {"mirrorActors": false, "rightAngles": false} }%%
+sequenceDiagram
+Client->>Server: CreateProducer(reqId, myTopic)
+Note right of Server: Check Authorization
+Server-->>Client: ProducerSuccess(reqId, producerName)
+Activate Client
+Activate Server
+Client->>Server: Send(1, message1)
+Client->>Server: Send(2, message2)
+Server-->>Client: SendReceipt(1, msgId1)
+Client->>Server: Send(3, message3)
+Client->>Server: Send(4, message4)
+Note right of Server: Topic breaching quota
+Activate Server
+note right of Server: TCP channel read paused
+Client-xServer: Send(5, message5)
+Server-->>Client: SendReceipt(2,msgId2)
+Server-->>Client: SendReceipt(3,msgId3)
+Client-xServer: Send(6, message6)
+Server-->>Client: SendReceipt(4,msgId4)
+note right of Server: TCP channel read resumed
+deactivate Server
+Server-->>Server: read message 5
+Server-->>Server: read message 6
+Client->>Server: Send(7, message7)
+Server-->>Client: SendReceipt(5,msgId5)
+Server-->>Client: SendReceipt(6,msgId6)
+Server-->>Client: SendReceipt(7,msgId7)
+
+Client->>Server: CloseProducer(producerId, reqId)
+Server-->>-Client: Success(reqId)
+deactivate Client
+```
+
+## Challenges with the current approach
+
+The cu
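
The pause-and-resume behaviour described in the quoted "Background knowledge" section can be sketched as a toy model. This is only an illustration of the idea, not the broker's implementation: the class `PausedReadSimulation`, its per-interval message quota, and the `nextInterval()` trigger are all hypothetical names introduced here, and the real broker works on a network channel rather than an in-memory queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model (hypothetical): a connection whose reads are paused once a publish quota is used up. */
public class PausedReadSimulation {
    private final int quotaPerInterval;                        // assumed quota: messages per interval
    private int usedInInterval = 0;
    private boolean readPaused = false;
    private final Queue<Integer> unread = new ArrayDeque<>();  // sent by the client, not yet read
    private final List<Integer> receipts = new ArrayList<>();  // sequence ids that got a receipt

    public PausedReadSimulation(int quotaPerInterval) {
        this.quotaPerInterval = quotaPerInterval;
    }

    /** Client side: the send is accepted locally, so the client cannot tell it is being throttled. */
    public void send(int sequenceId) {
        unread.add(sequenceId);
        drain();
    }

    /** Broker side: a new rate-limit interval begins, so reading resumes. */
    public void nextInterval() {
        usedInInterval = 0;
        readPaused = false;
        drain();
    }

    /** Reads buffered messages until the quota is exhausted, then pauses reading. */
    private void drain() {
        while (!readPaused && !unread.isEmpty()) {
            receipts.add(unread.poll());
            usedInInterval++;
            if (usedInInterval >= quotaPerInterval) {
                readPaused = true;
            }
        }
    }

    public boolean isReadPaused() { return readPaused; }
    public List<Integer> receipts() { return receipts; }
    public int pendingOnWire() { return unread.size(); }

    public static void main(String[] args) {
        PausedReadSimulation conn = new PausedReadSimulation(4);
        for (int seq = 1; seq <= 6; seq++) {
            conn.send(seq);                      // messages 5 and 6 stay unread
        }
        System.out.println("paused=" + conn.isReadPaused()
                + " receipts=" + conn.receipts() + " pending=" + conn.pendingOnWire());
        conn.nextInterval();                     // read resumes; 5 and 6 are processed
        conn.send(7);
        System.out.println("paused=" + conn.isReadPaused()
                + " receipts=" + conn.receipts() + " pending=" + conn.pendingOnWire());
    }
}
```

With a quota of 4 the model follows the sequence diagram: messages 1 to 4 are read, 5 and 6 wait unread until reading resumes, and from the client's side the only visible effect is that receipts for 5 and 6 arrive later.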
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848784878

## pip/pip-385.md: ##
@@ -0,0 +1,398 @@
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1851042799

## pip/pip-385.md: ##
@@ -0,0 +1,398 @@
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848316881

## pip/pip-385.md: ##
@@ -0,0 +1,398 @@
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848761990

## pip/pip-385.md: ##
@@ -0,0 +1,398 @@
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848801031

## pip/pip-385.md: ##
@@ -0,0 +1,398 @@
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848778811

## pip/pip-385.md: ##
@@ -0,0 +1,398 @@
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848690290

## pip/pip-385.md: ##
@@ -0,0 +1,398 @@
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848656151

## pip/pip-385.md: ##
@@ -0,0 +1,398 @@
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848644061

## pip/pip-385.md: ##
@@ -0,0 +1,398 @@
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848630742

## pip/pip-385.md: ##
@@ -0,0 +1,398 @@
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848623529

## pip/pip-385.md: ##
@@ -0,0 +1,398 @@
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848616412

## pip/pip-385.md: ##
@@ -0,0 +1,398 @@
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
lhotari commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848614466 ## pip/pip-385.md: ## @@ -0,0 +1,398 @@ +# PIP-385: Add rate limit semantics to pulsar protocol and Java client + + + Table of Contents + +- [Background knowledge](#background-knowledge) + * [Challenges with the current approach](#challenges-with-the-current-approach) +- [Motivation](#motivation) +- [Goals](#goals) + * [In Scope](#in-scope) + * [Out of Scope](#out-of-scope) +- [High Level Design](#high-level-design) + * [New binary protocol commands](#new-binary-protocol-commands) + * [Java client changes](#java-client-changes) +- [Detailed Design](#detailed-design) + * [High-level Implementation Details](#high-level-implementation-details) ++ [Broker Changes](#broker-changes) ++ [Determining the throttling duration for clients](#determining-the-throttling-duration-for-clients) ++ [Java Client Changes](#java-client-changes-1) ++ [Blocking messages to be sent during throttling](#blocking-messages-to-be-sent-during-throttling) ++ [Client side rate limit exception](#client-side-rate-limit-exception) + * [Public-facing Changes](#public-facing-changes) ++ [Binary Protocol](#binary-protocol) ++ [Java Client](#java-client) ++ [Configuration](#configuration) ++ [Metrics](#metrics) +- [Backward & Forward Compatibility](#backward-forward-compatibility) + * [Upgrade / Downgrade / Rollback](#upgrade-downgrade-rollback) + * [Lower Protocol Client](#lower-protocol-client) + * [Lower Protocol Server](#lower-protocol-server) +- [Alternatives](#alternatives) +- [Links](#links) + + + +# Background knowledge + +Being a multi tenant system, pulsar supports quality of service constructs like topic quotas in bytes per second and +qps. On top of this, the fact that one broker has only certain limited resources, it has to additionally implement some +other controls to limit the resources usage, like how much message buffer it has, etc. 
+
+As such, pulsar induces throttling at multiple levels. Just looking at publish level throttling, here are the various
+levers that we can configure in pulsar which enables us to rate limit a producer, topic or an entire connection from a
+client:
+
+* At the core of it, we can set topic level publish rate in bytes and/or messages per second.
+* We can create a resource group (combination of one or more namespaces or tenants) and set a publish-rate for that
+  resource group.
+* We can set a broker config to throttle based on pending messages at a connection level.
+  See [maxPendingPublishRequestsPerConnection](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L750)
+* We can set a broker config to throttle based on message buffer size at a thread level.
+  See [maxMessagePublishBufferSizeInMB](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1431C17-L1431C49)
+* We can set a broker level maximum publish rate per broker in bytes and/or messages.
+
+Currently, the way pulsar uses these levers and enforces these limits is by pausing reading further messages from an
+established connection for a topic. This is transparent to the clients, and they continue to publish further messages
+with an increased observed latency. Once the publish-rates are within the limits, broker resumes reading from the
+connection.
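The pause-and-resume behaviour described above can be approximated with a small model. The sketch below is only an illustration under stated assumptions: `PausedReadSketch`, `PublishQuota` and `observedLatencies` are hypothetical names, the quota is a simple fixed window of messages, and none of it is Pulsar's actual rate limiter or API. It shows how a producer that keeps sending while reads are paused sees extra latency rather than an error.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of "pause reading when over quota"; not Pulsar code. */
public class PausedReadSketch {

    /** Assumed fixed-window quota: at most maxMsgsPerWindow reads per window. */
    public static final class PublishQuota {
        private final int maxMsgsPerWindow;
        private final long windowMillis;
        private long windowStart = 0;
        private int usedInWindow = 0;

        public PublishQuota(int maxMsgsPerWindow, long windowMillis) {
            if (maxMsgsPerWindow < 1 || windowMillis < 1) {
                throw new IllegalArgumentException("quota and window must be positive");
            }
            this.maxMsgsPerWindow = maxMsgsPerWindow;
            this.windowMillis = windowMillis;
        }

        /** Returns the time (>= now) at which the next message may be read. */
        public long nextReadTime(long now) {
            if (now >= windowStart + windowMillis) {
                // A new window has started: align it and reset the counter.
                windowStart = now - (now % windowMillis);
                usedInWindow = 0;
            }
            if (usedInWindow < maxMsgsPerWindow) {
                usedInWindow++;
                return now; // within quota, read immediately
            }
            // Quota exhausted: reading stays paused until the next window opens.
            windowStart += windowMillis;
            usedInWindow = 1;
            return windowStart;
        }
    }

    /** Latency each message observes when the broker reads in order under the quota. */
    public static List<Long> observedLatencies(long[] sendTimes, PublishQuota quota) {
        List<Long> latencies = new ArrayList<>();
        long lastRead = 0;
        for (long sent : sendTimes) {
            // A message cannot be read before it is sent or before its predecessor.
            long earliest = Math.max(sent, lastRead);
            lastRead = quota.nextReadTime(earliest);
            latencies.add(lastRead - sent);
        }
        return latencies;
    }

    public static void main(String[] args) {
        long[] sendTimes = {0, 100, 200, 300, 400, 500, 1200};
        // Assumed quota of 4 messages per 1000 ms window.
        System.out.println(observedLatencies(sendTimes, new PublishQuota(4, 1000)));
        // prints [0, 0, 0, 0, 600, 500, 0]
    }
}
```

In this run the fifth and sixth messages are sent while the quota is exhausted, so they are only read once the next window opens; the client gets no signal other than the higher latency.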
+
+Here is a small illustration to demonstrate the situation:
+
+```mermaid
+%%{init: {"mirrorActors": false, "rightAngles": false} }%%
+sequenceDiagram
+Client->>Server: CreateProducer(reqId, myTopic)
+Note right of Server: Check Authorization
+Server-->>Client: ProducerSuccess(reqId, producerName)
+Activate Client
+Activate Server
+Client->>Server: Send(1, message1)
+Client->>Server: Send(2, message2)
+Server-->>Client: SendReceipt(1, msgId1)
+Client->>Server: Send(3, message3)
+Client->>Server: Send(4, message4)
+Note right of Server: Topic breaching quota
+Activate Server
+note right of Server: TCP channel read paused
+Client-xServer: Send(5, message5)
+Server-->>Client: SendReceipt(2,msgId2)
+Server-->>Client: SendReceipt(3,msgId3)
+Client-xServer: Send(6, message6)
+Server-->>Client: SendReceipt(4,msgId4)
+note right of Server: TCP channel read resumed
+deactivate Server
+Server-->>Server: read message 5
+Server-->>Server: read message 6
+Client->>Server: Send(7, message7)
+Server-->>Client: SendReceipt(5,msgId5)
+Server-->>Client: SendReceipt(6,msgId6)
+Server-->>Client: SendReceipt(7,msgId7)
+
+Client->>Server: CloseProducer(producerId, reqId)
+Server-->>-Client: Success(reqId)
+deactivate Client
+```
+
+## Challenges with the current approach
+
+The c
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
lhotari commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848605217 ## pip/pip-385.md: ##
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848319059 ## pip/pip-385.md: ##
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
lhotari commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1843861334 ## pip/pip-385.md: ##
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
lhotari commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1843792236 ## pip/pip-385.md: ##
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1843788844 ## pip/pip-385.md: ##
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
lhotari commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1843787963 ## pip/pip-385.md: ##
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
lhotari commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1843754643 ## pip/pip-385.md: ## @@ -0,0 +1,398 @@
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
lhotari commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1843747449 ## pip/pip-385.md: ## @@ -0,0 +1,398 @@
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1841010651 ## pip/pip-385.md: ## @@ -0,0 +1,398 @@
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1840962720 ## pip/pip-385.md: ## @@ -0,0 +1,348 @@ +# PIP-385: Add rate limit semantics to pulsar protocol and Java client + + + Table of Contents + +- [PIP-38X: Add rate limit semantics to pulsar protocol](#pip-38x-add-rate-limit-semantics-to-pulsar-protocol) +- [Background knowledge](#background-knowledge) +* [Challenges with the current approach](#challenges-with-the-current-approach) +- [Motivation](#motivation) +- [Goals](#goals) +* [In Scope](#in-scope) +* [Out of Scope](#out-of-scope) +- [High Level Design](#high-level-design) +* [New binary protocol commands](#new-binary-protocol-commands) +* [Java client changes](#java-client-changes) +- [Detailed Design](#detailed-design) +* [High-level Implementation Details](#high-level-implementation-details) ++ [Broker Changes](#broker-changes) ++ [Determining the throttling duration for clients](#determining-the-throttling-duration-for-clients) ++ [Java Client Changes](#java-client-changes-1) +* [Public-facing Changes](#public-facing-changes) ++ [Binary Protocol](#binary-protocol) ++ [Java Client](#java-client) ++ [Configuration](#configuration) ++ [Metrics](#metrics) +- [Backward & Forward Compatibility](#backward-forward-compatibility) +* [Upgrade / Downgrade / Rollback](#upgrade-downgrade-rollback) +* [Lower Protocol Client](#lower-protocol-client) +* [Lower Protocol Server](#lower-protocol-server) +- [Links](#links) + + + +# Background knowledge + +Being a multi tenant system, pulsar supports quality of service constructs like topic quotas in bytes per second and +qps. On top of this, the fact that one broker has only certain limited resources, it has to additionally implement some +other controls to limit the resources usage, like how much message buffer it has, etc. + +As such, pulsar induces throttling at multiple levels. 
Just looking at publish level throttling, here are the various +levers that we can configure in pulsar which enables us to rate limit a producer, topic or an entire connection from a +client: + +* At the core of it, we can set topic level publish rate in bytes and/or messages per second. +* We can create a resource group (combination of one or more namespaces or tenants) and set a publish-rate for that + resource group. +* We can set a broker config to throttle based on pending messages at a connection level. + See [maxPendingPublishRequestsPerConnection](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L750) +* We can set a broker config to throttle based on message buffer size at a thread level. + See [maxMessagePublishBufferSizeInMB](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1431C17-L1431C49) +* We can set a broker level maximum publish rate per broker in bytes and/or messages. + +Currently, the way pulsar uses these levers and enforces these limits is by pausing reading further messages from an +established connection for a topic. This is transparent to the clients, and they continue to publish further messages +with an increased observed latency. Once the publish-rates are within the limits, broker resumes reading from the +connection. 
+ +Here is a small illustration to demonstrate the situation: + +```mermaid +%%{init: {"mirrorActors": false, "rightAngles": false} }%% +sequenceDiagram +Client->>Server: CreateProducer(reqId, myTopic) +Note right of Server: Check Authorization +Server-->>Client: ProducerSuccess(reqId, producerName) +Activate Client +Activate Server +Client->>Server: Send(1, message1) +Client->>Server: Send(2, message2) +Server-->>Client: SendReceipt(1, msgId1) +Client->>Server: Send(3, message3) +Client->>Server: Send(4, message4) +Note right of Server: Topic breaching quota +Activate Server +note right of Server: TCP channel read paused +Client-xServer: Send(5, message5) +Server-->>Client: SendReceipt(2,msgId2) +Server-->>Client: SendReceipt(3,msgId3) +Client-xServer: Send(6, message6) +Server-->>Client: SendReceipt(4,msgId4) +note right of Server: TCP channel read resumed +deactivate Server +Server-->>Server: read message 5 +Server-->>Server: read message 6 +Client->>Server: Send(7, message7) +Server-->>Client: SendReceipt(5,msgId5) +Server-->>Client: SendReceipt(6,msgId6) +Server-->>Client: SendReceipt(7,msgId7) + +Client->>Server: CloseProducer(producerId, reqId) +Server-->>-Client: Success(reqId) +deactivate Client +``` + +## Challenges with the current approach + +The current approach may look perfectly fine when looking at
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1840983604 ## pip/pip-385.md: ## @@ -0,0 +1,398 @@
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1840988981 ## pip/pip-385.md: ## @@ -0,0 +1,398 @@
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1840987318 ## pip/pip-385.md: ## @@ -0,0 +1,398 @@
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1840964791 ## pip/pip-385.md: ##
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1840962720 ## pip/pip-385.md: ##
@@ -0,0 +1,348 @@
+# PIP-385: Add rate limit semantics to pulsar protocol and Java client
+
+
+ Table of Contents
+
+- [PIP-38X: Add rate limit semantics to pulsar protocol](#pip-38x-add-rate-limit-semantics-to-pulsar-protocol)
+- [Background knowledge](#background-knowledge)
+* [Challenges with the current approach](#challenges-with-the-current-approach)
+- [Motivation](#motivation)
+- [Goals](#goals)
+* [In Scope](#in-scope)
+* [Out of Scope](#out-of-scope)
+- [High Level Design](#high-level-design)
+* [New binary protocol commands](#new-binary-protocol-commands)
+* [Java client changes](#java-client-changes)
+- [Detailed Design](#detailed-design)
+* [High-level Implementation Details](#high-level-implementation-details)
++ [Broker Changes](#broker-changes)
++ [Determining the throttling duration for clients](#determining-the-throttling-duration-for-clients)
++ [Java Client Changes](#java-client-changes-1)
+* [Public-facing Changes](#public-facing-changes)
++ [Binary Protocol](#binary-protocol)
++ [Java Client](#java-client)
++ [Configuration](#configuration)
++ [Metrics](#metrics)
+- [Backward & Forward Compatibility](#backward-forward-compatibility)
+* [Upgrade / Downgrade / Rollback](#upgrade-downgrade-rollback)
+* [Lower Protocol Client](#lower-protocol-client)
+* [Lower Protocol Server](#lower-protocol-server)
+- [Links](#links)
+
+
+
+# Background knowledge
+
+Being a multi tenant system, pulsar supports quality of service constructs like topic quotas in bytes per second and
+qps. On top of this, the fact that one broker has only certain limited resources, it has to additionally implement some
+other controls to limit the resources usage, like how much message buffer it has, etc.
+
+As such, pulsar induces throttling at multiple levels. Just looking at publish level throttling, here are the various
+levers that we can configure in pulsar which enables us to rate limit a producer, topic or an entire connection from a
+client:
+
+* At the core of it, we can set topic level publish rate in bytes and/or messages per second.
+* We can create a resource group (combination of one or more namespaces or tenants) and set a publish-rate for that
+ resource group.
+* We can set a broker config to throttle based on pending messages at a connection level.
+ See [maxPendingPublishRequestsPerConnection](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L750)
+* We can set a broker config to throttle based on message buffer size at a thread level.
+ See [maxMessagePublishBufferSizeInMB](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1431C17-L1431C49)
+* We can set a broker level maximum publish rate per broker in bytes and/or messages.
+
+Currently, the way pulsar uses these levers and enforces these limits is by pausing reading further messages from an
+established connection for a topic. This is transparent to the clients, and they continue to publish further messages
+with an increased observed latency. Once the publish-rates are within the limits, broker resumes reading from the
+connection.
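To make the pause-and-resume behaviour described in the quoted paragraph concrete, here is a minimal, hypothetical Java sketch. It is not Pulsar code: `ReadPauseSketch`, `Connection`, `PublishQuota`, and the tick-based clock are all invented for illustration, and the quota is a simple fixed-window message counter rather than the broker's real rate limiter. It only shows why a client that keeps sending sees its messages accepted later, not rejected.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical simulation of broker-side read pausing; names are illustrative only. */
final class ReadPauseSketch {

    /** Simulated connection: messages written by the client but not yet read by the broker. */
    static final class Connection {
        final Queue<Integer> socketBuffer = new ArrayDeque<>();
        boolean autoRead = true; // false while the broker has paused reading
    }

    /** Fixed-window message quota (assumption: stands in for a topic publish rate). */
    static final class PublishQuota {
        private final int maxMessagesPerTick;
        private int usedInCurrentTick;

        PublishQuota(int maxMessagesPerTick) {
            this.maxMessagesPerTick = maxMessagesPerTick;
        }

        boolean tryAcquire() {
            if (usedInCurrentTick >= maxMessagesPerTick) {
                return false;
            }
            usedInCurrentTick++;
            return true;
        }

        void nextTick() {
            usedInCurrentTick = 0;
        }
    }

    /**
     * Returns, for each message in send order, the tick at which the broker read it.
     * The client writes up to clientSendsPerTick messages per tick regardless of throttling.
     */
    static List<Integer> simulate(int totalMessages, int clientSendsPerTick, int quotaPerTick) {
        if (clientSendsPerTick <= 0 || quotaPerTick <= 0) {
            throw new IllegalArgumentException("rates must be positive");
        }
        Connection conn = new Connection();
        PublishQuota quota = new PublishQuota(quotaPerTick);
        List<Integer> readTicks = new ArrayList<>();
        int nextSequenceId = 0;
        for (int tick = 0; readTicks.size() < totalMessages; tick++) {
            quota.nextTick();
            conn.autoRead = true; // quota window reset: resume reading
            for (int i = 0; i < clientSendsPerTick && nextSequenceId < totalMessages; i++) {
                conn.socketBuffer.add(nextSequenceId++); // client is unaware of the pause
            }
            while (conn.autoRead && !conn.socketBuffer.isEmpty()) {
                if (quota.tryAcquire()) {
                    conn.socketBuffer.poll();
                    readTicks.add(tick);
                } else {
                    conn.autoRead = false; // quota breached: pause reading until the next tick
                }
            }
        }
        return readTicks;
    }

    public static void main(String[] args) {
        // Client sends 3 messages per tick, quota allows 2 per tick.
        System.out.println(simulate(6, 3, 2)); // prints [0, 0, 1, 1, 2, 2]
    }
}
```

In this run the last message is written at tick 1 but only read at tick 2. From the client's side that shows up purely as added latency, which is the transparency the paragraph refers to.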
+
+Here is a small illustration to demonstrate the situation:
+
+```mermaid
+%%{init: {"mirrorActors": false, "rightAngles": false} }%%
+sequenceDiagram
+Client->>Server: CreateProducer(reqId, myTopic)
+Note right of Server: Check Authorization
+Server-->>Client: ProducerSuccess(reqId, producerName)
+Activate Client
+Activate Server
+Client->>Server: Send(1, message1)
+Client->>Server: Send(2, message2)
+Server-->>Client: SendReceipt(1, msgId1)
+Client->>Server: Send(3, message3)
+Client->>Server: Send(4, message4)
+Note right of Server: Topic breaching quota
+Activate Server
+note right of Server: TCP channel read paused
+Client-xServer: Send(5, message5)
+Server-->>Client: SendReceipt(2,msgId2)
+Server-->>Client: SendReceipt(3,msgId3)
+Client-xServer: Send(6, message6)
+Server-->>Client: SendReceipt(4,msgId4)
+note right of Server: TCP channel read resumed
+deactivate Server
+Server-->>Server: read message 5
+Server-->>Server: read message 6
+Client->>Server: Send(7, message7)
+Server-->>Client: SendReceipt(5,msgId5)
+Server-->>Client: SendReceipt(6,msgId6)
+Server-->>Client: SendReceipt(7,msgId7)
+
+Client->>Server: CloseProducer(producerId, reqId)
+Server-->>-Client: Success(reqId)
+deactivate Client
+```
+
+## Challenges with the current approach
+
+The current approach may look perfectly fine when looking at
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
lhotari commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1840763064 ## pip/pip-385.md: ##
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1840147289 ## pip/pip-385.md: ##
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
lhotari commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1840134837 ## pip/pip-385.md: ##
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
nodece commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1837620602 ## pip/pip-385.md: ##
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1837635568 ## pip/pip-385.md: ## @@ -0,0 +1,348 @@ +# PIP-385: Add rate limit semantics to pulsar protocol and Java client + + + Table of Contents + +- [PIP-38X: Add rate limit semantics to pulsar protocol](#pip-38x-add-rate-limit-semantics-to-pulsar-protocol) +- [Background knowledge](#background-knowledge) +* [Challenges with the current approach](#challenges-with-the-current-approach) +- [Motivation](#motivation) +- [Goals](#goals) +* [In Scope](#in-scope) +* [Out of Scope](#out-of-scope) +- [High Level Design](#high-level-design) +* [New binary protocol commands](#new-binary-protocol-commands) +* [Java client changes](#java-client-changes) +- [Detailed Design](#detailed-design) +* [High-level Implementation Details](#high-level-implementation-details) ++ [Broker Changes](#broker-changes) ++ [Determining the throttling duration for clients](#determining-the-throttling-duration-for-clients) ++ [Java Client Changes](#java-client-changes-1) +* [Public-facing Changes](#public-facing-changes) ++ [Binary Protocol](#binary-protocol) ++ [Java Client](#java-client) ++ [Configuration](#configuration) ++ [Metrics](#metrics) +- [Backward & Forward Compatibility](#backward-forward-compatibility) +* [Upgrade / Downgrade / Rollback](#upgrade-downgrade-rollback) +* [Lower Protocol Client](#lower-protocol-client) +* [Lower Protocol Server](#lower-protocol-server) +- [Links](#links) + + + +# Background knowledge + +Being a multi tenant system, pulsar supports quality of service constructs like topic quotas in bytes per second and +qps. On top of this, the fact that one broker has only certain limited resources, it has to additionally implement some +other controls to limit the resources usage, like how much message buffer it has, etc. + +As such, pulsar induces throttling at multiple levels. 
Just looking at publish level throttling, here are the various +levers that we can configure in pulsar which enables us to rate limit a producer, topic or an entire connection from a +client: + +* At the core of it, we can set topic level publish rate in bytes and/or messages per second. +* We can create a resource group (combination of one or more namespaces or tenants) and set a publish-rate for that + resource group. +* We can set a broker config to throttle based on pending messages at a connection level. + See [maxPendingPublishRequestsPerConnection](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L750) +* We can set a broker config to throttle based on message buffer size at a thread level. + See [maxMessagePublishBufferSizeInMB](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1431C17-L1431C49) +* We can set a broker level maximum publish rate per broker in bytes and/or messages. + +Currently, the way pulsar uses these levers and enforces these limits is by pausing reading further messages from an +established connection for a topic. This is transparent to the clients, and they continue to publish further messages +with an increased observed latency. Once the publish-rates are within the limits, broker resumes reading from the +connection. 
+ +Here is a small illustration to demonstrate the situation: + +```mermaid +%%{init: {"mirrorActors": false, "rightAngles": false} }%% +sequenceDiagram +Client->>Server: CreateProducer(reqId, myTopic) +Note right of Server: Check Authorization +Server-->>Client: ProducerSuccess(reqId, producerName) +Activate Client +Activate Server +Client->>Server: Send(1, message1) +Client->>Server: Send(2, message2) +Server-->>Client: SendReceipt(1, msgId1) +Client->>Server: Send(3, message3) +Client->>Server: Send(4, message4) +Note right of Server: Topic breaching quota +Activate Server +note right of Server: TCP channel read paused +Client-xServer: Send(5, message5) +Server-->>Client: SendReceipt(2,msgId2) +Server-->>Client: SendReceipt(3,msgId3) +Client-xServer: Send(6, message6) +Server-->>Client: SendReceipt(4,msgId4) +note right of Server: TCP channel read resumed +deactivate Server +Server-->>Server: read message 5 +Server-->>Server: read message 6 +Client->>Server: Send(7, message7) +Server-->>Client: SendReceipt(5,msgId5) +Server-->>Client: SendReceipt(6,msgId6) +Server-->>Client: SendReceipt(7,msgId7) + +Client->>Server: CloseProducer(producerId, reqId) +Server-->>-Client: Success(reqId) +deactivate Client +``` + +## Challenges with the current approach + +The current approach may look perfectly fine when looking at
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1833728666 ## pip/pip-385.md: ## @@ -0,0 +1,348 @@ +# PIP-385: Add rate limit semantics to pulsar protocol and Java client + + + Table of Contents + +- [PIP-38X: Add rate limit semantics to pulsar protocol](#pip-38x-add-rate-limit-semantics-to-pulsar-protocol) +- [Background knowledge](#background-knowledge) +* [Challenges with the current approach](#challenges-with-the-current-approach) +- [Motivation](#motivation) +- [Goals](#goals) +* [In Scope](#in-scope) +* [Out of Scope](#out-of-scope) +- [High Level Design](#high-level-design) +* [New binary protocol commands](#new-binary-protocol-commands) +* [Java client changes](#java-client-changes) +- [Detailed Design](#detailed-design) +* [High-level Implementation Details](#high-level-implementation-details) ++ [Broker Changes](#broker-changes) ++ [Determining the throttling duration for clients](#determining-the-throttling-duration-for-clients) ++ [Java Client Changes](#java-client-changes-1) +* [Public-facing Changes](#public-facing-changes) ++ [Binary Protocol](#binary-protocol) ++ [Java Client](#java-client) ++ [Configuration](#configuration) ++ [Metrics](#metrics) +- [Backward & Forward Compatibility](#backward-forward-compatibility) +* [Upgrade / Downgrade / Rollback](#upgrade-downgrade-rollback) +* [Lower Protocol Client](#lower-protocol-client) +* [Lower Protocol Server](#lower-protocol-server) +- [Links](#links) + + + +# Background knowledge + +Being a multi tenant system, pulsar supports quality of service constructs like topic quotas in bytes per second and +qps. On top of this, the fact that one broker has only certain limited resources, it has to additionally implement some +other controls to limit the resources usage, like how much message buffer it has, etc. + +As such, pulsar induces throttling at multiple levels. 
Just looking at publish level throttling, here are the various +levers that we can configure in pulsar which enables us to rate limit a producer, topic or an entire connection from a +client: + +* At the core of it, we can set topic level publish rate in bytes and/or messages per second. +* We can create a resource group (combination of one or more namespaces or tenants) and set a publish-rate for that + resource group. +* We can set a broker config to throttle based on pending messages at a connection level. + See [maxPendingPublishRequestsPerConnection](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L750) +* We can set a broker config to throttle based on message buffer size at a thread level. + See [maxMessagePublishBufferSizeInMB](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1431C17-L1431C49) +* We can set a broker level maximum publish rate per broker in bytes and/or messages. + +Currently, the way pulsar uses these levers and enforces these limits is by pausing reading further messages from an +established connection for a topic. This is transparent to the clients, and they continue to publish further messages +with an increased observed latency. Once the publish-rates are within the limits, broker resumes reading from the +connection. 
+ +Here is a small illustration to demonstrate the situation: + +```mermaid +%%{init: {"mirrorActors": false, "rightAngles": false} }%% +sequenceDiagram +Client->>Server: CreateProducer(reqId, myTopic) +Note right of Server: Check Authorization +Server-->>Client: ProducerSuccess(reqId, producerName) +Activate Client +Activate Server +Client->>Server: Send(1, message1) +Client->>Server: Send(2, message2) +Server-->>Client: SendReceipt(1, msgId1) +Client->>Server: Send(3, message3) +Client->>Server: Send(4, message4) +Note right of Server: Topic breaching quota +Activate Server +note right of Server: TCP channel read paused +Client-xServer: Send(5, message5) +Server-->>Client: SendReceipt(2,msgId2) +Server-->>Client: SendReceipt(3,msgId3) +Client-xServer: Send(6, message6) +Server-->>Client: SendReceipt(4,msgId4) +note right of Server: TCP channel read resumed +deactivate Server +Server-->>Server: read message 5 +Server-->>Server: read message 6 +Client->>Server: Send(7, message7) +Server-->>Client: SendReceipt(5,msgId5) +Server-->>Client: SendReceipt(6,msgId6) +Server-->>Client: SendReceipt(7,msgId7) + +Client->>Server: CloseProducer(producerId, reqId) +Server-->>-Client: Success(reqId) +deactivate Client +``` + +## Challenges with the current approach + +The current approach may look perfectly fine when looking at
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on PR #23398: URL: https://github.com/apache/pulsar/pull/23398#issuecomment-2456268584

> Sure, I was suggesting to send a failure `ServerError.Throttled` to producer as it is just a notification from server to client. and let client receive this error during publish and do necessary backoff.

Ah, got it now. There are a few reasons for this not being a "response" to a send request:

* The `CommandSendError` command has no provision to communicate how long the producer is throttled for. Adding an optional field for just one of the `ServerError` enum values doesn't seem right.
* Due to the distributed nature of producers, relying on a response to a `Send` request might not work. Suppose 10 producers are connected to a topic; one of them might not have produced anything yet when the quota is breached. So we need to notify all 10 producers to throttle themselves.
* Throttling doesn't actually lead to failures. If the client has reasonable timeouts (the default of 30s is more than reasonable), then even with throttling, the message produce will succeed. Moreover, every producer could be joining with a different client-side send timeout value. Thus, in the current protocol it is not possible to figure out whether throttling on the broker side would actually lead to a client-side error at all. This is why it is decoupled from the send response.

> Umm.. That should not be true because we can increase this version when we want to check client's compatibility and this should be the example of this usecase.

I suspect this is done because not every client wanted to implement a feature present in protocol version X, but still wanted to implement something present in version Y where X < Y. Thus this independent approach of "supported feature flags" was introduced. [This](https://apache-pulsar.slack.com/archives/C5ZSVEN4E/p1726845929631179) is the Slack discussion thread if you want to join the conversation.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
rdhabalia commented on PR #23398: URL: https://github.com/apache/pulsar/pull/23398#issuecomment-2456199156

> Could you please elaborate a bit more? Not 100% sure what you mean here.. Are you saying that we simply treat all timeouts as throttling issue and return THROTTLE_ERROR_CODE instead of TIMEOUT without the need of server sending any info out to the client (via the new command?)

Sure, I was suggesting sending a failure `ServerError.Throttled` to the producer, as it is just a notification from server to client, and letting the client receive this error during publish and do the necessary backoff.

>> Actually, i checked around this on slack. seems like feature flags are the new approach of handling this and protocol version only bumps up in case of major changes in the protocol.

Umm.. That should not be true, because we can increase this version when we want to check a client's compatibility, and this should be an example of that use case.
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on PR #23398: URL: https://github.com/apache/pulsar/pull/23398#issuecomment-2453555618

> Umm.. I am still not clear. Can't we send THROTTLE_ERROR_CODE and handle it instead of introducing new command?

Could you please elaborate a bit more? I'm not 100% sure what you mean here. Are you saying that we simply treat all timeouts as a throttling issue and return THROTTLE_ERROR_CODE instead of TIMEOUT, without the server sending any info to the client (via the new command)?

> Correct. In that case, check client-version at broker side and don't trigger this workflow if client doesn't support the version. So, we don't need ACK response back from client. For information: check `ServerCnx.java -> int clientProtocolVersion = connect.getProtocolVersion();`

Actually, I checked around this on Slack. It seems feature flags are the new approach for handling this, and the protocol version is only bumped for major changes in the protocol. In fact, I am using this feature flag approach to not even send this command to clients that have not implemented handling of it.

The concern about bad actors still remains, intentional or unintentional:

* Intentional: a client says it supports handling of the throttle command but still misbehaves, due to a code bug or any other reason.
* Unintentional: TCP is a two-way stream (or internally, it opens 2 channels). Outbound messages from the broker can get choked, so the throttle command is held up and clients continue to bombard the broker.

Both situations lead to a noisy neighbor issue on the broker side and the client side alike. It essentially leads to no rate limiting.
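The decision described in this reply can be sketched in Java. This is only an illustration under stated assumptions: `ThrottleStrategySketch`, `Strategy`, and both parameters are hypothetical names, not Pulsar APIs. It combines the feature flag check mentioned here with the throttle ack receipt discussed elsewhere in this thread, falling back to pausing the TCP channel read whenever the client cannot be trusted to throttle itself.

```java
/** Hypothetical sketch; none of these names exist in Pulsar. */
public final class ThrottleStrategySketch {

    /** How the broker enforces a breached quota on one connection. */
    enum Strategy { SEND_THROTTLE_COMMAND, PAUSE_CHANNEL_READ }

    /**
     * @param clientAdvertisesSupport assumed feature flag the client sent when connecting
     * @param lastThrottleAcked whether the previous throttle command was acknowledged
     *                          (pass true when no throttle command has been sent yet)
     */
    static Strategy choose(boolean clientAdvertisesSupport, boolean lastThrottleAcked) {
        // Clients that never advertised support are not sent the new command at all.
        if (!clientAdvertisesSupport) {
            return Strategy.PAUSE_CHANNEL_READ;
        }
        // A missing ack covers both bad-actor cases: a buggy client, or a choked
        // outbound path that is holding the throttle command up.
        if (!lastThrottleAcked) {
            return Strategy.PAUSE_CHANNEL_READ;
        }
        return Strategy.SEND_THROTTLE_COMMAND;
    }
}
```

With this shape, a client that claims support but never acknowledges a throttle command is treated exactly like an older client, so rate limiting still holds.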
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
rdhabalia commented on PR #23398: URL: https://github.com/apache/pulsar/pull/23398#issuecomment-2450918009

>> This behavior is unchanged overall. while the client is potentially preventing some of the messages sitting in the pending queue to be even sent out to the TCP pipe

Umm.. I am still not clear. Can't we send THROTTLE_ERROR_CODE and handle it instead of introducing a new command?

>> The only reason this throttle ack receipt is introduced is to handle older clients,

Correct. In that case, check the client version on the broker side and don't trigger this workflow if the client doesn't support the version. Then we don't need an ACK response back from the client. For information: check `ServerCnx.java -> int clientProtocolVersion = connect.getProtocolVersion();`

>> This is done on the server side.. so if the frequency/unit of token bucket refill is different than 1 second, it will be accommodated accordingly. The client doesn't make any assumptions on this frequency/unit,

got it.
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on PR #23398: URL: https://github.com/apache/pulsar/pull/23398#issuecomment-2449401411

@rdhabalia Please find replies inline below.

> 1. what happens when broker sends `Throttle` Command to client?
>
> * how does it impact client application? what happens when client-app calls `producer.sendAsync()` -> if client lib stops sending messages then client will eventually gets queue-full-execption?

Yes, the client holds those messages in a new queue until the throttle time has passed. During this time, a queue full exception may also occur if the user has configured that flag. This is explained in detail in this section of the PIP: https://github.com/apache/pulsar/pull/23398/files#diff-22a644af9af6cae8684b378ec31740275d87a6d96e7f823a63ded1ba86b2581cR265 Please let me know if the details need to be improved further.

> * Or client will eventually get time-out exceptions for all the messages sitting into the queue>? I guess that could be the likely behavior and in that case, does it make sense to send publish-error-code instead `throttle-command` from broker. So, broker can send `ERROR_CODE.THROTTLING` and client can just handle it with same behavior as it has received `throttle-command` so, we might not need a separate command here and it can be achievable with an error-code.

This behavior is unchanged overall. While the client may prevent some of the messages in the pending queue from even being written to the TCP pipe, the general timeout behavior remains the same: anything in the pending queue times out if a SendReceipt isn't received from the server. The only difference is that we no longer throw a timeout exception if we know the client was being throttled for the majority of the `messageSendTimeout` duration. This is explained further in this section: https://github.com/apache/pulsar/pull/23398/files#diff-22a644af9af6cae8684b378ec31740275d87a6d96e7f823a63ded1ba86b2581cR280

> 2. I think client doesn't have to send ACK for Throttle command back to broker as it's not useful to broker and it seems it's one side notification only.

The only reason this throttle ack receipt is introduced is to handle older clients, or clients which haven't implemented the logic of stopping further production for the duration of the throttle time. This is important because if there is no throttle ack receipt, the server assumes that it's a client that doesn't understand this new protocol, and the server will switch back to the older method of throttling, i.e. pausing TCP channel read. Let me know if that makes sense.

> 3. How does client perform backoff? PIP says 1 second but server could have rate-limiting with different unit as well? and it doesn't make sense to configure on the client side?

Are you referring to the throttling time calculation section? https://github.com/apache/pulsar/pull/23398/files#diff-22a644af9af6cae8684b378ec31740275d87a6d96e7f823a63ded1ba86b2581cR234

This is done on the server side, so if the frequency/unit of token bucket refill is different from 1 second, it will be accommodated accordingly. The client doesn't make any assumptions about this frequency/unit; it simply blocks further produce until the time specified in the command.

> are we also considering exponential backoff at client (starting with 10ms) and let client retry and see if producer can publish more messages or broker will again send throttle cmd/error.

I thought about this, but fundamentally, since we are using a uniformly filling token bucket design for rate limiting, it would be very difficult to figure out what kind of exponential back-off to use when sending the throttling time to the client. Moreover, since the server side uses a uniform token bucket, sending an exponential back-off based time to the client may result in the client throttling itself more than needed.
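To make the point about the uniformly filling token bucket concrete, here is a minimal Java sketch of how a broker could derive a throttle duration from the bucket's deficit. It is an illustration only, not the PIP's actual calculation: `ThrottleDurationSketch`, `throttleFor`, and the convention that a negative balance means the quota is overdrawn are all assumptions.

```java
import java.time.Duration;

/** Hypothetical sketch; not the calculation used by the PIP or by Pulsar. */
public final class ThrottleDurationSketch {
    private static final long NANOS_PER_SECOND = 1_000_000_000L;

    /**
     * @param tokens current bucket balance; negative when the quota is overdrawn (assumed convention)
     * @param refillPerSecond uniform refill rate in tokens per second
     * @return time until the balance is back to zero at the given refill rate
     */
    static Duration throttleFor(long tokens, long refillPerSecond) {
        if (refillPerSecond <= 0) {
            throw new IllegalArgumentException("refillPerSecond must be positive");
        }
        if (tokens >= 0) {
            return Duration.ZERO; // quota not breached, nothing to wait for
        }
        long deficit = Math.negateExact(tokens);
        // Round up so the client never resumes before the deficit is refilled.
        long nanos = (Math.multiplyExact(deficit, NANOS_PER_SECOND) + refillPerSecond - 1)
                / refillPerSecond;
        return Duration.ofNanos(nanos);
    }
}
```

For example, a balance of -500 tokens with a refill rate of 1000 tokens per second yields 500 ms. Because the refill is uniform, the wait is a direct function of the deficit, which is why an exponential back-off on top of it would tend to over-throttle.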
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
rdhabalia commented on PR #23398: URL: https://github.com/apache/pulsar/pull/23398#issuecomment-2448363480

I have a few questions:

1. What happens when the broker sends the `Throttle` command to the client?
   - How does it impact the client application? What happens when the client app calls `producer.sendAsync()`? If the client library stops sending messages, will the client eventually get a queue-full exception?
   - Or will the client eventually get timeout exceptions for all the messages sitting in the queue? I guess that could be the likely behavior, and in that case, does it make sense to send a publish error code instead of a `throttle-command` from the broker? The broker can send `ERROR_CODE.THROTTLING` and the client can handle it the same way as if it had received a `throttle-command`, so we might not need a separate command here; it can be achieved with an error code.
2. I think the client doesn't have to send an ACK for the Throttle command back to the broker, as it's not useful to the broker and it seems to be a one-way notification only.
3. How does the client perform backoff? The PIP says 1 second, but the server could have rate limiting with a different unit as well, and it doesn't make sense to configure this on the client side? Are we also considering exponential backoff at the client (starting with 10ms), letting the client retry and see whether the producer can publish more messages or the broker sends a throttle cmd/error again?
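On the first question, the reply in this thread says the client holds messages in a queue until the throttle time has passed. A minimal Java sketch of such a gate follows, under loud assumptions: `ThrottleGateSketch` and its methods are hypothetical, messages are plain strings, time is passed in explicitly, and queue-full and timeout handling are left out entirely.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical sketch of a client-side hold queue; not the Pulsar Java client. */
public final class ThrottleGateSketch {
    private final Queue<String> held = new ArrayDeque<>();
    private long throttledUntilMillis = 0;

    /** Called when an (assumed) throttle command arrives; later deadlines win. */
    void onThrottle(long nowMillis, long durationMillis) {
        throttledUntilMillis = Math.max(throttledUntilMillis, nowMillis + durationMillis);
    }

    /** Queues the message, then returns whatever may be written to the socket now. */
    List<String> offer(String message, long nowMillis) {
        held.add(message);
        return drain(nowMillis);
    }

    /** Releases held messages in order once the throttle deadline has passed. */
    List<String> drain(long nowMillis) {
        List<String> ready = new ArrayList<>();
        if (nowMillis >= throttledUntilMillis) {
            while (!held.isEmpty()) {
                ready.add(held.poll());
            }
        }
        return ready;
    }
}
```

In this sketch a message offered during a throttle window is neither failed nor dropped; it simply waits, preserving order, until `drain` is called after the deadline.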
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1794737770 ## pip/pip-385.md: ## @@ -0,0 +1,348 @@
+# PIP-385: Add rate limit semantics to pulsar protocol and Java client
+
+
+ Table of Contents
+
+- [PIP-38X: Add rate limit semantics to pulsar protocol](#pip-38x-add-rate-limit-semantics-to-pulsar-protocol)
+- [Background knowledge](#background-knowledge)
+* [Challenges with the current approach](#challenges-with-the-current-approach)
+- [Motivation](#motivation)
+- [Goals](#goals)
+* [In Scope](#in-scope)
+* [Out of Scope](#out-of-scope)
+- [High Level Design](#high-level-design)
+* [New binary protocol commands](#new-binary-protocol-commands)
+* [Java client changes](#java-client-changes)
+- [Detailed Design](#detailed-design)
+* [High-level Implementation Details](#high-level-implementation-details)
++ [Broker Changes](#broker-changes)
++ [Determining the throttling duration for clients](#determining-the-throttling-duration-for-clients)
++ [Java Client Changes](#java-client-changes-1)
+* [Public-facing Changes](#public-facing-changes)
++ [Binary Protocol](#binary-protocol)
++ [Java Client](#java-client)
++ [Configuration](#configuration)
++ [Metrics](#metrics)
+- [Backward & Forward Compatibility](#backward-forward-compatibility)
+* [Upgrade / Downgrade / Rollback](#upgrade-downgrade-rollback)
+* [Lower Protocol Client](#lower-protocol-client)
+* [Lower Protocol Server](#lower-protocol-server)
+- [Links](#links)
+
+
+
+# Background knowledge
+
+Being a multi tenant system, pulsar supports quality of service constructs like topic quotas in bytes per second and
+qps. On top of this, the fact that one broker has only certain limited resources, it has to additionally implement some
+other controls to limit the resources usage, like how much message buffer it has, etc.
+
+As such, pulsar induces throttling at multiple levels. Just looking at publish level throttling, here are the various
+levers that we can configure in pulsar which enables us to rate limit a producer, topic or an entire connection from a
+client:
+
+* At the core of it, we can set topic level publish rate in bytes and/or messages per second.
+* We can create a resource group (combination of one or more namespaces or tenants) and set a publish-rate for that
+  resource group.
+* We can set a broker config to throttle based on pending messages at a connection level.
+  See [maxPendingPublishRequestsPerConnection](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L750)
+* We can set a broker config to throttle based on message buffer size at a thread level.
+  See [maxMessagePublishBufferSizeInMB](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1431C17-L1431C49)
+* We can set a broker level maximum publish rate per broker in bytes and/or messages.
+
+Currently, the way pulsar uses these levers and enforces these limits is by pausing reading further messages from an
+established connection for a topic. This is transparent to the clients, and they continue to publish further messages
+with an increased observed latency. Once the publish-rates are within the limits, broker resumes reading from the
+connection.
+
+Here is a small illustration to demonstrate the situation:
+
+```mermaid
+%%{init: {"mirrorActors": false, "rightAngles": false} }%%
+sequenceDiagram
+Client->>Server: CreateProducer(reqId, myTopic)
+Note right of Server: Check Authorization
+Server-->>Client: ProducerSuccess(reqId, producerName)
+Activate Client
+Activate Server
+Client->>Server: Send(1, message1)
+Client->>Server: Send(2, message2)
+Server-->>Client: SendReceipt(1, msgId1)
+Client->>Server: Send(3, message3)
+Client->>Server: Send(4, message4)
+Note right of Server: Topic breaching quota
+Activate Server
+note right of Server: TCP channel read paused
+Client-xServer: Send(5, message5)
+Server-->>Client: SendReceipt(2,msgId2)
+Server-->>Client: SendReceipt(3,msgId3)
+Client-xServer: Send(6, message6)
+Server-->>Client: SendReceipt(4,msgId4)
+note right of Server: TCP channel read resumed
+deactivate Server
+Server-->>Server: read message 5
+Server-->>Server: read message 6
+Client->>Server: Send(7, message7)
+Server-->>Client: SendReceipt(5,msgId5)
+Server-->>Client: SendReceipt(6,msgId6)
+Server-->>Client: SendReceipt(7,msgId7)
+
+Client->>Server: CloseProducer(producerId, reqId)
+Server-->>-Client: Success(reqId)
+deactivate Client
+```
+
+## Challenges with the current approach
+
+The current approach may look perfectly fine when looking at
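As a rough illustration of the read-pausing behaviour described in the quoted Background section, the Java sketch below models one connection whose reads stop once a topic's publish quota is used up and resume when the quota refills. It is a simplified model under stated assumptions, not Pulsar broker code: the class `ReadPauseThrottleSketch`, the per-period message quota, and the `startNewPeriod` method are all hypothetical names introduced here, and the quoted text only says that the broker pauses reading from the connection.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical model of read-pausing throttling; NOT the actual Pulsar broker implementation. */
class ReadPauseThrottleSketch {
    private final int messagesPerPeriod;   // assumed topic publish quota per rate period
    private int remainingQuota;
    private boolean readPaused = false;
    // Messages the client has written but the broker has not read yet.
    private final Queue<Integer> unread = new ArrayDeque<>();
    // Sequence ids for which a send receipt has been produced.
    private final List<Integer> receipts = new ArrayList<>();

    ReadPauseThrottleSketch(int messagesPerPeriod) {
        this.messagesPerPeriod = messagesPerPeriod;
        this.remainingQuota = messagesPerPeriod;
    }

    /** Client side: the write always succeeds locally; the client is never told it is throttled. */
    void send(int sequenceId) {
        unread.add(sequenceId);
        drain();
    }

    /** Broker side: a new rate period begins, the quota refills and reading resumes. */
    void startNewPeriod() {
        remainingQuota = messagesPerPeriod;
        readPaused = false;
        drain();
    }

    /** Reads pending messages until the quota is exhausted, then pauses reading. */
    private void drain() {
        while (!readPaused && !unread.isEmpty()) {
            receipts.add(unread.poll());
            remainingQuota--;
            if (remainingQuota <= 0) {
                readPaused = true;   // corresponds to "TCP channel read paused" in the diagram
            }
        }
    }

    boolean isReadPaused() { return readPaused; }
    int unreadCount() { return unread.size(); }
    List<Integer> receipts() { return new ArrayList<>(receipts); }

    public static void main(String[] args) {
        ReadPauseThrottleSketch conn = new ReadPauseThrottleSketch(4);
        for (int seq = 1; seq <= 6; seq++) {
            conn.send(seq);
        }
        System.out.println("paused=" + conn.isReadPaused()
                + " receipts=" + conn.receipts() + " unread=" + conn.unreadCount());
        conn.startNewPeriod();
        System.out.println("paused=" + conn.isReadPaused()
                + " receipts=" + conn.receipts() + " unread=" + conn.unreadCount());
    }
}
```

With a quota of four, messages 5 and 6 are accepted by `send` but sit unread until `startNewPeriod` is called, which mirrors the point made in the quoted text: the producer sees only added latency and gets no explicit signal that it is being throttled.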
Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]
grssam commented on code in PR #23398: URL: https://github.com/apache/pulsar/pull/23398#discussion_r1794737158 ## pip/pip-385.md: ## @@ -0,0 +1,348 @@