[GitHub] [pulsar-dotpulsar] wangshuyu commented on issue #46: Support - negativeAckRedeliveryDelay and AckTimeout in ConsumerOptions
wangshuyu commented on issue #46:
URL: https://github.com/apache/pulsar-dotpulsar/issues/46#issuecomment-1722117419

> Hi @dionjansen It's not something I will be using myself, but I'm open for a discussion about how to implement it and of course to review and merge the PR, if you are up for creating one :-)

In the main branch (master), there is still no NegativeAcknowledge function, and not confirming messages is a very core function. How can I use this function? Do you have any better usage suggestions regarding the ability to not confirm messages?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar-dotpulsar] wangshuyu commented on issue #46: Support - negativeAckRedeliveryDelay and AckTimeout in ConsumerOptions
wangshuyu commented on issue #46:
URL: https://github.com/apache/pulsar-dotpulsar/issues/46#issuecomment-1722114459

In the main branch (master), there is still no NegativeAcknowledge function, and not confirming messages is a very core function. How can I use this function?
[GitHub] [pulsar] github-actions[bot] commented on issue #20298: Flaky-test: MultiTopicsReaderTest.testMultiTopic
github-actions[bot] commented on issue #20298: URL: https://github.com/apache/pulsar/issues/20298#issuecomment-1722100600 The issue had no activity for 30 days, mark with Stale label.
[GitHub] [pulsar] github-actions[bot] commented on pull request #20970: [improve] [broker] Follower do not need to retrieve load data in zk.
github-actions[bot] commented on PR #20970: URL: https://github.com/apache/pulsar/pull/20970#issuecomment-172211 The pr had no activity for 30 days, mark with Stale label.
[GitHub] [pulsar-helm-chart] frankjkelly commented on pull request #387: Service accounts creation can be decoupled from PodSecurityPolicy
frankjkelly commented on PR #387: URL: https://github.com/apache/pulsar-helm-chart/pull/387#issuecomment-1721737181 @tisonkun can I ask if you can PTAL?
[pulsar-helm-chart] branch master updated: Allow Proxy and Broker HPA to specify scaling policies on scaleUp or scaleDown. (#391)
This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-helm-chart.git

The following commit(s) were added to refs/heads/master by this push:
     new 8cb3c18 Allow Proxy and Broker HPA to specify scaling policies on scaleUp or scaleDown. (#391)
8cb3c18 is described below

commit 8cb3c18377e547e5de4ca19e63138ca1387aef59
Author: Frank Kelly <62910985+frankjke...@users.noreply.github.com>
AuthorDate: Fri Sep 15 15:12:12 2023 -0400

    Allow Proxy and Broker HPA to specify scaling policies on scaleUp or scaleDown. (#391)
---
 charts/pulsar/templates/broker-hpa.yaml | 4 ++++
 charts/pulsar/templates/proxy-hpa.yaml  | 4 ++++
 charts/pulsar/values.yaml               | 2 ++
 3 files changed, 10 insertions(+)

diff --git a/charts/pulsar/templates/broker-hpa.yaml b/charts/pulsar/templates/broker-hpa.yaml
index f6e49e9..f52850a 100644
--- a/charts/pulsar/templates/broker-hpa.yaml
+++ b/charts/pulsar/templates/broker-hpa.yaml
@@ -33,6 +33,10 @@ spec:
     {{- toYaml . | nindent 4 }}
   {{- end }}
   minReplicas: {{ .Values.broker.autoscaling.minReplicas }}
+  {{- with .Values.broker.autoscaling.behavior }}
+  behavior:
+    {{- toYaml . | nindent 4 }}
+  {{- end }}
   scaleTargetRef:
     apiVersion: apps/v1
     kind: StatefulSet
diff --git a/charts/pulsar/templates/proxy-hpa.yaml b/charts/pulsar/templates/proxy-hpa.yaml
index 7565de9..587382f 100644
--- a/charts/pulsar/templates/proxy-hpa.yaml
+++ b/charts/pulsar/templates/proxy-hpa.yaml
@@ -33,6 +33,10 @@ spec:
     {{- toYaml . | nindent 4 }}
   {{- end }}
   minReplicas: {{ .Values.proxy.autoscaling.minReplicas }}
+  {{- with .Values.proxy.autoscaling.behavior }}
+  behavior:
+    {{- toYaml . | nindent 4 }}
+  {{- end }}
   scaleTargetRef:
     apiVersion: apps/v1
     kind: StatefulSet
diff --git a/charts/pulsar/values.yaml b/charts/pulsar/values.yaml
index 34c2898..cbdc1bd 100644
--- a/charts/pulsar/values.yaml
+++ b/charts/pulsar/values.yaml
@@ -684,6 +684,7 @@ broker:
     minReplicas: 1
     maxReplicas: 3
     metrics: ~
+    behavior: ~
   # This is how prometheus discovers this component
   podMonitor:
     enabled: true
@@ -818,6 +819,7 @@ proxy:
     minReplicas: 1
     maxReplicas: 3
     metrics: ~
+    behavior: ~
   # This is how prometheus discovers this component
   podMonitor:
     enabled: true
[GitHub] [pulsar-helm-chart] michaeljmarshall merged pull request #391: Allow Proxy and Broker HPA to specify scaling policies on scaleUp or …
michaeljmarshall merged PR #391: URL: https://github.com/apache/pulsar-helm-chart/pull/391
[GitHub] [pulsar-helm-chart] frankjkelly commented on pull request #391: Allow Proxy and Broker HPA to specify scaling policies on scaleUp or …
frankjkelly commented on PR #391: URL: https://github.com/apache/pulsar-helm-chart/pull/391#issuecomment-1721726183 @michaeljmarshall PTAL as it appears that all CI checks have passed. Thank you!
[GitHub] [pulsar] poorbarcode commented on a diff in pull request #21183: [fix] [broker] Make the new exclusive consumer instead the inactive one faster
poorbarcode commented on code in PR #21183:
URL: https://github.com/apache/pulsar/pull/21183#discussion_r1327569100

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java: ##
@@ -166,6 +166,10 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) {
         }

         if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
+            Consumer actConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+            if (actConsumer != null) {
+                actConsumer.cnx().checkConnectionLiveness();
+            }

Review Comment:
@codelipenghui

> But the new consumer will not retry to connect to the topic, right? Do we need to wait for the connection liveness check to be done?

Sure, I made this improvement.

(Highlight) I have a concern:

**Background**: PR https://github.com/apache/pulsar/pull/20026 changed the method `dispatcher.addConsumer` to an asynchronous method, which broke the lock of `synchronized(dispatcher.this)`. This change only affects releases later than `3.0.0`.

**Concern**: The improvement "wait for the connection liveness check to be done" relies on the asynchronous method `dispatcher.addConsumer`. I am thinking about whether to accept the patch https://github.com/apache/pulsar/pull/20026 and fix the broken lock (this would make the logic complex), or revert this patch to keep the logic simple.

I'd like to know your advice on the concern.
[GitHub] [pulsar-helm-chart] michaeljmarshall commented on pull request #391: Allow Proxy and Broker HPA to specify scaling policies on scaleUp or …
michaeljmarshall commented on PR #391: URL: https://github.com/apache/pulsar-helm-chart/pull/391#issuecomment-1721583129 I triggered the failed tests to run again. Please tag me when CI passes, and I'll merge this.
[pulsar-client-go] branch master updated: connectionTimeout respects net.Dialer default timeout (#1095)
This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

The following commit(s) were added to refs/heads/master by this push:
     new 7cf643be connectionTimeout respects net.Dialer default timeout (#1095)
7cf643be is described below

commit 7cf643be20b0eed9b37e168d04884e89f534efdd
Author: ming
AuthorDate: Fri Sep 15 11:32:28 2023 -0400

    connectionTimeout respects net.Dialer default timeout (#1095)
---
 pulsar/client_impl.go         | 7 +++
 pulsar/internal/connection.go | 8 +++-
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 801eab3f..7daf6f62 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -31,7 +31,6 @@ import (
 )

 const (
-	defaultConnectionTimeout = 10 * time.Second
 	defaultOperationTimeout  = 30 * time.Second
 	defaultKeepAliveInterval = 30 * time.Second
 	defaultMemoryLimitBytes  = 64 * 1024 * 1024
@@ -117,10 +116,10 @@ func newClient(options ClientOptions) (Client, error) {
 		return nil, err
 	}

+	// the default timeout respects Go's default timeout which is no timeout
+	// Missing user specified timeout renders 0 values that matches
+	// net.Dailer's default if time.Duration value is not initialized
 	connectionTimeout := options.ConnectionTimeout
-	if connectionTimeout.Nanoseconds() == 0 {
-		connectionTimeout = defaultConnectionTimeout
-	}

 	operationTimeout := options.OperationTimeout
 	if operationTimeout.Nanoseconds() == 0 {
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index e2ae7ac8..840ecc4f 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -256,7 +256,11 @@ func (c *connection) connect() bool {

 	if c.tlsOptions == nil {
 		// Clear text connection
-		cnx, err = net.DialTimeout("tcp", c.physicalAddr.Host, c.connectionTimeout)
+		if c.connectionTimeout.Nanoseconds() > 0 {
+			cnx, err = net.DialTimeout("tcp", c.physicalAddr.Host, c.connectionTimeout)
+		} else {
+			cnx, err = net.Dial("tcp", c.physicalAddr.Host)
+		}
 	} else {
 		// TLS connection
 		tlsConfig, err = c.getTLSConfig()
@@ -265,6 +269,8 @@ func (c *connection) connect() bool {
 			return false
 		}

+		// time.Duration is initialized to 0 by default, net.Dialer's default timeout is no timeout
+		// therefore if c.connectionTimeout is 0, it means no timeout
 		d := &net.Dialer{Timeout: c.connectionTimeout}
 		cnx, err = tls.DialWithDialer(d, "tcp", c.physicalAddr.Host, tlsConfig)
 	}
[GitHub] [pulsar-client-go] eolivelli merged pull request #1095: [issue 1094] connectionTimeout respects net.Dialer default timeout
eolivelli merged PR #1095: URL: https://github.com/apache/pulsar-client-go/pull/1095
[GitHub] [pulsar-client-go] eolivelli closed issue #1094: The Dialer in the connection.go should respect OS TCP default
eolivelli closed issue #1094: The Dialer in the connection.go should respect OS TCP default URL: https://github.com/apache/pulsar-client-go/issues/1094
[GitHub] [pulsar-client-go] zzzming opened a new pull request, #1095: [issue 1094] connectionTimeout respects net.Dialer default timeout
zzzming opened a new pull request, #1095:
URL: https://github.com/apache/pulsar-client-go/pull/1095

Fixes #1094

### Motivation

With the default setting, the net.Dialer in connection.go occasionally times out. The default was increased by PR #563, but I think increasing the pulsar-client-go library default is not the answer; instead, we should respect net.Dialer's default.

Although ConnectionTimeout can be set to a user-specified value when the client is created with NewClient(), the default is 10 seconds, hard-coded in client_impl.go (https://github.com/apache/pulsar-client-go/blob/master/pulsar/client_impl.go#L34). In fact, the previous value was 5 seconds. It was increased to 10 seconds by this PR: https://github.com/apache/pulsar-client-go/pull/563

I believe we should not tweak Go's default and should instead respect the OS default. Here are Go's net.Dialer comments. They state that TCP timeouts are often around 3 minutes. On the Ubuntu version I checked, it is 2 minutes.

```
type Dialer struct {
	// Timeout is the maximum amount of time a dial will wait for
	// a connect to complete. If Deadline is also set, it may fail
	// earlier.
	//
	// The default is no timeout.
	//
	// When using TCP and dialing a host name with multiple IP
	// addresses, the timeout may be divided between them.
	//
	// With or without a timeout, the operating system may impose
	// its own earlier timeout. For instance, TCP timeouts are
	// often around 3 minutes.
	Timeout time.Duration
```

In the non-TLS dial, the same timeout is used. Go's net.DialTimeout states that the timeout also includes name resolution; if the name resolves to multiple IPs, the timeout is shared between each consecutive dial. This could result in more time spent on dialing.

```
// DialTimeout acts like Dial but takes a timeout.
//
// The timeout includes name resolution, if required.
// When using TCP, and the host in the address parameter resolves to
// multiple IP addresses, the timeout is spread over each consecutive
// dial, such that each is given an appropriate fraction of the time
// to connect.
//
// See func Dial for a description of the network and address
// parameters.
func DialTimeout(network, address string, timeout time.Duration) (Conn, error) {
```

Therefore, I think the default of the client library should respect the OS setting. That means not setting the timeout if the application does not set it.

### Modifications

Pass the zero value of time.Duration for ConnectionTimeout as the default for net.Dial.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

This change is already covered by existing tests, such as any connection created to a broker.

### Does this pull request potentially affect one of the following parts:

*If `yes` was chosen, please highlight the changes*

- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (no)
- The schema: (no)
- The default values of configurations: (yes)
- The wire protocol: (no)

### Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
[GitHub] [pulsar-client-go] zzzming opened a new issue, #1094: The Dialer in the connection.go should respect OS TCP default
zzzming opened a new issue, #1094:
URL: https://github.com/apache/pulsar-client-go/issues/1094

Expected behavior

The Dialer in connection.go should respect the OS TCP default.

Actual behavior

Although `ConnectionTimeout` can be set to a user-specified value when the client is created with NewClient(), the default is 10 seconds, hard-coded in client_impl.go (https://github.com/apache/pulsar-client-go/blob/master/pulsar/client_impl.go#L34). In fact, the previous value was 5 seconds. It was increased to 10 seconds by this PR: https://github.com/apache/pulsar-client-go/pull/563

I believe we should not tweak Go's default; that is, we should respect the OS default. Here are Go's net.Dialer comments. They state that TCP timeouts are often around 3 minutes. On the Ubuntu version I checked, it is 2 minutes.

In the non-TLS dial, the same timeout is used. Go's net.DialTimeout states that the timeout also includes name resolution; if the name resolves to multiple IPs, the timeout is shared between each consecutive dial. This could result in more time spent on dialing.

```
type Dialer struct {
	// Timeout is the maximum amount of time a dial will wait for
	// a connect to complete. If Deadline is also set, it may fail
	// earlier.
	//
	// The default is no timeout.
	//
	// When using TCP and dialing a host name with multiple IP
	// addresses, the timeout may be divided between them.
	//
	// With or without a timeout, the operating system may impose
	// its own earlier timeout. For instance, TCP timeouts are
	// often around 3 minutes.
	Timeout time.Duration
```

```
// DialTimeout acts like Dial but takes a timeout.
//
// The timeout includes name resolution, if required.
// When using TCP, and the host in the address parameter resolves to
// multiple IP addresses, the timeout is spread over each consecutive
// dial, such that each is given an appropriate fraction of the time
// to connect.
//
// See func Dial for a description of the network and address
// parameters.
func DialTimeout(network, address string, timeout time.Duration) (Conn, error) {
```

Therefore, I think the default of the client library should respect the OS setting. That means not setting the timeout if the application does not set it.

Steps to reproduce

System configuration

Irrelevant
[GitHub] [pulsar] codelipenghui commented on a diff in pull request #21183: [fix] [broker] Make the new exclusive consumer instead the inactive one faster
codelipenghui commented on code in PR #21183:
URL: https://github.com/apache/pulsar/pull/21183#discussion_r1327278706

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java: ##
@@ -166,6 +166,10 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) {
         }

         if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
+            Consumer actConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+            if (actConsumer != null) {
+                actConsumer.cnx().checkConnectionLiveness();
+            }

Review Comment:
But the new consumer will not retry to connect to the topic, right? Do we need to wait for the connection liveness check done?
[GitHub] [pulsar] tisonkun commented on pull request #20764: [refactor][cli][PIP-280] Refactor `pulsar-client-tools` module
tisonkun commented on PR #20764: URL: https://github.com/apache/pulsar/pull/20764#issuecomment-1721269192 Thanks for your contribution! I'll give a review this weekend.
[pulsar] branch master updated: [fix][broker] fix bug caused by optimistic locking (#18390)
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 2aa8c3b1a28 [fix][broker] fix bug caused by optimistic locking (#18390)
2aa8c3b1a28 is described below

commit 2aa8c3b1a284673b512db2b815514343ec3e7c19
Author: thetumbled <52550727+thetumb...@users.noreply.github.com>
AuthorDate: Fri Sep 15 21:12:39 2023 +0800

    [fix][broker] fix bug caused by optimistic locking (#18390)
---
 .../util/collections/ConcurrentLongHashMap.java    | 25 -
 .../collections/ConcurrentLongLongPairHashMap.java | 39 -
 .../util/collections/ConcurrentLongPairSet.java    | 41 +-
 .../util/collections/ConcurrentOpenHashMap.java    | 39 -
 .../util/collections/ConcurrentOpenHashSet.java    | 22 +++-
 .../apache/pulsar/common/util/FutureUtilTest.java  | 1 -
 .../collections/ConcurrentLongHashMapTest.java     | 62 -
 .../ConcurrentLongLongPairHashMapTest.java         | 65 ++
 .../collections/ConcurrentLongPairSetTest.java     | 64 +
 .../collections/ConcurrentOpenHashMapTest.java     | 64 +
 .../collections/ConcurrentOpenHashSetTest.java     | 64 +
 11 files changed, 414 insertions(+), 72 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
index 26948a2f4bf..b6408ee9819 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
@@ -325,16 +325,17 @@ public class ConcurrentLongHashMap {
         }

         V get(long key, int keyHash) {
-            int bucket = keyHash;
-
             long stamp = tryOptimisticRead();
             boolean acquiredLock = false;

+            // add local variable here, so OutOfBound won't happen
+            long[] keys = this.keys;
+            V[] values = this.values;
+            // calculate table.length as capacity to avoid rehash changing capacity
+            int bucket = signSafeMod(keyHash, values.length);
+
             try {
                 while (true) {
-                    int capacity = this.capacity;
-                    bucket = signSafeMod(bucket, capacity);
-
                     // First try optimistic locking
                     long storedKey = keys[bucket];
                     V storedValue = values[bucket];
@@ -352,16 +353,15 @@ public class ConcurrentLongHashMap {
                     if (!acquiredLock) {
                         stamp = readLock();
                         acquiredLock = true;
+
+                        // update local variable
+                        keys = this.keys;
+                        values = this.values;
+                        bucket = signSafeMod(keyHash, values.length);
                         storedKey = keys[bucket];
                         storedValue = values[bucket];
                     }

-                    if (capacity != this.capacity) {
-                        // There has been a rehashing. We need to restart the search
-                        bucket = keyHash;
-                        continue;
-                    }
-
                     if (storedKey == key) {
                         return storedValue != DeletedValue ? storedValue : null;
                     } else if (storedValue == EmptyValue) {
@@ -369,8 +369,7 @@ public class ConcurrentLongHashMap {
                             return null;
                         }
                     }
-
-                    ++bucket;
+                    bucket = (bucket + 1) & (values.length - 1);
                 }
             } finally {
                 if (acquiredLock) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMap.java
index 57a024185e0..cfa414278cc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMap.java
@@ -304,6 +304,9 @@ public class ConcurrentLongLongPairHashMap {
     // A section is a portion of the hash map that is covered by a single
     @SuppressWarnings("serial")
     private static final class Section extends StampedLock {
+        // Each item take up 4 continuous array space.
+        private static final int ITEM_SIZE = 4;
+
         // Keys and values are stored
[GitHub] [pulsar] codelipenghui closed issue #18388: [Bug] ArrayIndexOutOfBoundsException caused by optimistic locking in ConcurrentLongLongPairHashMap implementation
codelipenghui closed issue #18388: [Bug] ArrayIndexOutOfBoundsException caused by optimistic locking in ConcurrentLongLongPairHashMap implementation URL: https://github.com/apache/pulsar/issues/18388 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] codelipenghui merged pull request #18390: [fix][broker] fix bug caused by optimistic locking
codelipenghui merged PR #18390: URL: https://github.com/apache/pulsar/pull/18390 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[pulsar] branch branch-3.0 updated: [fix][bk] Improve to the ReplicaitonWorker performance by deleting invalid underreplication nodes (#21160)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 8b1c2a4d3a9 [fix][bk] Improve to the ReplicaitonWorker performance by deleting invalid underreplication nodes (#21160) 8b1c2a4d3a9 is described below commit 8b1c2a4d3a96cc8c637dbce73ce9a9c2adef7edf Author: Yan Zhao AuthorDate: Fri Sep 15 20:17:55 2023 +0800 [fix][bk] Improve to the ReplicaitonWorker performance by deleting invalid underreplication nodes (#21160) --- .../PulsarLedgerUnderreplicationManager.java | 29 ++ .../LedgerUnderreplicationManagerTest.java | 66 ++ 2 files changed, 95 insertions(+) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java index 79fdc44cb2b..dda8d7256ed 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java @@ -25,10 +25,12 @@ import static org.apache.bookkeeper.proto.DataFormats.LockDataFormat; import static org.apache.bookkeeper.proto.DataFormats.PlacementPolicyCheckFormat; import static org.apache.bookkeeper.proto.DataFormats.ReplicasCheckFormat; import static org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat; +import com.google.common.base.Joiner; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.TextFormat; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -61,6 +63,8 @@ import org.apache.pulsar.metadata.api.Notification; import 
org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.metadata.impl.ZKMetadataStore; +import org.apache.zookeeper.KeeperException; @Slf4j public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicationManager { @@ -393,6 +397,31 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati Lock l = heldLocks.get(ledgerId); if (l != null) { store.delete(getUrLedgerPath(ledgerId), Optional.of(l.getLedgerNodeVersion())).get(); +if (store instanceof ZKMetadataStore) { +try { +// clean up the hierarchy +String[] parts = getUrLedgerPath(ledgerId).split("/"); +for (int i = 1; i <= 4; i++) { +String[] p = Arrays.copyOf(parts, parts.length - i); +String path = Joiner.on("/").join(p); +Optional getResult = store.get(path).get(); +if (getResult.isPresent()) { +store.delete(path, Optional.of(getResult.get().getStat().getVersion())).get(); +} +} +} catch (ExecutionException ee) { +// This can happen when cleaning up the hierarchy. +// It's safe to ignore, it simply means another +// ledger in the same hierarchy has been marked as +// underreplicated. +if (ee.getCause() instanceof MetadataStoreException && ee.getCause().getCause() +instanceof KeeperException.NotEmptyException) { +//do nothing. 
+} else { +log.warn("Error deleting underrepcalited ledger parent node", ee); +} +} +} } } catch (ExecutionException ee) { if (ee.getCause() instanceof MetadataStoreException.NotFoundException) { diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java index 0df325b3c57..649dc1663c6 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java @@ -23,12 +23,14 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static
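Editor's sketch: the cleanup loop in the hunk above walks up the hierarchy by splitting the ledger path on "/" and dropping one more trailing segment on each iteration. The helper below reproduces only that path arithmetic so it can be checked in isolation. `ParentPaths` and `parents` are hypothetical names, it uses `String.join` from the JDK instead of Guava's `Joiner`, and the guard that stops before the root is an addition of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper; it only computes the candidate parent paths, it deletes nothing.
final class ParentPaths {
    // Returns up to `levels` ancestors of leafPath, nearest parent first.
    static List<String> parents(String leafPath, int levels) {
        // "/a/b/c" splits into ["", "a", "b", "c"]; the leading empty part keeps the leading slash.
        String[] parts = leafPath.split("/");
        List<String> result = new ArrayList<>();
        // Stop before only the empty root segment would remain.
        for (int i = 1; i <= levels && parts.length - i > 1; i++) {
            String[] shortened = Arrays.copyOf(parts, parts.length - i);
            result.add(String.join("/", shortened));
        }
        return result;
    }
}
```

For an illustrative path such as `/a/b/c/d`, `ParentPaths.parents("/a/b/c/d", 2)` yields `/a/b/c` followed by `/a/b`. In the commit each such parent is then fetched and deleted with its version, and a not-empty error is ignored because it means a sibling ledger is still marked underreplicated.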
[GitHub] [pulsar] Mortom123 opened a new issue, #21191: Feature Request: Custom listener_name for Functions
Mortom123 opened a new issue, #21191: URL: https://github.com/apache/pulsar/issues/21191 ### Search before asking - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar. ### Motivation The motivation for this feature comes from #21088, more specifically from [this comment](https://github.com/apache/pulsar/issues/21088#issuecomment-1717797667). Currently, when creating a function worker / sink / source, one can assign a custom `broker-service-url` but not a `listener_name` usable through the broker's `advertisedListeners`, which prevents running workers on dedicated machines outside of the brokers' network (as far as I'm aware). ### Solution Enable function workers / sinks / sources to be created with a custom `--listener_name` flag used by the client. ### Alternatives _No response_ ### Anything else? _No response_ ### Are you willing to submit a PR? - [ ] I'm willing to submit a PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] poorbarcode commented on a diff in pull request #21161: [fix] [auto-recovery] Fix PulsarLedgerUnderreplicationManager notify problem.
poorbarcode commented on code in PR #21161: URL: https://github.com/apache/pulsar/pull/21161#discussion_r1327068442 ## pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java: ## @@ -232,18 +236,26 @@ private void handleNotification(Notification n) { synchronized (this) { // Notify that there were some changes on the under-replicated z-nodes notifyAll(); - -if (n.getType() == NotificationType.Deleted) { -if (n.getPath().equals(basePath + '/' + BookKeeperConstants.DISABLE_NODE)) { -log.info("LedgerReplication is enabled externally through MetadataStore, " -+ "since DISABLE_NODE ZNode is deleted"); -if (replicationEnabledListener != null) { -replicationEnabledListener.operationComplete(0, null); -} -} else if (n.getPath().equals(lostBookieRecoveryDelayPath)) { -if (lostBookieRecoveryDelayListener != null) { - lostBookieRecoveryDelayListener.operationComplete(0, null); -} +if (lostBookieRecoveryDelayPath.equals(n.getPath())) { +List> callbackList; +synchronized (lostBookieRecoveryDelayCallbacks) { +callbackList = new ArrayList<>(lostBookieRecoveryDelayCallbacks); +lostBookieRecoveryDelayCallbacks.clear(); +} +for (BookkeeperInternalCallbacks.GenericCallback callback : callbackList) { +callback.operationComplete(0, null); +} +} +if (replicationDisablePath.equals(n.getPath()) && n.getType() == NotificationType.Deleted) { Review Comment: So, once the task is started, it does not support suspend, right? Could you add a comment to explain it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] horizonzy commented on pull request #21161: [fix] [auto-recovery] Fix PulsarLedgerUnderreplicationManager notify problem.
horizonzy commented on PR #21161: URL: https://github.com/apache/pulsar/pull/21161#issuecomment-1720989351 > #21188 Sorry, I commented by mistake. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] poorbarcode commented on pull request #21161: [fix] [auto-recovery] Fix PulsarLedgerUnderreplicationManager notify problem.
poorbarcode commented on PR #21161: URL: https://github.com/apache/pulsar/pull/21161#issuecomment-1720988012 @horizonzy > We need merge https://github.com/apache/pulsar/pull/21159 https://github.com/apache/pulsar/pull/21161 https://github.com/apache/pulsar/pull/21181 first, then start view this pr. I think this comment should be added under the PR https://github.com/apache/pulsar/pull/21188, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] horizonzy commented on a diff in pull request #21188: [improve] [auto-recovery] Migrate the replication testing from BookKeeper to Pulsar.
horizonzy commented on code in PR #21188: URL: https://github.com/apache/pulsar/pull/21188#discussion_r1327055923 ## pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java: ## @@ -105,14 +105,17 @@ long getLedgerNodeVersion() { private final String urLockPath; private final String layoutPath; private final String lostBookieRecoveryDelayPath; +private final String replicationDisablePath; private final String checkAllLedgersCtimePath; private final String placementPolicyCheckCtimePath; private final String replicasCheckCtimePath; private final MetadataStoreExtended store; -private BookkeeperInternalCallbacks.GenericCallback replicationEnabledListener; -private BookkeeperInternalCallbacks.GenericCallback lostBookieRecoveryDelayListener; Review Comment: > Why change the type to a collection? Only the latest event needs to be notified, right? No. If the user registers two callbacks, the previous callback shouldn't be overridden; it also needs to be triggered. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
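Editor's sketch: the review thread is about replacing a single listener field with a collection, so that a second registration does not silently replace the first. The class below illustrates that pattern under stated assumptions: `OneShotCallbacks`, `register`, and `fire` are hypothetical names, and `java.util.function.IntConsumer` stands in for BookKeeper's `GenericCallback`. As in the PR #21161 hunk quoted earlier in this archive, the list is copied and cleared under a lock and the callbacks are invoked outside it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical illustration of one-shot callbacks kept in a collection.
final class OneShotCallbacks {
    private final List<IntConsumer> callbacks = new ArrayList<>();

    // Each registration is kept; a later one does not override an earlier one.
    void register(IntConsumer callback) {
        synchronized (callbacks) {
            callbacks.add(callback);
        }
    }

    // Invokes every registered callback once and returns how many were invoked.
    int fire(int resultCode) {
        List<IntConsumer> snapshot;
        synchronized (callbacks) {
            // Copy and clear under the lock so each callback fires for one notification only.
            snapshot = new ArrayList<>(callbacks);
            callbacks.clear();
        }
        // Run callbacks outside the lock so a slow callback cannot block registration.
        for (IntConsumer callback : snapshot) {
            callback.accept(resultCode);
        }
        return snapshot.size();
    }
}
```

With a single field, registering twice before a notification would notify only the second caller. With the list, both are notified, and a second `fire` finds nothing left to call until someone registers again.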
[GitHub] [pulsar] horizonzy commented on pull request #21161: [fix] [auto-recovery] Fix PulsarLedgerUnderreplicationManager notify problem.
horizonzy commented on PR #21161: URL: https://github.com/apache/pulsar/pull/21161#issuecomment-1720969707 We need to merge #21159, #21161, and #21181 first, then start reviewing this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] horizonzy commented on a diff in pull request #21188: [improve] [auto-recovery] Migrate the replication testing from BookKeeper to Pulsar.
horizonzy commented on code in PR #21188: URL: https://github.com/apache/pulsar/pull/21188#discussion_r1327047652 ## pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java: ## @@ -105,14 +105,17 @@ long getLedgerNodeVersion() { private final String urLockPath; private final String layoutPath; private final String lostBookieRecoveryDelayPath; +private final String replicationDisablePath; private final String checkAllLedgersCtimePath; private final String placementPolicyCheckCtimePath; private final String replicasCheckCtimePath; private final MetadataStoreExtended store; -private BookkeeperInternalCallbacks.GenericCallback replicationEnabledListener; -private BookkeeperInternalCallbacks.GenericCallback lostBookieRecoveryDelayListener; Review Comment: Please review https://github.com/apache/pulsar/pull/21161 first. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] liangyepianzhou opened a new pull request, #21190: [fix][broker][branch-2.10] limit the memory used by reads end-to-end
liangyepianzhou opened a new pull request, #21190: URL: https://github.com/apache/pulsar/pull/21190 ### Motivation Cherry-picking https://github.com/apache/pulsar/pull/18245. ### Verifying this change - [ ] Make sure that the change passes the CI checks. *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Does this pull request potentially affect one of the following parts: *If the box was checked, please highlight the changes* - [ ] Dependencies (add or upgrade a dependency) - [ ] The public API - [ ] The schema - [ ] The default values of configurations - [ ] The threading model - [ ] The binary protocol - [ ] The REST endpoints - [ ] The admin CLI options - [ ] The metrics - [ ] Anything that affects deployment ### Documentation - [ ] `doc` - [ ] `doc-required` - [x] `doc-not-needed` - [ ] `doc-complete` ### Matching PR in forked repository PR in forked repository: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] poorbarcode commented on a diff in pull request #21188: [improve] [auto-recovery] Migrate the replication testing from BookKeeper to Pulsar.
poorbarcode commented on code in PR #21188: URL: https://github.com/apache/pulsar/pull/21188#discussion_r1327040690 ## pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java: ## @@ -105,14 +105,17 @@ long getLedgerNodeVersion() { private final String urLockPath; private final String layoutPath; private final String lostBookieRecoveryDelayPath; +private final String replicationDisablePath; private final String checkAllLedgersCtimePath; private final String placementPolicyCheckCtimePath; private final String replicasCheckCtimePath; private final MetadataStoreExtended store; -private BookkeeperInternalCallbacks.GenericCallback replicationEnabledListener; -private BookkeeperInternalCallbacks.GenericCallback lostBookieRecoveryDelayListener; Review Comment: Why change the type to a collection? Only the latest event needs to be notified, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[pulsar] 01/02: InflightReadsLimiter - limit the memory used by reads end-to-end (from storage/cache to the write to the consumer channel) (#18245)
This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch InflightReadsLimiter in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 47c98e57de5eeb5a114c05eb17698d3fd2b3342a Author: Enrico Olivelli AuthorDate: Fri Nov 11 14:08:41 2022 +0100 InflightReadsLimiter - limit the memory used by reads end-to-end (from storage/cache to the write to the consumer channel) (#18245) * InflightReadsLimiter - limit the memory used by reads end-to-end (from storage/cache to the write to the consumer channel) Motivation: Broker can go out of memory due to many reads enqueued on the PersistentDispatcherMultipleConsumers dispatchMessagesThread (that is used in case of dispatcherDispatchMessagesInSubscriptionThread set to true, that is the default value) The limit of the amount of memory retained due to reads MUST take into account also the entries coming from the Cache. When dispatcherDispatchMessagesInSubscriptionThread is false (the behaviour of Pulsar 2.10) there is some kind of natural (but still unpredictable!!) back pressure mechanism because the thread that receives the entries from BK of the cache dispatches immediately and synchronously the entries to the consumer and releases them Modifications: - Add a new component (InflightReadsLimiter) that keeps track of the overall amount of memory retained due to inflight reads. 
- Add a new configuration entry managedLedgerMaxReadsInFlightSizeInMB - The feature is disabled by default - Add new metrics to track the values * Change error message * checkstyle * Fix license * remove duplicate method after cherry-pick * Rename onDeallocate (cherry picked from commit 6fec66b12b04a37e4c2b05d78d4e33b380c270df) --- conf/broker.conf | 10 +- .../mledger/ManagedLedgerFactoryConfig.java| 5 + .../apache/bookkeeper/mledger/impl/EntryImpl.java | 25 +++ .../mledger/impl/cache/InflightReadsLimiter.java | 137 .../mledger/impl/cache/RangeEntryCacheImpl.java| 107 + .../impl/cache/RangeEntryCacheManagerImpl.java | 8 +- .../mledger/impl/EntryCacheManagerTest.java| 3 + .../bookkeeper/mledger/impl/EntryCacheTest.java| 2 + .../impl/cache/InflightReadsLimiterTest.java | 172 + .../apache/pulsar/broker/ServiceConfiguration.java | 6 + .../pulsar/broker/ManagedLedgerClientFactory.java | 2 + .../broker/service/AbstractBaseDispatcher.java | 10 ++ .../broker/service/PulsarCommandSenderImpl.java| 16 +- .../PersistentDispatcherMultipleConsumers.java | 11 +- ...istentStreamingDispatcherMultipleConsumers.java | 8 +- 15 files changed, 515 insertions(+), 7 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 9d7c68bc34e..466b4d8c0f3 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1064,7 +1064,15 @@ managedLedgerCursorRolloverTimeInSeconds=14400 # crashes. managedLedgerMaxUnackedRangesToPersist=1 -# Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher +# Maximum amount of memory used hold data read from storage (or from the cache). +# This mechanism prevents the broker to have too many concurrent +# reads from storage and fall into Out of Memory errors in case +# of multiple concurrent reads to multiple concurrent consumers. +# Set 0 in order to disable the feature. +# +managedLedgerMaxReadsInFlightSizeInMB=0 + +# Max number of "acknowledgment holes" that can be stored in MetadataStore. 
If number of unack message range is higher # than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into # zookeeper. managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000 diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java index 78314be45c3..7db020969b7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java @@ -57,6 +57,11 @@ public class ManagedLedgerFactoryConfig { */ private boolean copyEntriesInCache = false; +/** + * Maximum number of (estimated) data in-flight reading from storage and the cache. + */ +private long managedLedgerMaxReadsInFlightSize = 0; + /** * Whether trace managed ledger task execution time. */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index 30503092398..ab0c4ec28d2 100644 ---
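Editor's sketch: the commit message above describes a component that tracks the total memory retained by in-flight reads and is disabled when `managedLedgerMaxReadsInFlightSizeInMB` is 0. The class below is a minimal illustration of such a byte budget and nothing more. `ReadBudget`, `tryAcquire`, and `release` are hypothetical names and do not reflect the API of the real `InflightReadsLimiter`, whose source is not shown in this message.

```java
// Hypothetical byte budget for in-flight reads; not the Pulsar InflightReadsLimiter.
final class ReadBudget {
    private final long maxBytes;
    private long remainingBytes;

    // maxMb <= 0 disables the limit, mirroring "Set 0 in order to disable the feature".
    ReadBudget(long maxMb) {
        this.maxBytes = maxMb * 1024L * 1024L;
        this.remainingBytes = this.maxBytes;
    }

    boolean isDisabled() {
        return maxBytes <= 0;
    }

    // Reserves `bytes` for a read; returns false when the budget is exhausted.
    synchronized boolean tryAcquire(long bytes) {
        if (isDisabled()) {
            return true;
        }
        if (bytes > remainingBytes) {
            return false;
        }
        remainingBytes -= bytes;
        return true;
    }

    // Returns `bytes` to the budget once the entries have been written out and released.
    synchronized void release(long bytes) {
        if (isDisabled()) {
            return;
        }
        remainingBytes = Math.min(maxBytes, remainingBytes + bytes);
    }
}
```

A caller would reserve the estimated size before issuing a read and release it after the entries have been dispatched to the consumer. Note that in this simplified form a single read larger than the whole budget is never admitted, so a real limiter needs a policy for that case.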
[pulsar] branch InflightReadsLimiter created (now eeb80e1092b)
This is an automated email from the ASF dual-hosted git repository. xiangying pushed a change to branch InflightReadsLimiter in repository https://gitbox.apache.org/repos/asf/pulsar.git at eeb80e1092b [fix][broker][branch-2.10] limit the memory used by reads end-to-end This branch includes the following new commits: new 47c98e57de5 InflightReadsLimiter - limit the memory used by reads end-to-end (from storage/cache to the write to the consumer channel) (#18245) new eeb80e1092b [fix][broker][branch-2.10] limit the memory used by reads end-to-end The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[pulsar] 02/02: [fix][broker][branch-2.10] limit the memory used by reads end-to-end
This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch InflightReadsLimiter in repository https://gitbox.apache.org/repos/asf/pulsar.git commit eeb80e1092b6d289f1adbfd65b87cecc9b29b900 Author: xiangying <1984997...@qq.com> AuthorDate: Fri Sep 15 17:27:32 2023 +0800 [fix][broker][branch-2.10] limit the memory used by reads end-to-end --- .../mledger/impl/cache/InflightReadsLimiter.java | 2 +- .../bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java | 14 +++--- .../mledger/impl/cache/InflightReadsLimiterTest.java | 2 +- .../persistent/PersistentDispatcherMultipleConsumers.java | 9 +++-- .../PersistentStreamingDispatcherMultipleConsumers.java| 7 ++- 5 files changed, 14 insertions(+), 20 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java index b946dc09a0c..f3848b6ddd9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index 5401ae3563c..3e7401bc512 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -29,7 +29,6 @@ import io.netty.buffer.PooledByteBufAllocator; import java.util.Collection; import java.util.Iterator; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.client.api.BKException; import org.apache.bookkeeper.client.api.LedgerEntry; @@ -72,7 +71,6 @@ public class RangeEntryCacheImpl implements EntryCache { public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl ml, boolean copyEntries) { this.manager = manager; this.ml = ml; -this.pendingReadsManager = new PendingReadsManager(this); this.interceptor = ml.getManagedLedgerInterceptor(); this.readEntryTimeoutMillis = getManagedLedgerConfig().getReadEntryTimeoutSeconds(); this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp); @@ -281,14 +279,14 @@ public class RangeEntryCacheImpl implements EntryCache { @SuppressWarnings({ "unchecked", "rawtypes" }) private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, final ReadEntriesCallback callback, Object ctx) { -asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, null); +asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, isSlowestReader, callback, ctx, null); } -void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry, +void asyncReadEntry0WithLimits(ReadHandle lh, long 
firstEntry, long lastEntry, boolean isSlowestReader, final ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle) { final AsyncCallbacks.ReadEntriesCallback callback = -handlePendingReadsLimits(lh, firstEntry, lastEntry, shouldCacheEntry, +handlePendingReadsLimits(lh, firstEntry, lastEntry, isSlowestReader, originalCallback, ctx, handle); if (callback == null) { return; @@ -371,8 +369,10 @@ public class RangeEntryCacheImpl implements EntryCache { private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry, - AsyncCallbacks.ReadEntriesCallback originalCallback, -Object ctx, InflightReadsLimiter.Handle handle) { + AsyncCallbacks.ReadEntriesCallback + originalCallback, +Object ctx, +
[GitHub] [pulsar] poorbarcode commented on a diff in pull request #21188: [improve] [auto-recovery] Migrate the replication testing from BookKeeper to Pulsar.
poorbarcode commented on code in PR #21188: URL: https://github.com/apache/pulsar/pull/21188#discussion_r1327040690 ## pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java: ## @@ -105,14 +105,17 @@ long getLedgerNodeVersion() { private final String urLockPath; private final String layoutPath; private final String lostBookieRecoveryDelayPath; +private final String replicationDisablePath; private final String checkAllLedgersCtimePath; private final String placementPolicyCheckCtimePath; private final String replicasCheckCtimePath; private final MetadataStoreExtended store; -private BookkeeperInternalCallbacks.GenericCallback replicationEnabledListener; -private BookkeeperInternalCallbacks.GenericCallback lostBookieRecoveryDelayListener; Review Comment: Why change the type of `lostBookieRecoveryDelayListener` to a collection? Only the latest event needs to be notified, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] thetumbled commented on pull request #18390: [fix][broker] fix bug caused by optimistic locking
thetumbled commented on PR #18390: URL: https://github.com/apache/pulsar/pull/18390#issuecomment-1720956794 /pulsarbot run-failure-checks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[pulsar-dotpulsar] annotated tag 3.0.1 updated (3018d07 -> bbedf18)
This is an automated email from the ASF dual-hosted git repository. blankensteiner pushed a change to annotated tag 3.0.1 in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git *** WARNING: tag 3.0.1 was modified! *** from 3018d07 (commit) to bbedf18 (tag) tagging 3018d07603dbd76eb7efcac2fcfd44aec7990b2d (commit) replaces 3.0.0 by Daniel Blankensteiner on Fri Sep 15 10:39:20 2023 +0200 - Log - Release 3.0.1 --- No new revisions were added by this update. Summary of changes:
[pulsar-dotpulsar] branch master updated: Make ready for release 3.0.1
This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 3018d07 Make ready for release 3.0.1
3018d07 is described below

commit 3018d07603dbd76eb7efcac2fcfd44aec7990b2d
Author: Daniel Blankensteiner
AuthorDate: Fri Sep 15 10:36:21 2023 +0200

    Make ready for release 3.0.1
---
 CHANGELOG.md | 7 ++-
 src/DotPulsar/DotPulsar.csproj | 2 +-
 tests/DotPulsar.Tests/ConsumerTests.cs | 80 +++---
 tests/DotPulsar.Tests/ReaderTests.cs | 70 -
 4 files changed, 65 insertions(+), 94 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2dd7a71..1247972 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,16 +4,15 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
-## [Unreleased]
+## [3.0.1] - 2023-09-15
 ### Changed
-- When calling GetLastMessageId(s) on a Reader or Consumer, it returns a MessageId without the topic field if
-  MessageId.Earliest is found.
+- When calling GetLastMessageId(s) on a Reader or Consumer, it returns a MessageId without the topic field if MessageId.Earliest is found
 ### Fixed
-- Fixed issue with DotPulsar client not handling connection faults for consumers and readers.
+- Fixed issue with the DotPulsar client not handling connection faults for consumers and readers
 ## [3.0.0] - 2023-08-30
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 36c6bb4..0e8fbd3 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -2,7 +2,7 @@
 netstandard2.0;netstandard2.1;net6.0;net7.0
-3.0.0
+3.0.1
 $(Version)
 $(Version)
 ApachePulsar,DanskeCommodities,dblank
diff --git a/tests/DotPulsar.Tests/ConsumerTests.cs b/tests/DotPulsar.Tests/ConsumerTests.cs
index 4c63a51..6460dbe 100644
--- a/tests/DotPulsar.Tests/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/ConsumerTests.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -208,26 +208,15 @@ public class ConsumerTests
 {
 //Arrange
 var semaphoreSlim = new SemaphoreSlim(1);
-await using var
-client = PulsarClient.Builder().ExceptionHandler(context =>
-{
-semaphoreSlim.WaitAsync();
-context.Result = FaultAction.Rethrow;
-context.ExceptionHandled = true;
-})
-.ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a cluster that does not exists.
-.Build();
-
-await using var consumer = client.NewConsumer(Schema.String)
-.StateChangedHandler(changed =>
-{
-var topic = changed.Consumer.Topic;
-var state = changed.ConsumerState;
-_testOutputHelper.WriteLine($"The consumer for topic '{topic}' changed state to '{state}'");
-})
-.SubscriptionName("MySubscription")
-.Topic("persistent://public/default/mytopic")
-.Create();
+await using var client = PulsarClient.Builder().ExceptionHandler(context =>
+{
+semaphoreSlim.WaitAsync();
+context.Result = FaultAction.Rethrow;
+context.ExceptionHandled = true;
+})
+.ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
+
+await using var consumer = CreateConsumer(client, SubscriptionInitialPosition.Earliest, "persistent://a/b/c", "cn", "sn");
 var receiveTask = consumer.Receive().AsTask();
 semaphoreSlim.Release();
@@ -243,29 +232,16 @@ public class ConsumerTests
 public async Task Receive_WhenFaultedBeforeInvokingReceive_ShouldThrowConsumerFaultedException()
 {
 //Arrange
-var cts = new CancellationTokenSource();
-
-await using var
-client = PulsarClient.Builder().ExceptionHandler(context =>
-{
-context.Result = FaultAction.Rethrow;
-context.ExceptionHandled = true;
-})
-.ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a cluster that does not exists.
-.Build();
-
-await using var consumer = client.NewConsumer(Schema.String)
-.StateChangedHandler(changed =>
-{
-var topic = changed.Consumer.Topic;
-var state = changed.ConsumerState;
-
[GitHub] [pulsar] horizonzy closed pull request #21189: [test] [branch-3.0] Test branch-3.0 ci
horizonzy closed pull request #21189: [test] [branch-3.0] Test branch-3.0 ci URL: https://github.com/apache/pulsar/pull/21189 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] crossoverJie commented on issue #21172: [Bug] Topic compaction task can't stop when topic closed lead to compactionDurationTimeInMills mistake of statistics
crossoverJie commented on issue #21172: URL: https://github.com/apache/pulsar/issues/21172#issuecomment-1720800137 @coderzc Plz assign to me.
[GitHub] [pulsar] mattisonchao closed pull request #21189: [test] [branch-3.0] Test branch-3.0 ci
mattisonchao closed pull request #21189: [test] [branch-3.0] Test branch-3.0 ci URL: https://github.com/apache/pulsar/pull/21189
[GitHub] [pulsar] liangyepianzhou commented on pull request #21166: [improve][client] PIP-302 Add alwaysRefresh Configuration Option for TableView to Read Latest Values
liangyepianzhou commented on PR #21166: URL: https://github.com/apache/pulsar/pull/21166#issuecomment-1720784784

**Prerequisite:** Since messages are constantly being written into the Topic and there is no read-write lock guarantee, we cannot assure the retrieval of the most up-to-date value.

**Implementation Goal:** Record a checkpoint before reading and ensure the retrieval of the latest value of the key up to this checkpoint.

**Use Case:** When read and write operations for a certain key do not occur simultaneously, we can refresh the TableView before reading the key to obtain the latest value for this key.

Based on the above, I believe it would be more reasonable to introduce an asynchronous API `refreshAsync()`. There are two reasons for this:
* It provides more flexibility for the user.
* The operation of refresh is easier for users to understand.
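The checkpoint-then-read idea in the comment above can be illustrated with a small, self-contained sketch. Everything in it is an assumption made for illustration: `InMemoryTopic`, `SketchTableView` and this `refreshAsync()` are hypothetical stand-ins, not the Pulsar client's TableView API, and an in-memory list plays the role of the topic. The sketch records the end of the log as the checkpoint at the moment refresh is requested, then applies entries only up to that checkpoint, so writes that arrive later are picked up by the next refresh.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch only: none of these types belong to the Pulsar client.
final class RefreshSketch {

    /** One keyed message in the log. */
    static final class Entry {
        final String key;
        final String value;

        Entry(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Append-only in-memory log standing in for a topic. */
    static final class InMemoryTopic {
        private final List<Entry> log = new ArrayList<>();

        synchronized void publish(String key, String value) {
            log.add(new Entry(key, value));
        }

        /** Offset one past the last published entry. */
        synchronized int endOffset() {
            return log.size();
        }

        /** Copy of the entries in the half-open range [from, to). */
        synchronized List<Entry> read(int from, int to) {
            return new ArrayList<>(log.subList(from, to));
        }
    }

    /** Key-to-latest-value view that only advances when refreshed. */
    static final class SketchTableView {
        private final InMemoryTopic topic;
        private final Map<String, String> view = new ConcurrentHashMap<>();
        private int nextOffset = 0; // guarded by catchUpTo's monitor

        SketchTableView(InMemoryTopic topic) {
            this.topic = topic;
        }

        String get(String key) {
            return view.get(key);
        }

        /** Records the checkpoint now, then catches up to it asynchronously. */
        CompletableFuture<Void> refreshAsync() {
            final int checkpoint = topic.endOffset();
            return CompletableFuture.runAsync(() -> catchUpTo(checkpoint));
        }

        private synchronized void catchUpTo(int checkpoint) {
            if (checkpoint <= nextOffset) {
                return; // an earlier refresh already covered this checkpoint
            }
            for (Entry e : topic.read(nextOffset, checkpoint)) {
                view.put(e.key, e.value); // later entries overwrite earlier ones
            }
            nextOffset = checkpoint;
        }
    }

    public static void main(String[] args) {
        InMemoryTopic topic = new InMemoryTopic();
        SketchTableView view = new SketchTableView(topic);

        topic.publish("k", "v1");
        view.refreshAsync().join();
        System.out.println(view.get("k"));

        topic.publish("k", "v2");
        System.out.println(view.get("k")); // no refresh yet, so the view is stale
        view.refreshAsync().join();
        System.out.println(view.get("k"));
    }
}
```

In this sketch the caller decides when freshness matters by awaiting the returned future before reading, which is the flexibility argument made in the comment; a read that skips the refresh simply sees the last applied state.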
[GitHub] [pulsar] horizonzy closed pull request #21189: [test] [branch-3.0] Test branch-3.0 ci
horizonzy closed pull request #21189: [test] [branch-3.0] Test branch-3.0 ci URL: https://github.com/apache/pulsar/pull/21189
[GitHub] [pulsar] horizonzy opened a new pull request, #21189: test commit.
horizonzy opened a new pull request, #21189: URL: https://github.com/apache/pulsar/pull/21189

Fixes #xyz

Main Issue: #xyz

PIP: #xyz

### Motivation

### Modifications

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (10MB)*
- *Extended integration test for recovery after broker failure*

### Does this pull request potentially affect one of the following parts:

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [ ] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository: