Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-21 Thread via GitHub


grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1851850338


##
pip/pip-385.md:
##
@@ -0,0 +1,398 @@
+# PIP-385: Add rate limit semantics to pulsar protocol and Java client
+
+
+  Table of Contents
+
+- [Background knowledge](#background-knowledge)
+  * [Challenges with the current approach](#challenges-with-the-current-approach)
+- [Motivation](#motivation)
+- [Goals](#goals)
+  * [In Scope](#in-scope)
+  * [Out of Scope](#out-of-scope)
+- [High Level Design](#high-level-design)
+  * [New binary protocol commands](#new-binary-protocol-commands)
+  * [Java client changes](#java-client-changes)
+- [Detailed Design](#detailed-design)
+  * [High-level Implementation Details](#high-level-implementation-details)
+    + [Broker Changes](#broker-changes)
+    + [Determining the throttling duration for clients](#determining-the-throttling-duration-for-clients)
+    + [Java Client Changes](#java-client-changes-1)
+    + [Blocking messages to be sent during throttling](#blocking-messages-to-be-sent-during-throttling)
+    + [Client side rate limit exception](#client-side-rate-limit-exception)
+  * [Public-facing Changes](#public-facing-changes)
+    + [Binary Protocol](#binary-protocol)
+    + [Java Client](#java-client)
+    + [Configuration](#configuration)
+    + [Metrics](#metrics)
+- [Backward & Forward Compatibility](#backward-forward-compatibility)
+  * [Upgrade / Downgrade / Rollback](#upgrade-downgrade-rollback)
+  * [Lower Protocol Client](#lower-protocol-client)
+  * [Lower Protocol Server](#lower-protocol-server)
+- [Alternatives](#alternatives)
+- [Links](#links)
+
+
+
+# Background knowledge
+
+Being a multi-tenant system, Pulsar supports quality-of-service constructs such as topic quotas in bytes per second and
+qps. In addition, because a single broker has only limited resources, it has to implement further controls that cap
+resource usage, such as the size of its message buffer.
+
+As such, Pulsar throttles at multiple levels. Looking only at publish-level throttling, these are the levers we can
+configure in Pulsar to rate limit a producer, a topic, or an entire connection from a client:
+
+* At the core, we can set a topic-level publish rate in bytes and/or messages per second.
+* We can create a resource group (a combination of one or more namespaces or tenants) and set a publish rate for that
+  resource group.
+* We can set a broker config to throttle based on pending messages at the connection level.
+  See [maxPendingPublishRequestsPerConnection](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L750)
+* We can set a broker config to throttle based on message buffer size at the thread level.
+  See [maxMessagePublishBufferSizeInMB](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1431C17-L1431C49)
+* We can set a maximum publish rate per broker in bytes and/or messages.
+
+Currently, Pulsar enforces these limits by pausing reads of further messages from the established connection for a
+topic. This is transparent to the clients: they continue to publish messages, but observe increased latency. Once the
+publish rates are back within the limits, the broker resumes reading from the connection.
+
+Here is a small illustration to demonstrate the situation:
+
+```mermaid
+%%{init: {"mirrorActors": false, "rightAngles": false} }%%
+sequenceDiagram
+Client->>Server: CreateProducer(reqId, myTopic)
+Note right of Server: Check Authorization
+Server-->>Client: ProducerSuccess(reqId, producerName)
+Activate Client
+Activate Server
+Client->>Server: Send(1, message1)
+Client->>Server: Send(2, message2)
+Server-->>Client: SendReceipt(1, msgId1)
+Client->>Server: Send(3, message3)
+Client->>Server: Send(4, message4)
+Note right of Server: Topic breaching quota
+Activate Server
+note right of Server: TCP channel read paused
+Client-xServer: Send(5, message5)
+Server-->>Client: SendReceipt(2,msgId2)
+Server-->>Client: SendReceipt(3,msgId3)
+Client-xServer: Send(6, message6)
+Server-->>Client: SendReceipt(4,msgId4)
+note right of Server: TCP channel read resumed
+deactivate Server
+Server-->>Server: read message 5
+Server-->>Server: read message 6
+Client->>Server: Send(7, message7)
+Server-->>Client: SendReceipt(5,msgId5)
+Server-->>Client: SendReceipt(6,msgId6)
+Server-->>Client: SendReceipt(7,msgId7)
+
+Client->>Server: CloseProducer(producerId, reqId)
+Server-->>-Client: Success(reqId)
+deactivate Client
+```
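
The pause-and-resume behaviour in the diagram above can be approximated with a toy model. This is only a sketch and not Pulsar code: it assumes discrete ticks, a fixed per-tick read quota, and an unbounded connection buffer, and the name `simulate_paused_reads` is hypothetical. It shows why the client sees nothing but growing latency while the broker has paused reading.

```python
from collections import deque


def simulate_paused_reads(send_ticks, quota_per_tick):
    """Return the latency (in ticks) of each message, in read order.

    Assumption: the broker reads at most `quota_per_tick` messages per tick
    and leaves the rest unread in the connection until a later tick.
    """
    if quota_per_tick < 1:
        raise ValueError("quota_per_tick must be at least 1")

    arrivals = deque(sorted(send_ticks))  # ticks at which the client sends
    pending = deque()                     # sent but not yet read by the broker
    latencies = []
    tick = 0
    while arrivals or pending:
        # The client is never told to stop, so every send lands in the buffer.
        while arrivals and arrivals[0] <= tick:
            pending.append(arrivals.popleft())
        # The broker reads until this tick's quota is used up, then pauses.
        for _ in range(min(quota_per_tick, len(pending))):
            sent_at = pending.popleft()
            latencies.append(tick - sent_at)
        tick += 1
    return latencies


# Six messages sent at once against a quota of two per tick.
print(simulate_paused_reads([0] * 6, quota_per_tick=2))  # [0, 0, 1, 1, 2, 2]
```

In this model every message is eventually accepted, so the only signal the client receives about throttling is the increase in latency.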
+
+## Challenges with the current approach
+
+The cu

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-20 Thread via GitHub


lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848784878



Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-20 Thread via GitHub


grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1851042799



Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-20 Thread via GitHub


grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848316881



Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-19 Thread via GitHub


lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848761990



Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-19 Thread via GitHub


lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848801031



Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-19 Thread via GitHub


lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848778811


##
pip/pip-385.md:
##
@@ -0,0 +1,398 @@
+# PIP-385: Add rate limit semantics to pulsar protocol and Java client
+
+
+  Table of Contents
+
+- [Background knowledge](#background-knowledge)
+  * [Challenges with the current approach](#challenges-with-the-current-approach)
+- [Motivation](#motivation)
+- [Goals](#goals)
+  * [In Scope](#in-scope)
+  * [Out of Scope](#out-of-scope)
+- [High Level Design](#high-level-design)
+  * [New binary protocol commands](#new-binary-protocol-commands)
+  * [Java client changes](#java-client-changes)
+- [Detailed Design](#detailed-design)
+  * [High-level Implementation Details](#high-level-implementation-details)
+    + [Broker Changes](#broker-changes)
+    + [Determining the throttling duration for clients](#determining-the-throttling-duration-for-clients)
+    + [Java Client Changes](#java-client-changes-1)
+    + [Blocking messages to be sent during throttling](#blocking-messages-to-be-sent-during-throttling)
+    + [Client side rate limit exception](#client-side-rate-limit-exception)
+  * [Public-facing Changes](#public-facing-changes)
+    + [Binary Protocol](#binary-protocol)
+    + [Java Client](#java-client)
+    + [Configuration](#configuration)
+    + [Metrics](#metrics)
+- [Backward & Forward Compatibility](#backward-forward-compatibility)
+  * [Upgrade / Downgrade / Rollback](#upgrade-downgrade-rollback)
+  * [Lower Protocol Client](#lower-protocol-client)
+  * [Lower Protocol Server](#lower-protocol-server)
+- [Alternatives](#alternatives)
+- [Links](#links)
+
+
+
+# Background knowledge
+
+Being a multi-tenant system, Pulsar supports quality-of-service constructs such as topic quotas in bytes per second and
+qps. In addition, because a single broker has only limited resources, it has to implement further controls that cap
+resource usage, such as the size of its message buffer.
+
+As such, Pulsar throttles at multiple levels. Looking only at publish-level throttling, these are the levers we can
+configure in Pulsar to rate limit a producer, a topic, or an entire connection from a client:
+
+* At the core of it, we can set topic level publish rate in bytes and/or 
messages per second.
+* We can create a resource group (combination of one or more namespaces or 
tenants) and set a publish-rate for that
+  resource group.
+* We can set a broker config to throttle based on pending messages at a 
connection level.
+  See 
[maxPendingPublishRequestsPerConnection](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L750)
+* We can set a broker config to throttle based on message buffer size at a 
thread level.
+  See 
[maxMessagePublishBufferSizeInMB](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1431C17-L1431C49)
+* We can set a broker level maximum publish rate per broker in bytes and/or 
messages.
+
+Currently, the way pulsar uses these levers and enforces these limits is by 
pausing reading further messages from an
+established connection for a topic. This is transparent to the clients, and 
they continue to publish further messages
+with an increased observed latency. Once the publish-rates are within the 
limits, broker resumes reading from the
+connection.
+
+Here is a small illustration to demonstrate the situation:
+
+```mermaid
+%%{init: {"mirrorActors": false, "rightAngles": false} }%%
+sequenceDiagram
+Client->>Server: CreateProducer(reqId, myTopic)
+Note right of Server: Check Authorization
+Server-->>Client: ProducerSuccess(reqId, producerName)
+Activate Client
+Activate Server
+Client->>Server: Send(1, message1)
+Client->>Server: Send(2, message2)
+Server-->>Client: SendReceipt(1, msgId1)
+Client->>Server: Send(3, message3)
+Client->>Server: Send(4, message4)
+Note right of Server: Topic breaching quota
+Activate Server
+note right of Server: TCP channel read paused
+Client-xServer: Send(5, message5)
+Server-->>Client: SendReceipt(2,msgId2)
+Server-->>Client: SendReceipt(3,msgId3)
+Client-xServer: Send(6, message6)
+Server-->>Client: SendReceipt(4,msgId4)
+note right of Server: TCP channel read resumed
+deactivate Server
+Server-->>Server: read message 5
+Server-->>Server: read message 6
+Client->>Server: Send(7, message7)
+Server-->>Client: SendReceipt(5,msgId5)
+Server-->>Client: SendReceipt(6,msgId6)
+Server-->>Client: SendReceipt(7,msgId7)
+
+Client->>Server: CloseProducer(producerId, reqId)
+Server-->>-Client: Success(reqId)
+deactivate Client
+```
+
+## Challenges with the current approach
+
+The c

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-19 Thread via GitHub


grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848690290


##
pip/pip-385.md:
##

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-19 Thread via GitHub


grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848656151


##
pip/pip-385.md:
##

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-19 Thread via GitHub


lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848644061


##
pip/pip-385.md:
##

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-19 Thread via GitHub


grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848630742


##
pip/pip-385.md:
##

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-19 Thread via GitHub


grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848623529


##
pip/pip-385.md:
##

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-19 Thread via GitHub


lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848616412


##
pip/pip-385.md:
##
@@ -0,0 +1,398 @@
+# PIP-385: Add rate limit semantics to pulsar protocol and Java client
+
+
+  Table of Contents
+
+- [Background knowledge](#background-knowledge)
+  * [Challenges with the current 
approach](#challenges-with-the-current-approach)
+- [Motivation](#motivation)
+- [Goals](#goals)
+  * [In Scope](#in-scope)
+  * [Out of Scope](#out-of-scope)
+- [High Level Design](#high-level-design)
+  * [New binary protocol commands](#new-binary-protocol-commands)
+  * [Java client changes](#java-client-changes)
+- [Detailed Design](#detailed-design)
+  * [High-level Implementation Details](#high-level-implementation-details)
++ [Broker Changes](#broker-changes)
++ [Determining the throttling duration for 
clients](#determining-the-throttling-duration-for-clients)
++ [Java Client Changes](#java-client-changes-1)
++ [Blocking messages to be sent during 
throttling](#blocking-messages-to-be-sent-during-throttling)
++ [Client side rate limit exception](#client-side-rate-limit-exception)
+  * [Public-facing Changes](#public-facing-changes)
++ [Binary Protocol](#binary-protocol)
++ [Java Client](#java-client)
++ [Configuration](#configuration)
++ [Metrics](#metrics)
+- [Backward & Forward Compatibility](#backward-forward-compatibility)
+  * [Upgrade / Downgrade / Rollback](#upgrade-downgrade-rollback)
+  * [Lower Protocol Client](#lower-protocol-client)
+  * [Lower Protocol Server](#lower-protocol-server)
+- [Alternatives](#alternatives)
+- [Links](#links)
+
+
+
+# Background knowledge
+
+Being a multi tenant system, pulsar supports quality of service constructs 
like topic quotas in bytes per second and
+qps. On top of this, the fact that one broker has only certain limited 
resources, it has to additionally implement some
+other controls to limit the resources usage, like how much message buffer it 
has, etc.
+
+As such, pulsar induces throttling at multiple levels. Just looking at publish 
level throttling, here are the various
+levers that we can configure in pulsar which enables us to rate limit a 
producer, topic or an entire connection from a
+client:
+
+* At the core of it, we can set topic level publish rate in bytes and/or 
messages per second.
+* We can create a resource group (combination of one or more namespaces or 
tenants) and set a publish-rate for that
+  resource group.
+* We can set a broker config to throttle based on pending messages at a 
connection level.
+  See 
[maxPendingPublishRequestsPerConnection](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L750)
+* We can set a broker config to throttle based on message buffer size at a 
thread level.
+  See 
[maxMessagePublishBufferSizeInMB](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1431C17-L1431C49)
+* We can set a broker level maximum publish rate per broker in bytes and/or 
messages.
+
+Currently, the way pulsar uses these levers and enforces these limits is by 
pausing reading further messages from an
+established connection for a topic. This is transparent to the clients, and 
they continue to publish further messages
+with an increased observed latency. Once the publish-rates are within the 
limits, broker resumes reading from the
+connection.
+
+Here is a small illustration to demonstrate the situation:
+
+```mermaid
+%%{init: {"mirrorActors": false, "rightAngles": false} }%%
+sequenceDiagram
+Client->>Server: CreateProducer(reqId, myTopic)
+Note right of Server: Check Authorization
+Server-->>Client: ProducerSuccess(reqId, producerName)
+Activate Client
+Activate Server
+Client->>Server: Send(1, message1)
+Client->>Server: Send(2, message2)
+Server-->>Client: SendReceipt(1, msgId1)
+Client->>Server: Send(3, message3)
+Client->>Server: Send(4, message4)
+Note right of Server: Topic breaching quota
+Activate Server
+note right of Server: TCP channel read paused
+Client-xServer: Send(5, message5)
+Server-->>Client: SendReceipt(2,msgId2)
+Server-->>Client: SendReceipt(3,msgId3)
+Client-xServer: Send(6, message6)
+Server-->>Client: SendReceipt(4,msgId4)
+note right of Server: TCP channel read resumed
+deactivate Server
+Server-->>Server: read message 5
+Server-->>Server: read message 6
+Client->>Server: Send(7, message7)
+Server-->>Client: SendReceipt(5,msgId5)
+Server-->>Client: SendReceipt(6,msgId6)
+Server-->>Client: SendReceipt(7,msgId7)
+
+Client->>Server: CloseProducer(producerId, reqId)
+Server-->>-Client: Success(reqId)
+deactivate Client
+```
+
+## Challenges with the current approach
+
+The c
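
The read-pause behavior described in the quoted Background section can be modeled with a small simulation. This is a minimal sketch under stated assumptions, not Pulsar's broker code: the class `ReadPauseSketch`, the per-tick message quota, and the `nextTick()` period reset are all hypothetical names and simplifications introduced for illustration. It shows only the effect the diagram illustrates: the client keeps sending, the broker stops reading once the quota is used up, and the unread messages are acknowledged later without the client receiving any throttling signal.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of read-pause throttling. Hypothetical; not Pulsar's actual broker code. */
class ReadPauseSketch {
    private final int quotaPerTick;  // assumed: messages allowed per rate period
    private final Queue<Integer> socketBuffer = new ArrayDeque<>();  // messages sent but not yet read by the broker
    private final List<String> receipts = new ArrayList<>();
    private int usedInTick = 0;
    private boolean readPaused = false;
    private int tick = 0;

    ReadPauseSketch(int quotaPerTick) {
        this.quotaPerTick = quotaPerTick;
    }

    /** Client side: the send is accepted locally whether or not reading is paused. */
    void send(int sequenceId) {
        socketBuffer.add(sequenceId);
        drain();
    }

    /** Broker side: a new rate period starts, so reading resumes. */
    void nextTick() {
        tick++;
        usedInTick = 0;
        readPaused = false;
        drain();
    }

    /** Broker side: read and acknowledge messages until the quota is used up. */
    private void drain() {
        while (!readPaused && !socketBuffer.isEmpty()) {
            int seq = socketBuffer.poll();
            receipts.add("receipt(" + seq + ")@tick" + tick);
            usedInTick++;
            if (usedInTick >= quotaPerTick) {
                readPaused = true;  // quota reached: stop reading from the connection
            }
        }
    }

    boolean isReadPaused() { return readPaused; }
    int unreadCount() { return socketBuffer.size(); }
    List<String> receipts() { return receipts; }

    public static void main(String[] args) {
        ReadPauseSketch conn = new ReadPauseSketch(4);
        for (int seq = 1; seq <= 6; seq++) {
            conn.send(seq);  // messages 5 and 6 stay unread
        }
        System.out.println("paused=" + conn.isReadPaused() + " unread=" + conn.unreadCount());
        conn.nextTick();  // read resumes, 5 and 6 are acknowledged late
        System.out.println(conn.receipts());
    }
}
```

With a quota of 4, messages 1 to 4 are acknowledged in the first period and messages 5 and 6 only after the reset. From the client's point of view this looks like extra latency, which is the situation the PIP sets out to address.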

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-19 Thread via GitHub


lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848614466


##
pip/pip-385.md:
##

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-19 Thread via GitHub


lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848605217


##
pip/pip-385.md:
##

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-19 Thread via GitHub


grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1848319059


##
pip/pip-385.md:
##

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-15 Thread via GitHub


lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1843861334


##
pip/pip-385.md:
##

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-15 Thread via GitHub


lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1843792236


##
pip/pip-385.md:
##

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-15 Thread via GitHub


grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1843788844


##
pip/pip-385.md:
##

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-15 Thread via GitHub


lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1843787963


##
pip/pip-385.md:
##
@@ -0,0 +1,398 @@
+# PIP-385: Add rate limit semantics to pulsar protocol and Java client
+
+
+  Table of Contents
+
+- [Background knowledge](#background-knowledge)
+  * [Challenges with the current approach](#challenges-with-the-current-approach)
+- [Motivation](#motivation)
+- [Goals](#goals)
+  * [In Scope](#in-scope)
+  * [Out of Scope](#out-of-scope)
+- [High Level Design](#high-level-design)
+  * [New binary protocol commands](#new-binary-protocol-commands)
+  * [Java client changes](#java-client-changes)
+- [Detailed Design](#detailed-design)
+  * [High-level Implementation Details](#high-level-implementation-details)
+    + [Broker Changes](#broker-changes)
+    + [Determining the throttling duration for clients](#determining-the-throttling-duration-for-clients)
+    + [Java Client Changes](#java-client-changes-1)
+    + [Blocking messages to be sent during throttling](#blocking-messages-to-be-sent-during-throttling)
+    + [Client side rate limit exception](#client-side-rate-limit-exception)
+  * [Public-facing Changes](#public-facing-changes)
+    + [Binary Protocol](#binary-protocol)
+    + [Java Client](#java-client)
+    + [Configuration](#configuration)
+    + [Metrics](#metrics)
+- [Backward & Forward Compatibility](#backward-forward-compatibility)
+  * [Upgrade / Downgrade / Rollback](#upgrade-downgrade-rollback)
+  * [Lower Protocol Client](#lower-protocol-client)
+  * [Lower Protocol Server](#lower-protocol-server)
+- [Alternatives](#alternatives)
+- [Links](#links)
+
+
+
+# Background knowledge
+
+Being a multi-tenant system, Pulsar supports quality-of-service constructs like topic quotas in bytes per second and
+qps. On top of this, because a broker has only limited resources, it has to implement additional
+controls to limit resource usage, such as how much message buffer it has.
+
+As such, Pulsar induces throttling at multiple levels. Looking only at publish-level throttling, here are the
+levers that we can configure in Pulsar to rate limit a producer, a topic, or an entire connection from a
+client:
+
+* At the core of it, we can set a topic-level publish rate in bytes and/or messages per second.
+* We can create a resource group (a combination of one or more namespaces or tenants) and set a publish rate for that
+  resource group.
+* We can set a broker config to throttle based on pending messages at a connection level.
+  See [maxPendingPublishRequestsPerConnection](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L750)
+* We can set a broker config to throttle based on message buffer size at a thread level.
+  See [maxMessagePublishBufferSizeInMB](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1431C17-L1431C49)
+* We can set a maximum publish rate per broker in bytes and/or messages.
+
+Currently, Pulsar enforces these limits by pausing reads of further messages from an
+established connection for a topic. This is transparent to the clients, which continue to publish further messages
+and observe increased latency. Once the publish rates are within the limits, the broker resumes reading from the
+connection.
+
+Here is a small illustration to demonstrate the situation:
+
+```mermaid
+%%{init: {"mirrorActors": false, "rightAngles": false} }%%
+sequenceDiagram
+Client->>Server: CreateProducer(reqId, myTopic)
+Note right of Server: Check Authorization
+Server-->>Client: ProducerSuccess(reqId, producerName)
+Activate Client
+Activate Server
+Client->>Server: Send(1, message1)
+Client->>Server: Send(2, message2)
+Server-->>Client: SendReceipt(1, msgId1)
+Client->>Server: Send(3, message3)
+Client->>Server: Send(4, message4)
+Note right of Server: Topic breaching quota
+Activate Server
+note right of Server: TCP channel read paused
+Client-xServer: Send(5, message5)
+Server-->>Client: SendReceipt(2,msgId2)
+Server-->>Client: SendReceipt(3,msgId3)
+Client-xServer: Send(6, message6)
+Server-->>Client: SendReceipt(4,msgId4)
+note right of Server: TCP channel read resumed
+deactivate Server
+Server-->>Server: read message 5
+Server-->>Server: read message 6
+Client->>Server: Send(7, message7)
+Server-->>Client: SendReceipt(5,msgId5)
+Server-->>Client: SendReceipt(6,msgId6)
+Server-->>Client: SendReceipt(7,msgId7)
+
+Client->>Server: CloseProducer(producerId, reqId)
+Server-->>-Client: Success(reqId)
+deactivate Client
+```
+
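The read-pausing behaviour described in the quoted background section can be modelled in a few lines. The following is only a toy sketch, not Pulsar's implementation: the class `ReadPauseSketch`, its fields, and the "permits per tick" quota are all hypothetical names made up for illustration. It reproduces the sequence in the diagram, where messages 1 to 4 are read, reads pause, messages 5 and 6 sit unread, and the broker picks them up once reading resumes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model (hypothetical, not Pulsar code) of a broker pausing reads while over quota. */
public class ReadPauseSketch {
    private final int permitsPerTick;   // assumed quota: messages allowed per rate period
    private int permitsLeft;
    private boolean readPaused = false;
    // Messages the client has written but the broker has not read yet.
    private final Queue<Integer> unread = new ArrayDeque<>();
    // Sequence ids the broker has read and acknowledged.
    private final List<Integer> acked = new ArrayList<>();

    public ReadPauseSketch(int permitsPerTick) {
        this.permitsPerTick = permitsPerTick;
        this.permitsLeft = permitsPerTick;
    }

    /** Client side: the write always succeeds locally; no throttling signal is returned. */
    public void clientSend(int sequenceId) {
        unread.add(sequenceId);
        drain();
    }

    /** Broker side: a new rate period starts, so reading resumes. */
    public void tick() {
        permitsLeft = permitsPerTick;
        readPaused = false;
        drain();
    }

    /** Reads pending messages until the quota is used up, then pauses reading. */
    private void drain() {
        while (!readPaused && !unread.isEmpty()) {
            acked.add(unread.poll());
            if (--permitsLeft <= 0) {
                readPaused = true; // quota breached: stop reading from the connection
            }
        }
    }

    public boolean isReadPaused() { return readPaused; }
    public List<Integer> acked() { return acked; }
    public int pending() { return unread.size(); }

    public static void main(String[] args) {
        ReadPauseSketch broker = new ReadPauseSketch(4);
        for (int seq = 1; seq <= 6; seq++) {
            broker.clientSend(seq);
        }
        System.out.println("acked=" + broker.acked() + " pending=" + broker.pending()
                + " paused=" + broker.isReadPaused());
        broker.tick();
        System.out.println("acked=" + broker.acked() + " pending=" + broker.pending()
                + " paused=" + broker.isReadPaused());
    }
}
```

In this model the client never learns that it is being throttled: `clientSend` returns normally and the only visible effect is that receipts for messages 5 and 6 arrive later.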
+## Challenges with the current approach
+
+The c

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-15 Thread via GitHub


lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1843754643



Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-15 Thread via GitHub


lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1843747449



Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-13 Thread via GitHub


grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1841010651



Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-13 Thread via GitHub


grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1840962720


##
pip/pip-385.md:
##
@@ -0,0 +1,348 @@
+# PIP-385: Add rate limit semantics to pulsar protocol and Java client
+
+
+  Table of Contents
+
+- [PIP-38X: Add rate limit semantics to pulsar protocol](#pip-38x-add-rate-limit-semantics-to-pulsar-protocol)
+- [Background knowledge](#background-knowledge)
+* [Challenges with the current approach](#challenges-with-the-current-approach)
+- [Motivation](#motivation)
+- [Goals](#goals)
+* [In Scope](#in-scope)
+* [Out of Scope](#out-of-scope)
+- [High Level Design](#high-level-design)
+* [New binary protocol commands](#new-binary-protocol-commands)
+* [Java client changes](#java-client-changes)
+- [Detailed Design](#detailed-design)
+* [High-level Implementation Details](#high-level-implementation-details)
++ [Broker Changes](#broker-changes)
++ [Determining the throttling duration for clients](#determining-the-throttling-duration-for-clients)
++ [Java Client Changes](#java-client-changes-1)
+* [Public-facing Changes](#public-facing-changes)
++ [Binary Protocol](#binary-protocol)
++ [Java Client](#java-client)
++ [Configuration](#configuration)
++ [Metrics](#metrics)
+- [Backward & Forward Compatibility](#backward-forward-compatibility)
+* [Upgrade / Downgrade / Rollback](#upgrade-downgrade-rollback)
+* [Lower Protocol Client](#lower-protocol-client)
+* [Lower Protocol Server](#lower-protocol-server)
+- [Links](#links)
+
+
+
+# Background knowledge
+
+Being a multi-tenant system, Pulsar supports quality-of-service constructs like topic quotas in bytes per second and
+qps. On top of this, because a broker has only limited resources, it has to implement additional
+controls to limit resource usage, such as how much message buffer it has.
+
+As such, Pulsar induces throttling at multiple levels. Looking only at publish-level throttling, here are the
+levers that we can configure in Pulsar to rate limit a producer, a topic, or an entire connection from a
+client:
+
+* At the core of it, we can set a topic-level publish rate in bytes and/or messages per second.
+* We can create a resource group (a combination of one or more namespaces or tenants) and set a publish rate for that
+  resource group.
+* We can set a broker config to throttle based on pending messages at a connection level.
+  See [maxPendingPublishRequestsPerConnection](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L750)
+* We can set a broker config to throttle based on message buffer size at a thread level.
+  See [maxMessagePublishBufferSizeInMB](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1431C17-L1431C49)
+* We can set a maximum publish rate per broker in bytes and/or messages.
+
+Currently, Pulsar enforces these limits by pausing reads of further messages from an
+established connection for a topic. This is transparent to the clients, which continue to publish further messages
+and observe increased latency. Once the publish rates are within the limits, the broker resumes reading from the
+connection.
+
+Here is a small illustration to demonstrate the situation:
+
+```mermaid
+%%{init: {"mirrorActors": false, "rightAngles": false} }%%
+sequenceDiagram
+Client->>Server: CreateProducer(reqId, myTopic)
+Note right of Server: Check Authorization
+Server-->>Client: ProducerSuccess(reqId, producerName)
+Activate Client
+Activate Server
+Client->>Server: Send(1, message1)
+Client->>Server: Send(2, message2)
+Server-->>Client: SendReceipt(1, msgId1)
+Client->>Server: Send(3, message3)
+Client->>Server: Send(4, message4)
+Note right of Server: Topic breaching quota
+Activate Server
+note right of Server: TCP channel read paused
+Client-xServer: Send(5, message5)
+Server-->>Client: SendReceipt(2,msgId2)
+Server-->>Client: SendReceipt(3,msgId3)
+Client-xServer: Send(6, message6)
+Server-->>Client: SendReceipt(4,msgId4)
+note right of Server: TCP channel read resumed
+deactivate Server
+Server-->>Server: read message 5
+Server-->>Server: read message 6
+Client->>Server: Send(7, message7)
+Server-->>Client: SendReceipt(5,msgId5)
+Server-->>Client: SendReceipt(6,msgId6)
+Server-->>Client: SendReceipt(7,msgId7)
+
+Client->>Server: CloseProducer(producerId, reqId)
+Server-->>-Client: Success(reqId)
+deactivate Client
+```
+
+## Challenges with the current approach
+
+The current approach may look perfectly fine when looking at 

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-13 Thread via GitHub


grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1840983604



Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-13 Thread via GitHub


grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1840988981



Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-13 Thread via GitHub


grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1840987318


##
pip/pip-385.md:
##

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-13 Thread via GitHub


grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1840964791


##
pip/pip-385.md:
##

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-13 Thread via GitHub


grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1840962720


##
pip/pip-385.md:
##
@@ -0,0 +1,348 @@
+# PIP-385: Add rate limit semantics to pulsar protocol and Java client
+
+
+  Table of Contents
+
+- [PIP-38X: Add rate limit semantics to pulsar protocol](#pip-38x-add-rate-limit-semantics-to-pulsar-protocol)
+- [Background knowledge](#background-knowledge)
+* [Challenges with the current approach](#challenges-with-the-current-approach)
+- [Motivation](#motivation)
+- [Goals](#goals)
+* [In Scope](#in-scope)
+* [Out of Scope](#out-of-scope)
+- [High Level Design](#high-level-design)
+* [New binary protocol commands](#new-binary-protocol-commands)
+* [Java client changes](#java-client-changes)
+- [Detailed Design](#detailed-design)
+* [High-level Implementation Details](#high-level-implementation-details)
++ [Broker Changes](#broker-changes)
++ [Determining the throttling duration for clients](#determining-the-throttling-duration-for-clients)
++ [Java Client Changes](#java-client-changes-1)
+* [Public-facing Changes](#public-facing-changes)
++ [Binary Protocol](#binary-protocol)
++ [Java Client](#java-client)
++ [Configuration](#configuration)
++ [Metrics](#metrics)
+- [Backward & Forward Compatibility](#backward-forward-compatibility)
+* [Upgrade / Downgrade / Rollback](#upgrade-downgrade-rollback)
+* [Lower Protocol Client](#lower-protocol-client)
+* [Lower Protocol Server](#lower-protocol-server)
+- [Links](#links)
+
+
+
+# Background knowledge
+
+Being a multi-tenant system, pulsar supports quality of service constructs like topic quotas in bytes per second and
+qps. On top of this, because a broker has only limited resources, it has to additionally implement some
+other controls to limit resource usage, such as the size of its message buffer.
+
+As such, pulsar applies throttling at multiple levels. Looking only at publish-level throttling, here are the various
+levers that we can configure in pulsar to rate limit a producer, a topic, or an entire connection from a
+client:
+
+* At the core, we can set a topic-level publish rate in bytes and/or messages per second.
+* We can create a resource group (a combination of one or more namespaces or tenants) and set a publish rate for that
+  resource group.
+* We can set a broker config to throttle based on pending messages at the connection level.
+  See [maxPendingPublishRequestsPerConnection](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L750)
+* We can set a broker config to throttle based on message buffer size at the thread level.
+  See [maxMessagePublishBufferSizeInMB](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1431C17-L1431C49)
+* We can set a maximum publish rate per broker in bytes and/or messages.
+
+Currently, pulsar enforces these limits by pausing reads of further messages from an
+established connection for a topic. This is transparent to the clients, which continue to publish further messages
+with increased observed latency. Once the publish rates are back within the limits, the broker resumes reading from the
+connection.
+
+Here is a small illustration to demonstrate the situation:
+
+```mermaid
+%%{init: {"mirrorActors": false, "rightAngles": false} }%%
+sequenceDiagram
+Client->>Server: CreateProducer(reqId, myTopic)
+Note right of Server: Check Authorization
+Server-->>Client: ProducerSuccess(reqId, producerName)
+Activate Client
+Activate Server
+Client->>Server: Send(1, message1)
+Client->>Server: Send(2, message2)
+Server-->>Client: SendReceipt(1, msgId1)
+Client->>Server: Send(3, message3)
+Client->>Server: Send(4, message4)
+Note right of Server: Topic breaching quota
+Activate Server
+note right of Server: TCP channel read paused
+Client-xServer: Send(5, message5)
+Server-->>Client: SendReceipt(2,msgId2)
+Server-->>Client: SendReceipt(3,msgId3)
+Client-xServer: Send(6, message6)
+Server-->>Client: SendReceipt(4,msgId4)
+note right of Server: TCP channel read resumed
+deactivate Server
+Server-->>Server: read message 5
+Server-->>Server: read message 6
+Client->>Server: Send(7, message7)
+Server-->>Client: SendReceipt(5,msgId5)
+Server-->>Client: SendReceipt(6,msgId6)
+Server-->>Client: SendReceipt(7,msgId7)
+
+Client->>Server: CloseProducer(producerId, reqId)
+Server-->>-Client: Success(reqId)
+deactivate Client
+```
+
+## Challenges with the current approach
+
+The current approach may look perfectly fine when looking at 

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-13 Thread via GitHub


lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1840763064


##
pip/pip-385.md:
##

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-13 Thread via GitHub


grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1840147289


##
pip/pip-385.md:
##

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-13 Thread via GitHub


lhotari commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1840134837


##
pip/pip-385.md:
##
@@ -0,0 +1,398 @@
+# PIP-385: Add rate limit semantics to pulsar protocol and Java client
+
+
+  Table of Contents
+
+- [Background knowledge](#background-knowledge)
+  * [Challenges with the current approach](#challenges-with-the-current-approach)
+- [Motivation](#motivation)
+- [Goals](#goals)
+  * [In Scope](#in-scope)
+  * [Out of Scope](#out-of-scope)
+- [High Level Design](#high-level-design)
+  * [New binary protocol commands](#new-binary-protocol-commands)
+  * [Java client changes](#java-client-changes)
+- [Detailed Design](#detailed-design)
+  * [High-level Implementation Details](#high-level-implementation-details)
++ [Broker Changes](#broker-changes)
++ [Determining the throttling duration for clients](#determining-the-throttling-duration-for-clients)
++ [Java Client Changes](#java-client-changes-1)
++ [Blocking messages to be sent during throttling](#blocking-messages-to-be-sent-during-throttling)
++ [Client side rate limit exception](#client-side-rate-limit-exception)
+  * [Public-facing Changes](#public-facing-changes)
++ [Binary Protocol](#binary-protocol)
++ [Java Client](#java-client)
++ [Configuration](#configuration)
++ [Metrics](#metrics)
+- [Backward & Forward Compatibility](#backward-forward-compatibility)
+  * [Upgrade / Downgrade / Rollback](#upgrade-downgrade-rollback)
+  * [Lower Protocol Client](#lower-protocol-client)
+  * [Lower Protocol Server](#lower-protocol-server)
+- [Alternatives](#alternatives)
+- [Links](#links)
+
+
+
+# Background knowledge
+
+Being a multi tenant system, pulsar supports quality of service constructs like topic quotas in bytes per second and
+qps. On top of this, the fact that one broker has only certain limited resources, it has to additionally implement some
+other controls to limit the resources usage, like how much message buffer it has, etc.
+
+As such, pulsar induces throttling at multiple levels. Just looking at publish level throttling, here are the various
+levers that we can configure in pulsar which enables us to rate limit a producer, topic or an entire connection from a
+client:
+
+* At the core of it, we can set topic level publish rate in bytes and/or messages per second.
+* We can create a resource group (combination of one or more namespaces or tenants) and set a publish-rate for that
+  resource group.
+* We can set a broker config to throttle based on pending messages at a connection level.
+  See [maxPendingPublishRequestsPerConnection](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L750)
+* We can set a broker config to throttle based on message buffer size at a thread level.
+  See [maxMessagePublishBufferSizeInMB](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1431C17-L1431C49)
+* We can set a broker level maximum publish rate per broker in bytes and/or messages.
+
+Currently, the way pulsar uses these levers and enforces these limits is by pausing reading further messages from an
+established connection for a topic. This is transparent to the clients, and they continue to publish further messages
+with an increased observed latency. Once the publish-rates are within the limits, broker resumes reading from the
+connection.
+
+Here is a small illustration to demonstrate the situation:
+
+```mermaid
+%%{init: {"mirrorActors": false, "rightAngles": false} }%%
+sequenceDiagram
+Client->>Server: CreateProducer(reqId, myTopic)
+Note right of Server: Check Authorization
+Server-->>Client: ProducerSuccess(reqId, producerName)
+Activate Client
+Activate Server
+Client->>Server: Send(1, message1)
+Client->>Server: Send(2, message2)
+Server-->>Client: SendReceipt(1, msgId1)
+Client->>Server: Send(3, message3)
+Client->>Server: Send(4, message4)
+Note right of Server: Topic breaching quota
+Activate Server
+note right of Server: TCP channel read paused
+Client-xServer: Send(5, message5)
+Server-->>Client: SendReceipt(2,msgId2)
+Server-->>Client: SendReceipt(3,msgId3)
+Client-xServer: Send(6, message6)
+Server-->>Client: SendReceipt(4,msgId4)
+note right of Server: TCP channel read resumed
+deactivate Server
+Server-->>Server: read message 5
+Server-->>Server: read message 6
+Client->>Server: Send(7, message7)
+Server-->>Client: SendReceipt(5,msgId5)
+Server-->>Client: SendReceipt(6,msgId6)
+Server-->>Client: SendReceipt(7,msgId7)
+
+Client->>Server: CloseProducer(producerId, reqId)
+Server-->>-Client: Success(reqId)
+deactivate Client
+```
+
+## Challenges with the current approach
+
+The c

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-12 Thread via GitHub


nodece commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1837620602


##
pip/pip-385.md:
##
@@ -0,0 +1,348 @@
+# PIP-385: Add rate limit semantics to pulsar protocol and Java client
+
+
+  Table of Contents
+
+- [PIP-38X: Add rate limit semantics to pulsar protocol](#pip-38x-add-rate-limit-semantics-to-pulsar-protocol)
+- [Background knowledge](#background-knowledge)
+* [Challenges with the current approach](#challenges-with-the-current-approach)
+- [Motivation](#motivation)
+- [Goals](#goals)
+* [In Scope](#in-scope)
+* [Out of Scope](#out-of-scope)
+- [High Level Design](#high-level-design)
+* [New binary protocol commands](#new-binary-protocol-commands)
+* [Java client changes](#java-client-changes)
+- [Detailed Design](#detailed-design)
+* [High-level Implementation Details](#high-level-implementation-details)
++ [Broker Changes](#broker-changes)
++ [Determining the throttling duration for clients](#determining-the-throttling-duration-for-clients)
++ [Java Client Changes](#java-client-changes-1)
+* [Public-facing Changes](#public-facing-changes)
++ [Binary Protocol](#binary-protocol)
++ [Java Client](#java-client)
++ [Configuration](#configuration)
++ [Metrics](#metrics)
+- [Backward & Forward Compatibility](#backward-forward-compatibility)
+* [Upgrade / Downgrade / Rollback](#upgrade-downgrade-rollback)
+* [Lower Protocol Client](#lower-protocol-client)
+* [Lower Protocol Server](#lower-protocol-server)
+- [Links](#links)
+
+
+
+# Background knowledge
+
+Being a multi tenant system, pulsar supports quality of service constructs like topic quotas in bytes per second and
+qps. On top of this, the fact that one broker has only certain limited resources, it has to additionally implement some
+other controls to limit the resources usage, like how much message buffer it has, etc.
+
+As such, pulsar induces throttling at multiple levels. Just looking at publish level throttling, here are the various
+levers that we can configure in pulsar which enables us to rate limit a producer, topic or an entire connection from a
+client:
+
+* At the core of it, we can set topic level publish rate in bytes and/or messages per second.
+* We can create a resource group (combination of one or more namespaces or tenants) and set a publish-rate for that
+  resource group.
+* We can set a broker config to throttle based on pending messages at a connection level.
+  See [maxPendingPublishRequestsPerConnection](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L750)
+* We can set a broker config to throttle based on message buffer size at a thread level.
+  See [maxMessagePublishBufferSizeInMB](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1431C17-L1431C49)
+* We can set a broker level maximum publish rate per broker in bytes and/or messages.
+
+Currently, the way pulsar uses these levers and enforces these limits is by pausing reading further messages from an
+established connection for a topic. This is transparent to the clients, and they continue to publish further messages
+with an increased observed latency. Once the publish-rates are within the limits, broker resumes reading from the
+connection.
+
+Here is a small illustration to demonstrate the situation:
+
+```mermaid
+%%{init: {"mirrorActors": false, "rightAngles": false} }%%
+sequenceDiagram
+Client->>Server: CreateProducer(reqId, myTopic)
+Note right of Server: Check Authorization
+Server-->>Client: ProducerSuccess(reqId, producerName)
+Activate Client
+Activate Server
+Client->>Server: Send(1, message1)
+Client->>Server: Send(2, message2)
+Server-->>Client: SendReceipt(1, msgId1)
+Client->>Server: Send(3, message3)
+Client->>Server: Send(4, message4)
+Note right of Server: Topic breaching quota
+Activate Server
+note right of Server: TCP channel read paused
+Client-xServer: Send(5, message5)
+Server-->>Client: SendReceipt(2,msgId2)
+Server-->>Client: SendReceipt(3,msgId3)
+Client-xServer: Send(6, message6)
+Server-->>Client: SendReceipt(4,msgId4)
+note right of Server: TCP channel read resumed
+deactivate Server
+Server-->>Server: read message 5
+Server-->>Server: read message 6
+Client->>Server: Send(7, message7)
+Server-->>Client: SendReceipt(5,msgId5)
+Server-->>Client: SendReceipt(6,msgId6)
+Server-->>Client: SendReceipt(7,msgId7)
+
+Client->>Server: CloseProducer(producerId, reqId)
+Server-->>-Client: Success(reqId)
+deactivate Client
+```
+
+## Challenges with the current approach
+
+The current approach may look perfectly fine when looking at 

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-12 Thread via GitHub


grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1837635568


##
pip/pip-385.md:
##
@@ -0,0 +1,348 @@
+# PIP-385: Add rate limit semantics to pulsar protocol and Java client
+
+
+  Table of Contents
+
+- [PIP-38X: Add rate limit semantics to pulsar protocol](#pip-38x-add-rate-limit-semantics-to-pulsar-protocol)
+- [Background knowledge](#background-knowledge)
+* [Challenges with the current approach](#challenges-with-the-current-approach)
+- [Motivation](#motivation)
+- [Goals](#goals)
+* [In Scope](#in-scope)
+* [Out of Scope](#out-of-scope)
+- [High Level Design](#high-level-design)
+* [New binary protocol commands](#new-binary-protocol-commands)
+* [Java client changes](#java-client-changes)
+- [Detailed Design](#detailed-design)
+* [High-level Implementation Details](#high-level-implementation-details)
++ [Broker Changes](#broker-changes)
++ [Determining the throttling duration for clients](#determining-the-throttling-duration-for-clients)
++ [Java Client Changes](#java-client-changes-1)
+* [Public-facing Changes](#public-facing-changes)
++ [Binary Protocol](#binary-protocol)
++ [Java Client](#java-client)
++ [Configuration](#configuration)
++ [Metrics](#metrics)
+- [Backward & Forward Compatibility](#backward-forward-compatibility)
+* [Upgrade / Downgrade / Rollback](#upgrade-downgrade-rollback)
+* [Lower Protocol Client](#lower-protocol-client)
+* [Lower Protocol Server](#lower-protocol-server)
+- [Links](#links)
+
+
+
+# Background knowledge
+
+Being a multi tenant system, pulsar supports quality of service constructs like topic quotas in bytes per second and
+qps. On top of this, the fact that one broker has only certain limited resources, it has to additionally implement some
+other controls to limit the resources usage, like how much message buffer it has, etc.
+
+As such, pulsar induces throttling at multiple levels. Just looking at publish level throttling, here are the various
+levers that we can configure in pulsar which enables us to rate limit a producer, topic or an entire connection from a
+client:
+
+* At the core of it, we can set topic level publish rate in bytes and/or messages per second.
+* We can create a resource group (combination of one or more namespaces or tenants) and set a publish-rate for that
+  resource group.
+* We can set a broker config to throttle based on pending messages at a connection level.
+  See [maxPendingPublishRequestsPerConnection](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L750)
+* We can set a broker config to throttle based on message buffer size at a thread level.
+  See [maxMessagePublishBufferSizeInMB](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1431C17-L1431C49)
+* We can set a broker level maximum publish rate per broker in bytes and/or messages.
+
+Currently, the way pulsar uses these levers and enforces these limits is by pausing reading further messages from an
+established connection for a topic. This is transparent to the clients, and they continue to publish further messages
+with an increased observed latency. Once the publish-rates are within the limits, broker resumes reading from the
+connection.
+
+Here is a small illustration to demonstrate the situation:
+
+```mermaid
+%%{init: {"mirrorActors": false, "rightAngles": false} }%%
+sequenceDiagram
+Client->>Server: CreateProducer(reqId, myTopic)
+Note right of Server: Check Authorization
+Server-->>Client: ProducerSuccess(reqId, producerName)
+Activate Client
+Activate Server
+Client->>Server: Send(1, message1)
+Client->>Server: Send(2, message2)
+Server-->>Client: SendReceipt(1, msgId1)
+Client->>Server: Send(3, message3)
+Client->>Server: Send(4, message4)
+Note right of Server: Topic breaching quota
+Activate Server
+note right of Server: TCP channel read paused
+Client-xServer: Send(5, message5)
+Server-->>Client: SendReceipt(2,msgId2)
+Server-->>Client: SendReceipt(3,msgId3)
+Client-xServer: Send(6, message6)
+Server-->>Client: SendReceipt(4,msgId4)
+note right of Server: TCP channel read resumed
+deactivate Server
+Server-->>Server: read message 5
+Server-->>Server: read message 6
+Client->>Server: Send(7, message7)
+Server-->>Client: SendReceipt(5,msgId5)
+Server-->>Client: SendReceipt(6,msgId6)
+Server-->>Client: SendReceipt(7,msgId7)
+
+Client->>Server: CloseProducer(producerId, reqId)
+Server-->>-Client: Success(reqId)
+deactivate Client
+```
+
+## Challenges with the current approach
+
+The current approach may look perfectly fine when looking at 

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-11 Thread via GitHub


nodece commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1837620602


##
pip/pip-385.md:
##
@@ -0,0 +1,348 @@
+# PIP-385: Add rate limit semantics to pulsar protocol and Java client
+
+
+  Table of Contents
+
+- [PIP-38X: Add rate limit semantics to pulsar protocol](#pip-38x-add-rate-limit-semantics-to-pulsar-protocol)
+- [Background knowledge](#background-knowledge)
+* [Challenges with the current approach](#challenges-with-the-current-approach)
+- [Motivation](#motivation)
+- [Goals](#goals)
+* [In Scope](#in-scope)
+* [Out of Scope](#out-of-scope)
+- [High Level Design](#high-level-design)
+* [New binary protocol commands](#new-binary-protocol-commands)
+* [Java client changes](#java-client-changes)
+- [Detailed Design](#detailed-design)
+* [High-level Implementation Details](#high-level-implementation-details)
++ [Broker Changes](#broker-changes)
++ [Determining the throttling duration for clients](#determining-the-throttling-duration-for-clients)
++ [Java Client Changes](#java-client-changes-1)
+* [Public-facing Changes](#public-facing-changes)
++ [Binary Protocol](#binary-protocol)
++ [Java Client](#java-client)
++ [Configuration](#configuration)
++ [Metrics](#metrics)
+- [Backward & Forward Compatibility](#backward-forward-compatibility)
+* [Upgrade / Downgrade / Rollback](#upgrade-downgrade-rollback)
+* [Lower Protocol Client](#lower-protocol-client)
+* [Lower Protocol Server](#lower-protocol-server)
+- [Links](#links)
+
+
+
+# Background knowledge
+
+Being a multi tenant system, pulsar supports quality of service constructs like topic quotas in bytes per second and
+qps. On top of this, the fact that one broker has only certain limited resources, it has to additionally implement some
+other controls to limit the resources usage, like how much message buffer it has, etc.
+
+As such, pulsar induces throttling at multiple levels. Just looking at publish level throttling, here are the various
+levers that we can configure in pulsar which enables us to rate limit a producer, topic or an entire connection from a
+client:
+
+* At the core of it, we can set topic level publish rate in bytes and/or messages per second.
+* We can create a resource group (combination of one or more namespaces or tenants) and set a publish-rate for that
+  resource group.
+* We can set a broker config to throttle based on pending messages at a connection level.
+  See [maxPendingPublishRequestsPerConnection](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L750)
+* We can set a broker config to throttle based on message buffer size at a thread level.
+  See [maxMessagePublishBufferSizeInMB](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1431C17-L1431C49)
+* We can set a broker level maximum publish rate per broker in bytes and/or messages.
+
+Currently, the way pulsar uses these levers and enforces these limits is by pausing reading further messages from an
+established connection for a topic. This is transparent to the clients, and they continue to publish further messages
+with an increased observed latency. Once the publish-rates are within the limits, broker resumes reading from the
+connection.
+
+Here is a small illustration to demonstrate the situation:
+
+```mermaid
+%%{init: {"mirrorActors": false, "rightAngles": false} }%%
+sequenceDiagram
+Client->>Server: CreateProducer(reqId, myTopic)
+Note right of Server: Check Authorization
+Server-->>Client: ProducerSuccess(reqId, producerName)
+Activate Client
+Activate Server
+Client->>Server: Send(1, message1)
+Client->>Server: Send(2, message2)
+Server-->>Client: SendReceipt(1, msgId1)
+Client->>Server: Send(3, message3)
+Client->>Server: Send(4, message4)
+Note right of Server: Topic breaching quota
+Activate Server
+note right of Server: TCP channel read paused
+Client-xServer: Send(5, message5)
+Server-->>Client: SendReceipt(2,msgId2)
+Server-->>Client: SendReceipt(3,msgId3)
+Client-xServer: Send(6, message6)
+Server-->>Client: SendReceipt(4,msgId4)
+note right of Server: TCP channel read resumed
+deactivate Server
+Server-->>Server: read message 5
+Server-->>Server: read message 6
+Client->>Server: Send(7, message7)
+Server-->>Client: SendReceipt(5,msgId5)
+Server-->>Client: SendReceipt(6,msgId6)
+Server-->>Client: SendReceipt(7,msgId7)
+
+Client->>Server: CloseProducer(producerId, reqId)
+Server-->>-Client: Success(reqId)
+deactivate Client
+```
+
+## Challenges with the current approach
+
+The current approach may look perfectly fine when looking at 

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-07 Thread via GitHub


grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1833728666


##
pip/pip-385.md:
##
@@ -0,0 +1,348 @@
+# PIP-385: Add rate limit semantics to pulsar protocol and Java client
+
+
+  Table of Contents
+
+- [PIP-38X: Add rate limit semantics to pulsar protocol](#pip-38x-add-rate-limit-semantics-to-pulsar-protocol)
+- [Background knowledge](#background-knowledge)
+* [Challenges with the current approach](#challenges-with-the-current-approach)
+- [Motivation](#motivation)
+- [Goals](#goals)
+* [In Scope](#in-scope)
+* [Out of Scope](#out-of-scope)
+- [High Level Design](#high-level-design)
+* [New binary protocol commands](#new-binary-protocol-commands)
+* [Java client changes](#java-client-changes)
+- [Detailed Design](#detailed-design)
+* [High-level Implementation Details](#high-level-implementation-details)
++ [Broker Changes](#broker-changes)
++ [Determining the throttling duration for clients](#determining-the-throttling-duration-for-clients)
++ [Java Client Changes](#java-client-changes-1)
+* [Public-facing Changes](#public-facing-changes)
++ [Binary Protocol](#binary-protocol)
++ [Java Client](#java-client)
++ [Configuration](#configuration)
++ [Metrics](#metrics)
+- [Backward & Forward Compatibility](#backward-forward-compatibility)
+* [Upgrade / Downgrade / Rollback](#upgrade-downgrade-rollback)
+* [Lower Protocol Client](#lower-protocol-client)
+* [Lower Protocol Server](#lower-protocol-server)
+- [Links](#links)
+
+
+
+# Background knowledge
+
+Being a multi tenant system, pulsar supports quality of service constructs like topic quotas in bytes per second and
+qps. On top of this, the fact that one broker has only certain limited resources, it has to additionally implement some
+other controls to limit the resources usage, like how much message buffer it has, etc.
+
+As such, pulsar induces throttling at multiple levels. Just looking at publish level throttling, here are the various
+levers that we can configure in pulsar which enables us to rate limit a producer, topic or an entire connection from a
+client:
+
+* At the core of it, we can set topic level publish rate in bytes and/or messages per second.
+* We can create a resource group (combination of one or more namespaces or tenants) and set a publish-rate for that
+  resource group.
+* We can set a broker config to throttle based on pending messages at a connection level.
+  See [maxPendingPublishRequestsPerConnection](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L750)
+* We can set a broker config to throttle based on message buffer size at a thread level.
+  See [maxMessagePublishBufferSizeInMB](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1431C17-L1431C49)
+* We can set a broker level maximum publish rate per broker in bytes and/or messages.
+
+Currently, the way pulsar uses these levers and enforces these limits is by pausing reading further messages from an
+established connection for a topic. This is transparent to the clients, and they continue to publish further messages
+with an increased observed latency. Once the publish-rates are within the limits, broker resumes reading from the
+connection.
+
+Here is a small illustration to demonstrate the situation:
+
+```mermaid
+%%{init: {"mirrorActors": false, "rightAngles": false} }%%
+sequenceDiagram
+Client->>Server: CreateProducer(reqId, myTopic)
+Note right of Server: Check Authorization
+Server-->>Client: ProducerSuccess(reqId, producerName)
+Activate Client
+Activate Server
+Client->>Server: Send(1, message1)
+Client->>Server: Send(2, message2)
+Server-->>Client: SendReceipt(1, msgId1)
+Client->>Server: Send(3, message3)
+Client->>Server: Send(4, message4)
+Note right of Server: Topic breaching quota
+Activate Server
+note right of Server: TCP channel read paused
+Client-xServer: Send(5, message5)
+Server-->>Client: SendReceipt(2,msgId2)
+Server-->>Client: SendReceipt(3,msgId3)
+Client-xServer: Send(6, message6)
+Server-->>Client: SendReceipt(4,msgId4)
+note right of Server: TCP channel read resumed
+deactivate Server
+Server-->>Server: read message 5
+Server-->>Server: read message 6
+Client->>Server: Send(7, message7)
+Server-->>Client: SendReceipt(5,msgId5)
+Server-->>Client: SendReceipt(6,msgId6)
+Server-->>Client: SendReceipt(7,msgId7)
+
+Client->>Server: CloseProducer(producerId, reqId)
+Server-->>-Client: Success(reqId)
+deactivate Client
+```
+
+## Challenges with the current approach
+
+The current approach may look perfectly fine when looking at 

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-04 Thread via GitHub


grssam commented on PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#issuecomment-2456268584

   
   > Sure, I was suggesting to send a failure `ServerError.Throttled` to 
producer as it is just a notification from server to client. and let client 
receive this error during publish and do necessary backoff.
   
   Ah, got it now. There are a few reasons for this not being a "response" to a send request:
   * The `CommandSendError` command has no provision to convey how long the producer is throttled. Adding that optional field just for one of the `ServerError` enum values doesn't seem right.
   * Due to the distributed nature of producers, relying on a response to a `Send` request might not work. Suppose 10 producers are connected to a topic: one of them might not have produced anything yet when the quota is breached. So we need to notify all 10 producers to throttle themselves.
   * Throttling doesn't necessarily lead to failures. If the client has a reasonable timeout (the default of 30s is more than reasonable), the publish will succeed even with throttling. Moreover, every producer could be connecting with a different client `sendTimeout` value. Thus, with the current protocol it is not possible to figure out whether throttling on the broker side would lead to a client-side error at all. This is why the notification is decoupled from the send response.
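
The second point above can be sketched roughly as follows. This is only an illustration, not the PIP's implementation: `ProducerLink` and `TopicThrottleNotifier` are hypothetical names and are not Pulsar classes. The point is that the notification is driven by the topic's quota state and not by any individual `Send` request.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a connected producer; not a Pulsar class.
interface ProducerLink {
    void notifyThrottled(long throttleMillis);
}

// Hypothetical topic-side helper: when the quota is breached, every attached
// producer is told to back off, whether or not it has a Send in flight.
final class TopicThrottleNotifier {
    private final List<ProducerLink> producers = new ArrayList<>();

    void attach(ProducerLink producer) {
        producers.add(producer);
    }

    // Notifies all attached producers and returns how many were notified.
    int onQuotaBreached(long throttleMillis) {
        for (ProducerLink producer : producers) {
            producer.notifyThrottled(throttleMillis);
        }
        return producers.size();
    }
}
```

A producer that has not published anything yet still receives the notification here, which a per-`Send` error response could not achieve.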

   
   > Umm.. That should not be true because we can increase this version when we 
want to check client's compatibility and this should be the example of this 
usecase.
   
   I suspect this was done because not every client wanted to implement a feature present in protocol version X, but still wanted to implement something present in version Y, where X < Y. Thus this independent "supported feature flag" approach was introduced. [This](https://apache-pulsar.slack.com/archives/C5ZSVEN4E/p1726845929631179) is the Slack discussion thread if you want to join the conversation.
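
To make the difference concrete, here is a small sketch comparing the two gating styles. Everything in it is hypothetical: `ClientHandshake`, `supportsThrottleCommand` and the version numbers are invented for illustration and do not correspond to Pulsar's actual handshake fields.

```java
// Hypothetical view of what a client announced when it connected.
final class ClientHandshake {
    final int protocolVersion;
    final boolean supportsThrottleCommand; // hypothetical feature flag

    ClientHandshake(int protocolVersion, boolean supportsThrottleCommand) {
        this.protocolVersion = protocolVersion;
        this.supportsThrottleCommand = supportsThrottleCommand;
    }
}

final class ThrottleCommandGate {
    // Version check: assumes a client at or above minVersion implements
    // every feature introduced up to that version.
    static boolean allowedByVersion(ClientHandshake client, int minVersion) {
        return client.protocolVersion >= minVersion;
    }

    // Flag check: the client opts in to this one feature explicitly.
    static boolean allowedByFlag(ClientHandshake client) {
        return client.supportsThrottleCommand;
    }
}
```

A client that reports a high protocol version but never implemented throttle handling passes the version check and fails the flag check, which is the X < Y situation described above.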


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-04 Thread via GitHub


rdhabalia commented on PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#issuecomment-2456199156

   >  Could you please elaborate a bit more? Not 100% sure what you mean here.. 
Are you saying that we simply treat all timeouts as throttling issue and return 
THROTTLE_ERROR_CODE instead of TIMEOUT without the need of server sending any 
info out to the client (via the new command?)
   
   Sure. I was suggesting sending a `ServerError.Throttled` failure to the producer, since it is just a notification from server to client, and letting the client receive this error during publish and do the necessary backoff.
   
   >> Actually, i checked around this on slack. seems like feature flags are 
the new approach of handling this and protocol version only bumps up in case of 
major changes in the protocol.
   
   Umm.. That should not be true, because we can increase this version when we want to check a client's compatibility, and this is an example of that use case.





Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-11-03 Thread via GitHub


grssam commented on PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#issuecomment-2453555618

   
   > Umm.. I am still not clear. Can't we send THROTTLE_ERROR_CODE and handle 
it instead of introducing new command?
   
   Could you please elaborate a bit more? I am not 100% sure what you mean here. Are you saying that we simply treat all timeouts as a throttling issue and return THROTTLE_ERROR_CODE instead of TIMEOUT, without the server needing to send any info to the client (via the new command)?

   > Correct. In that case, check client-version at broker side and don't 
trigger this workflow if client doesn't support the version. So, we don't need 
ACK response back from client. For information: check `ServerCnx.java -> int 
clientProtocolVersion = connect.getProtocolVersion();`
   
   Actually, I asked around about this on Slack. It seems feature flags are the new approach for handling this, and the protocol version is only bumped for major changes in the protocol.
   In fact, I am using this feature flag approach to avoid sending this command at all to clients that have not implemented handling of it.
   
   The concern about bad actors, intentional or unintentional, still remains.
   Intentional - a client says it supports handling this throttle command but still misbehaves due to a code bug or any other reason.
   Unintentional - TCP is a two-way stream (or internally, it opens 2 channels). Outbound messages from the broker can get choked, so the throttle command gets held up and clients continue to bombard the broker.
   Both of the above situations lead to a noisy-neighbor issue on the broker side and the client side alike. It essentially leads to no rate limiting.





Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-10-31 Thread via GitHub


rdhabalia commented on PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#issuecomment-2450918009

   >> This behavior is unchanged overall. while the client is potentially 
preventing some of the messages sitting in the pending queue to be even sent 
out to the TCP pipe
   
   Umm.. I am still not clear. Can't we send THROTTLE_ERROR_CODE and handle it instead of introducing a new command?
   
   
   >> The only reason this throttle ack receipt is introduced is to handle 
older clients, 
   
   Correct. In that case, check the client version on the broker side and don't trigger this workflow if the client doesn't support the version. Then we don't need an ACK response back from the client. For information: check `ServerCnx.java -> int clientProtocolVersion = connect.getProtocolVersion();`
   
   >> This is done on the server side.. so if the frequency/unit of token 
bucket refill is different than 1 second, it will be accommodated accordingly. 
The client doesn't make any assumptions on this frequency/unit, 
   
   got it.





Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-10-31 Thread via GitHub


grssam commented on PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#issuecomment-2449401411

   @rdhabalia Please find replies inline below
   
   
   >  1. what happens when broker sends `Throttle` Command to client?
   > 
   > 
   >   * how does it impact client application? what happens when client-app 
calls `producer.sendAsync()` -> if client lib stops sending messages then 
client will eventually gets queue-full-execption?
   
   Yes, the client holds those messages in a new queue until the throttle time has passed. During this time, a queue-full exception may also occur if the user has configured that flag.
   
   This is explained in detail in this section of the PIP - https://github.com/apache/pulsar/pull/23398/files#diff-22a644af9af6cae8684b378ec31740275d87a6d96e7f823a63ded1ba86b2581cR265
   
   Please let me know if the details need to be improved further.
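
As a rough sketch of the holding behaviour described in this answer (not the actual client code): `ThrottledSendBuffer` and its methods are hypothetical names, messages are plain strings, and time is passed in explicitly to keep the example deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical client-side buffer: messages submitted while throttled are
// held back and released once the throttle window has passed.
final class ThrottledSendBuffer {
    private final Queue<String> held = new ArrayDeque<>();
    private final int maxHeld;
    private long throttledUntilMillis = 0;

    ThrottledSendBuffer(int maxHeld) {
        this.maxHeld = maxHeld;
    }

    // Called when the (hypothetical) throttle command arrives from the broker.
    void onThrottle(long nowMillis, long throttleMillis) {
        throttledUntilMillis = Math.max(throttledUntilMillis, nowMillis + throttleMillis);
    }

    // Returns the messages that may go on the wire right now, in order.
    List<String> send(String message, long nowMillis) {
        List<String> out = drain(nowMillis);
        if (nowMillis < throttledUntilMillis) {
            if (held.size() >= maxHeld) {
                throw new IllegalStateException("queue full while throttled");
            }
            held.add(message);
        } else {
            out.add(message);
        }
        return out;
    }

    // Releases held messages once the throttle window is over.
    List<String> drain(long nowMillis) {
        List<String> out = new ArrayList<>();
        if (nowMillis >= throttledUntilMillis) {
            while (!held.isEmpty()) {
                out.add(held.poll());
            }
        }
        return out;
    }
}
```

The `IllegalStateException` stands in for the queue-full exception mentioned above; a real client would presumably also have the option of blocking the caller.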
   
   
   >  * Or will the client eventually get timeout exceptions for all the messages 
sitting in the queue? I guess that could be the likely behavior, and in that 
case, does it make sense to send a publish error code instead of the 
`throttle-command` from the broker? The broker can send `ERROR_CODE.THROTTLING` 
and the client can handle it the same way as if it had received the 
`throttle-command`, so we might not need a separate command here; it can be 
achieved with an error code.
   
   This behavior is unchanged overall. While the client may hold back some of 
the messages sitting in the pending queue from even being written to the TCP 
pipe, the general timeout behavior remains the same: anything in the pending 
queue times out if a SendReceipt isn't received from the server. The only 
difference is that we now don't throw a timeout exception if we know the client 
was being throttled for the majority of the `messageSendTimeout` duration. This 
is explained further in this section: 
https://github.com/apache/pulsar/pull/23398/files#diff-22a644af9af6cae8684b378ec31740275d87a6d96e7f823a63ded1ba86b2581cR280
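
   A rough sketch of that decision, under two assumptions that the linked 
section may define differently: "majority" is read as strictly more than half 
of the send timeout, and the alternative outcome is a rate-limit failure. All 
names here are hypothetical.

```java
/** Hypothetical helper: decides how an expired pending message should be reported. */
final class SendFailureClassifier {
    enum Failure { TIMEOUT, RATE_LIMITED }

    /**
     * @param throttledMillis   time the producer spent throttled while the message was pending
     * @param sendTimeoutMillis the configured send timeout for the producer
     */
    static Failure classify(long throttledMillis, long sendTimeoutMillis) {
        if (throttledMillis < 0 || sendTimeoutMillis <= 0) {
            throw new IllegalArgumentException("durations must be non-negative and the timeout positive");
        }
        // Assumption: "majority" means strictly more than half of the timeout window.
        // Written as a subtraction so that very large values cannot overflow.
        boolean mostlyThrottled = throttledMillis > sendTimeoutMillis - throttledMillis;
        return mostlyThrottled ? Failure.RATE_LIMITED : Failure.TIMEOUT;
    }
}
```

   For example, with a 30 second timeout, 20 seconds of throttling would be 
reported as rate limiting, while 15 seconds or less would still surface as a 
plain timeout.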
   
   
   
   >  2. I think the client doesn't have to send an ACK for the Throttle command 
back to the broker, as it's not useful to the broker and it seems to be a 
one-way notification only.
   
   The only reason this throttle ack receipt is introduced is to handle older 
clients, or clients which haven't implemented the logic of stopping further 
production for the duration of the throttle time. This is important because if 
there is no throttle ack receipt, the server assumes that it's a client that 
doesn't understand this new protocol, and thus the server will switch back to 
the older method of throttling, i.e. pausing TCP channel read.
   
   Let me know if that makes sense.
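
   To make the fallback concrete, here is a small sketch of how a server could 
track the ack. The class and method names are hypothetical, and the ack timeout 
is an assumption; this comment does not say how long the server waits.

```java
/** Hypothetical per-connection tracker for the throttle ack receipt. */
final class ThrottleAckTracker {
    enum Mode { CLIENT_SIDE_THROTTLE, PAUSE_CHANNEL_READ }

    private final long ackTimeoutMillis; // assumption: the server waits this long for an ack
    private long throttleSentAtMillis = -1L;
    private boolean ackReceived;

    ThrottleAckTracker(long ackTimeoutMillis) {
        this.ackTimeoutMillis = ackTimeoutMillis;
    }

    /** Called when the throttle command is written to the connection. */
    void onThrottleSent(long nowMillis) {
        throttleSentAtMillis = nowMillis;
        ackReceived = false;
    }

    /** Called when the client acknowledges the throttle command. */
    void onThrottleAck() {
        ackReceived = true;
    }

    /** Without a timely ack, treat the client as one that does not understand the command. */
    Mode modeAt(long nowMillis) {
        if (throttleSentAtMillis < 0 || ackReceived) {
            return Mode.CLIENT_SIDE_THROTTLE;
        }
        return nowMillis - throttleSentAtMillis >= ackTimeoutMillis
                ? Mode.PAUSE_CHANNEL_READ
                : Mode.CLIENT_SIDE_THROTTLE;
    }
}
```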
   
   
   
   > 3. How does the client perform backoff? The PIP says 1 second, but the 
server could have rate limiting with a different unit as well, and it doesn't 
make sense to configure this on the client side? 
   
   Are you referring to the throttling time calculation section? 
https://github.com/apache/pulsar/pull/23398/files#diff-22a644af9af6cae8684b378ec31740275d87a6d96e7f823a63ded1ba86b2581cR234
   
   This is done on the server side, so if the frequency/unit of the token bucket 
refill is different from 1 second, it will be accommodated accordingly. The 
client doesn't make any assumptions about this frequency/unit; it simply blocks 
further produce until the time specified in the command.
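
   As an illustration of the client side, the sketch below holds messages until 
the throttle window has passed. It assumes the command carries a duration in 
milliseconds (it could equally be an absolute time), and every name in it is 
hypothetical rather than taken from the Java client.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical client-side gate that holds messages while the producer is throttled. */
final class ThrottledSendGate {
    private final Queue<String> held = new ArrayDeque<>();
    private long throttledUntilMillis = 0L;

    /** Called when a throttle command arrives; the duration field is an assumption. */
    void onThrottle(long nowMillis, long throttleMillis) {
        throttledUntilMillis = Math.max(throttledUntilMillis, nowMillis + throttleMillis);
    }

    /** Queues the message and returns whatever may be written to the connection now. */
    List<String> offer(String message, long nowMillis) {
        held.add(message);
        return drain(nowMillis);
    }

    /** Releases held messages in order once the throttle window has passed. */
    List<String> drain(long nowMillis) {
        List<String> ready = new ArrayList<>();
        if (nowMillis < throttledUntilMillis) {
            return ready; // still throttled: nothing leaves the client
        }
        while (!held.isEmpty()) {
            ready.add(held.poll());
        }
        return ready;
    }
}
```

   The client never needs to know the refill unit of the server's token bucket; 
it only honors the window it was given.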
   
   > Are we also considering exponential backoff at the client (starting with 
10ms), letting the client retry and see whether the producer can publish more 
messages or the broker will again send the throttle cmd/error?
   
   I thought about this, but fundamentally, since we are using a uniformly 
filling token bucket design for rate limiting, it would be very difficult to 
figure out what kind of exponential back-off to use when sending the throttling 
time back to the client. Moreover, since it's a uniform token bucket on the 
server side, sending an exponential back-off based time to the client may 
result in the client throttling for longer than needed.
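
   To show why a uniformly filling bucket gives a direct answer, here is a 
simplified calculation. It is not the formula from the PIP: it assumes the 
bucket is allowed to go negative and that tokens are refilled at a constant 
rate per second, and the names are hypothetical.

```java
/** Hypothetical helper: throttle duration derived from a uniformly refilled token bucket. */
final class TokenBucketThrottleTime {
    /**
     * @param tokens          current token balance; negative means the quota is overdrawn (assumption)
     * @param refillPerSecond tokens added per second at a constant rate
     * @return milliseconds until the balance is back to zero, rounded up
     */
    static long millisUntilRefilled(long tokens, long refillPerSecond) {
        if (refillPerSecond <= 0) {
            throw new IllegalArgumentException("refillPerSecond must be positive");
        }
        if (tokens >= 0) {
            return 0L; // nothing to wait for
        }
        long deficit = Math.negateExact(tokens);
        // Ceiling division of (deficit * 1000) by the refill rate, with overflow checks.
        long scaled = Math.addExact(Math.multiplyExact(deficit, 1000L), refillPerSecond - 1);
        return scaled / refillPerSecond;
    }
}
```

   Because the wait grows linearly with the deficit, the server can hand the 
client an exact duration, whereas an exponential schedule would tend to 
overshoot it.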





Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-10-30 Thread via GitHub


rdhabalia commented on PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#issuecomment-2448363480

   I have a few questions:
   
   1. What happens when the broker sends the `Throttle` command to the client?
   - How does it impact the client application? What happens when the client app 
calls `producer.sendAsync()`? If the client lib stops sending messages, will the 
client eventually get a queue-full exception?
   - Or will the client eventually get timeout exceptions for all the messages 
sitting in the queue? I guess that could be the likely behavior, and in that 
case, does it make sense to send a publish error code instead of the 
`throttle-command` from the broker? The broker can send `ERROR_CODE.THROTTLING` 
and the client can handle it the same way as if it had received the 
`throttle-command`, so we might not need a separate command here; it can be 
achieved with an error code.
   
   
   2. I think the client doesn't have to send an ACK for the Throttle command 
back to the broker, as it's not useful to the broker and it seems to be a 
one-way notification only.
   
   3. How does the client perform backoff? The PIP says 1 second, but the server 
could have rate limiting with a different unit as well, and it doesn't make 
sense to configure this on the client side? Are we also considering exponential 
backoff at the client (starting with 10ms), letting the client retry and see 
whether the producer can publish more messages or the broker will again send 
the throttle cmd/error?





Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-10-09 Thread via GitHub


grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1794737770


##
pip/pip-385.md:
##
@@ -0,0 +1,348 @@
+# PIP-385: Add rate limit semantics to pulsar protocol and Java client
+
+
+  Table of Contents
+
+- [PIP-38X: Add rate limit semantics to pulsar 
protocol](#pip-38x-add-rate-limit-semantics-to-pulsar-protocol)
+- [Background knowledge](#background-knowledge)
+* [Challenges with the current 
approach](#challenges-with-the-current-approach)
+- [Motivation](#motivation)
+- [Goals](#goals)
+* [In Scope](#in-scope)
+* [Out of Scope](#out-of-scope)
+- [High Level Design](#high-level-design)
+* [New binary protocol commands](#new-binary-protocol-commands)
+* [Java client changes](#java-client-changes)
+- [Detailed Design](#detailed-design)
+* [High-level Implementation Details](#high-level-implementation-details)
++ [Broker Changes](#broker-changes)
++ [Determining the throttling duration for 
clients](#determining-the-throttling-duration-for-clients)
++ [Java Client Changes](#java-client-changes-1)
+* [Public-facing Changes](#public-facing-changes)
++ [Binary Protocol](#binary-protocol)
++ [Java Client](#java-client)
++ [Configuration](#configuration)
++ [Metrics](#metrics)
+- [Backward & Forward Compatibility](#backward-forward-compatibility)
+* [Upgrade / Downgrade / Rollback](#upgrade-downgrade-rollback)
+* [Lower Protocol Client](#lower-protocol-client)
+* [Lower Protocol Server](#lower-protocol-server)
+- [Links](#links)
+
+
+
+# Background knowledge
+
+Being a multi-tenant system, Pulsar supports quality-of-service constructs 
like topic quotas in bytes per second and
+qps. On top of this, because a broker has only limited 
resources, it has to implement additional
+controls to limit resource usage, such as how much message buffer it 
has.
+
+As such, Pulsar applies throttling at multiple levels. Looking only at 
publish-level throttling, here are the various
+levers that we can configure in Pulsar to rate limit a 
producer, a topic, or an entire connection from a
+client:
+
+* At the core of it, we can set topic level publish rate in bytes and/or 
messages per second.
+* We can create a resource group (combination of one or more namespaces or 
tenants) and set a publish-rate for that
+  resource group.
+* We can set a broker config to throttle based on pending messages at a 
connection level.
+  See 
[maxPendingPublishRequestsPerConnection](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L750)
+* We can set a broker config to throttle based on message buffer size at a 
thread level.
+  See 
[maxMessagePublishBufferSizeInMB](https://github.com/apache/pulsar/blob/4b3b273c1c57741f9f9da2118eb4ec5dfeee2220/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L1431C17-L1431C49)
+* We can set a broker level maximum publish rate per broker in bytes and/or 
messages.
+
+Currently, Pulsar enforces these limits by 
pausing reads of further messages from an
+established connection for a topic. This is transparent to the clients, and 
they continue to publish further messages
+with an increased observed latency. Once the publish rates are within the 
limits, the broker resumes reading from the
+connection.
+
+Here is a small illustration to demonstrate the situation:
+
+```mermaid
+%%{init: {"mirrorActors": false, "rightAngles": false} }%%
+sequenceDiagram
+Client->>Server: CreateProducer(reqId, myTopic)
+Note right of Server: Check Authorization
+Server-->>Client: ProducerSuccess(reqId, producerName)
+Activate Client
+Activate Server
+Client->>Server: Send(1, message1)
+Client->>Server: Send(2, message2)
+Server-->>Client: SendReceipt(1, msgId1)
+Client->>Server: Send(3, message3)
+Client->>Server: Send(4, message4)
+Note right of Server: Topic breaching quota
+Activate Server
+note right of Server: TCP channel read paused
+Client-xServer: Send(5, message5)
+Server-->>Client: SendReceipt(2,msgId2)
+Server-->>Client: SendReceipt(3,msgId3)
+Client-xServer: Send(6, message6)
+Server-->>Client: SendReceipt(4,msgId4)
+note right of Server: TCP channel read resumed
+deactivate Server
+Server-->>Server: read message 5
+Server-->>Server: read message 6
+Client->>Server: Send(7, message7)
+Server-->>Client: SendReceipt(5,msgId5)
+Server-->>Client: SendReceipt(6,msgId6)
+Server-->>Client: SendReceipt(7,msgId7)
+
+Client->>Server: CloseProducer(producerId, reqId)
+Server-->>-Client: Success(reqId)
+deactivate Client
+```
+
+## Challenges with the current approach
+
+The current approach may look perfectly fine when looking at 

Re: [PR] [improve][pip] PIP-385 Add rate limit semantics to pulsar protocol and Java Client [pulsar]

2024-10-09 Thread via GitHub


grssam commented on code in PR #23398:
URL: https://github.com/apache/pulsar/pull/23398#discussion_r1794737158


##
pip/pip-385.md:
##
@@ -0,0 +1,348 @@
+
+## Challenges with the current approach
+
+The current approach may look perfectly fine when looking at