Re: [PR] [fix][broker] Support running docker container with gid != 0 [pulsar]
lhotari commented on PR #22081: URL: https://github.com/apache/pulsar/pull/22081#issuecomment-1956079740 I created a separate issue about the issue with `readOnlyRootFilesystem`, #22088. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[I] [Bug] Pulsar's containers fail to start in k8s with readOnlyRootFilesystem pod/container security context [pulsar]
lhotari opened a new issue, #22088: URL: https://github.com/apache/pulsar/issues/22088 ### Search before asking - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar. ### Version any, for example 3.0.2 ### Minimal reproduce step ``` ❯ docker run --user 1:10001 -e advertisedAddress=foobar --rm --read-only apachepulsar/pulsar:3.0.2 sh \ -c "bin/apply-config-from-env.py \ conf/standalone.conf && \ bin/pulsar standalone" [conf/standalone.conf] Applying config advertisedAddress = foobar Traceback (most recent call last): File "/pulsar/bin/apply-config-from-env.py", line 104, in f = open(conf_filename, 'w') OSError: [Errno 30] Read-only file system: 'conf/standalone.conf' ``` ### What did you expect to see? I should be possible to run Pulsar container with read only root filesystem. ### What did you see instead? `OSError: [Errno 30] Read-only file system` error ### Anything else? _No response_ ### Are you willing to submit a PR? - [ ] I'm willing to submit a PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][broker] Support running docker container with gid != 0 [pulsar]
lhotari commented on PR #22081: URL: https://github.com/apache/pulsar/pull/22081#issuecomment-1956071168 Good way to reproduce the issue with the current container (before applying this fix): ``` ❯ docker run --user 1:10001 -e advertisedAddress=foobar --rm -it apachepulsar/pulsar:3.0.2 sh \ -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone" [conf/standalone.conf] Applying config advertisedAddress = foobar Traceback (most recent call last): File "/pulsar/bin/apply-config-from-env.py", line 104, in f = open(conf_filename, 'w') PermissionError: [Errno 13] Permission denied: 'conf/standalone.conf' ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][sec] Upgrade commons-compress to 1.26.0 [pulsar]
lhotari commented on PR #22086: URL: https://github.com/apache/pulsar/pull/22086#issuecomment-1956062185 > `commons-compress` v1.21 has no dependencies on other libraries, but v1.26.0 has dependencies on `commons-io` and `commons-lang3`, which seems to cause `JavaInstanceDepsTest` to fail. > > ``` > [INFO] +- org.apache.commons:commons-compress:jar:1.26.0:compile > [INFO] | +- commons-io:commons-io:jar:2.15.1:compile > [INFO] | \- org.apache.commons:commons-lang3:jar:3.14.0:compile > ``` > > Therefore, I modified `JavaInstanceDepsTest` so that the test passes even if `java-instance.jar` includes classes from these libraries. Please let me know if this change is incorrect. @massakam Makes sense. Thanks for addressing this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][sec] Upgrade commons-compress to 1.26.0 [pulsar]
massakam commented on PR #22086: URL: https://github.com/apache/pulsar/pull/22086#issuecomment-1956060456 `commons-compress` v1.21 has no dependencies on other libraries, but v1.26.0 has dependencies on `commons-io` and `commons-lang3`, which seems to cause `JavaInstanceDepsTest` to fail. ``` [INFO] +- org.apache.commons:commons-compress:jar:1.26.0:compile [INFO] | +- commons-io:commons-io:jar:2.15.1:compile [INFO] | \- org.apache.commons:commons-lang3:jar:3.14.0:compile ``` Therefore, I modified `JavaInstanceDepsTest` so that the test passes even if `java-instance.jar` includes classes from these libraries. Please let me know if this change is incorrect. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] [Bug] Issue in using relational db mysql and postgres as source for pulsar io topics [pulsar]
lhotari commented on issue #22083: URL: https://github.com/apache/pulsar/issues/22083#issuecomment-1956054877 > Pulsar 3.0.1 Do you have a chance to test with Pulsar 3.0.2 since that's the latest released 3.0.x version. Just to make sure that it's not already fixed. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][broker] Support running docker container with gid != 0 [pulsar]
lhotari merged PR #22081: URL: https://github.com/apache/pulsar/pull/22081 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch master updated: [fix][broker] Support running docker container with gid != 0 (#22081)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 4097ddd5e8c [fix][broker] Support running docker container with gid != 0 (#22081) 4097ddd5e8c is described below commit 4097ddd5e8c4fae4d95c939222341e5ad5dd6d20 Author: Lari Hotari AuthorDate: Wed Feb 21 09:34:29 2024 +0200 [fix][broker] Support running docker container with gid != 0 (#22081) --- docker/pulsar/Dockerfile | 6 +- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/docker/pulsar/Dockerfile b/docker/pulsar/Dockerfile index 4e5885ce55d..6a0dc0100e7 100644 --- a/docker/pulsar/Dockerfile +++ b/docker/pulsar/Dockerfile @@ -36,10 +36,14 @@ COPY scripts/install-pulsar-client.sh /pulsar/bin # The final image needs to give the root group sufficient permission for Pulsar components # to write to specific directories within /pulsar +# The ownership is changed to uid 1 to allow using a different root group. This is necessary when running the +# container when gid=0 is prohibited. In that case, the container must be run with uid 1 with +# any group id != 0 (for example 10001). # The file permissions are preserved when copying files from this builder image to the target image. RUN for SUBDIRECTORY in conf data download logs; do \ [ -d /pulsar/$SUBDIRECTORY ] || mkdir /pulsar/$SUBDIRECTORY; \ - chmod -R g+w /pulsar/$SUBDIRECTORY; \ + chmod -R ug+w /pulsar/$SUBDIRECTORY; \ + chown -R 1:0 /pulsar/$SUBDIRECTORY; \ done ### Create 2nd stage from Ubuntu image
Re: [I] [Bug] PIP-307 changes don't currently cover advertised listeners support (PIP-61/PIP-95) [pulsar]
lhotari commented on issue #22061: URL: https://github.com/apache/pulsar/issues/22061#issuecomment-1956048894 There's also a need for SNI proxy support to cover all connectivity options. I think that it is commonly used together with Pulsar's SNI proxy feature. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][client] fixed getProxyConnection when the topic is migrated [pulsar]
lhotari commented on PR #22085: URL: https://github.com/apache/pulsar/pull/22085#issuecomment-1956044155 Since this is touching this area, I'm reminding about #22061 . :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][sec] Upgrade commons-compress to 1.26.0 [pulsar]
lhotari commented on PR #22086: URL: https://github.com/apache/pulsar/pull/22086#issuecomment-1956041194 JavaInstanceDepsTest seems to be failing. Please take a look. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix] [txn] Get previous position by managed ledger. [pulsar]
codecov-commenter commented on PR #22024: URL: https://github.com/apache/pulsar/pull/22024#issuecomment-1956034191 ## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/22024?src=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) Report All modified and coverable lines are covered by tests :white_check_mark: > Comparison is base [(`825e997`)](https://app.codecov.io/gh/apache/pulsar/commit/825e997216dabe23a6dde0945ef769bbda0558e4?el=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) 73.57% compared to head [(`b9b8913`)](https://app.codecov.io/gh/apache/pulsar/pull/22024?src=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) 73.54%. > Report is 2 commits behind head on master. Additional details and impacted files [![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/22024/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/22024?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) ```diff @@ Coverage Diff @@ ## master #22024 +/- ## - Coverage 73.57% 73.54% -0.03% + Complexity3257532049 -526 Files 1874 1874 Lines139252 139252 Branches 1526015260 - Hits 102454 102413 -41 - Misses2888028916 +36 - Partials 7918 7923 +5 ``` | [Flag](https://app.codecov.io/gh/apache/pulsar/pull/22024/flags?src=pr=flags_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | | |---|---|---| | [inttests](https://app.codecov.io/gh/apache/pulsar/pull/22024/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `24.63% <100.00%> (-0.15%)` | :arrow_down: | | [systests](https://app.codecov.io/gh/apache/pulsar/pull/22024/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `24.26% <0.00%> (-0.09%)` | :arrow_down: | | [unittests](https://app.codecov.io/gh/apache/pulsar/pull/22024/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `72.81% <100.00%> (-0.03%)` | :arrow_down: | Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more. | [Files](https://app.codecov.io/gh/apache/pulsar/pull/22024?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | | |---|---|---| | [...ransaction/buffer/impl/TopicTransactionBuffer.java](https://app.codecov.io/gh/apache/pulsar/pull/22024?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci90cmFuc2FjdGlvbi9idWZmZXIvaW1wbC9Ub3BpY1RyYW5zYWN0aW9uQnVmZmVyLmphdmE=) | `87.74% <100.00%> (ø)` | | ... and [71 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/22024/indirect-changes?src=pr=tree-more_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] Flaky-test: ExtensibleLoadManagerImplTest.testGetMetrics [pulsar]
lhotari commented on issue #21556: URL: https://github.com/apache/pulsar/issues/21556#issuecomment-1956028348 > After some troubleshooting ([failure*.txt reproduce](https://github.com/MMirelli/pulsar-flake-troubleshooter/tree/9e7b9e2d33d9283dded0f63653d08a92902dbe09/fix-21556/data) the issue), the test seems to have some concurrency issues. From the runs I did I could gather the following stats: > > ``` > | Metric | Runs| Counter | > |:---:|:---:|:---:| > | brk_sunit_state_chn_subscribe_ops_total | 1,10| handlerCounters | > | brk_sunit_state_chn_owner_lookup_total | 2,3,6,9 | ownerLookUpCounters | > | brk_lb_assign_broker_breakdown_total| 4,5,7,8 | breakdownCounters | > ``` > > Where, for example, the first row reads: runs 1 and 10 showed an issue with metric named `brk_sunit_state_chn_subscribe_ops_total`, which is stored in `handlerCounters`. It seems that most of the issues might be caused by the fact that we are not using concurrent collections and / or lack of synchronization. As they are metrics we may anyway want to skip synchronization, so I am unsure whether it makes sense to put the time to fix (all) these issues. Impressive analysis @MMirelli ! Thanks. @heesung-sn @dragosvictor Please check this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][client] fixed getProxyConnection when the topic is migrated [pulsar]
dragosvictor commented on code in PR #22085: URL: https://github.com/apache/pulsar/pull/22085#discussion_r1497008116 ## pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java: ## @@ -990,8 +991,13 @@ public CompletableFuture getConnectionToServiceUrl() { return getConnection(address, address, cnxPool.genRandomKeyToSelectCon()); } -public CompletableFuture getProxyConnection(final InetSocketAddress logicalAddress, +public CompletableFuture getProxyConnection(final URI redirectedClusterURI, + final InetSocketAddress logicalAddress, final int randomKeyForSelectConnection) { + +LookupService lookup = +redirectedClusterURI == null ? this.lookup : getLookup(redirectedClusterURI.toString()); Review Comment: Since these connections are cached and pooled, will this still work if the `logicalAddress` stays the same in the source and destination clusters? Or is that not possible? Otherwise, we might have to use an actually random key for connection selection. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix] [txn] Get previous position by managed ledger. [pulsar]
thetumbled commented on PR #22024: URL: https://github.com/apache/pulsar/pull/22024#issuecomment-1956009864 /pulsarbot rerun-failure-checks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[I] [improve] Reuse the internal writer/reader under the same system topic [pulsar]
poorbarcode opened a new issue, #22087: URL: https://github.com/apache/pulsar/issues/22087 ### Search before asking - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar. ### Motivation So far, Pulsar creates an internal producer to send the Pulsar Event and close it after it is sent immediately. The following Profile was captured when doing much topic creation and deletion. ![image](https://github.com/apache/pulsar/assets/25195800/aed99a04-51d5-46e9-b471-71285d9191e3) https://github.com/apache/pulsar/assets/25195800/1f466756-eefd-4334-b8f3-2023f4593806;> ### Solution We can reuse the same writer/reader to improve the performance. ### Alternatives _No response_ ### Anything else? _No response_ ### Are you willing to submit a PR? - [ ] I'm willing to submit a PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix] [txn] Get previous position by managed ledger. [pulsar]
dao-jun commented on code in PR #22024: URL: https://github.com/apache/pulsar/pull/22024#discussion_r1496938782 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java: ## @@ -287,8 +287,8 @@ private void handleTransactionMessage(TxnID txnId, Position position) { .checkAbortedTransaction(txnId)) { ongoingTxns.put(txnId, (PositionImpl) position); PositionImpl firstPosition = ongoingTxns.get(ongoingTxns.firstKey()); -//max read position is less than first ongoing transaction message position, so entryId -1 -maxReadPosition = PositionImpl.get(firstPosition.getLedgerId(), firstPosition.getEntryId() - 1); +// max read position is less than first ongoing transaction message position +maxReadPosition = getPreviousPosition(firstPosition); Review Comment: it makes sense -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve] [broker] Do not try to open ML when the topic meta does not exist and do not expect to create a new one. #21995 [pulsar]
dao-jun commented on PR #22004: URL: https://github.com/apache/pulsar/pull/22004#issuecomment-1955928386 @poorbarcode there is a test keeps failing, could you please take a check -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve][pip] PIP-336: WASM Function API [pulsar]
RobertIndie commented on code in PR #21992: URL: https://github.com/apache/pulsar/pull/21992#discussion_r1496859025 ## pip/pip-331.md: ## @@ -0,0 +1,129 @@ +# PIP-331: WASM Function API + +# Background knowledge + +WASM(WebAssembly) bytecode is designed to be encoded in a size- and load-time-efficient binary format. WASM aims to leverage the common hardware features available on various platforms to execute in browsers at machine code speed. + +WASI(WebAssembly System Interface) provide a portable interface for applications that run within a constrained sandbox environment, which allows WASM to run in non browser environments such as Linux. It's portable and secure. + +# Motivation + +The server and client sides of the Pulsar function use protobuf for decoupling. In principle, the language supported by protobuf can be supported by the pulsar function, now Pulsar provided the java, python and golang function client, but there are still many languages that are not supported. + +Before all language adaptations are completed (and it's almost entirely certain to be impossible), users cannot write pulsar function in their familiar languages. + +# Goals + +## In Scope + +Other languages, as long as their code can be compiled into WASM bytecode (such as Rust/golang/C++), users can use these languages to write pulsar function. + +## Out of Scope + +All existing abilities of the Java pulsar function client are not reimplemented, the WASM Pulsar functions is under the Java Pulsar functions. + +Due to the strict requirements of WASM on parameter types and for simplicity reasons, types other than `java.lang.Long` are not used as parameters or return value. + +# High Level Design + +```mermaid +flowchart LR; + +subgraph develop +direction TB +SourceCode ==> |"CompileToWASM"| WasmFile ==> |"RenameFile"| MoveToTheResourceDirectory ==> UnitTest +end + +subgraph runtime +direction TB +PulsarFunctionJava ==> |"LoadFromResource"| TheWasmFile ==> |"Invoke"| TheSourceCode +end + +develop --> runtime +``` + +# Detailed Design + +## Design & Implementation Details + +1. add `WasmLoader` to load WASM file and provide the WASM function to java, also provide the java function to WASM if we need. + +2. add `AbstractWasmFunction` and `AbstractWasmWindowFunction` as the core interface of the WASM function api. + +```java +public abstract class AbstractWasmFunction extends WasmLoader implements Function { + +private static final String PROCESS_METHOD_NAME = "process"; + +protected static final String INITIALIZE_METHOD_NAME = "initialize"; + +protected static final String CLOSE_METHOD_NAME = "close"; + +protected static final Map> ARGUMENTS = new ConcurrentHashMap<>(); + +@Override +public T process(X input, Context context) { +return super.getWasmExtern(PROCESS_METHOD_NAME) +.map(process -> { +Long argumentId = callWASI(input, context, process); +return doProcess(input, context, argumentId); +}) +.orElseThrow(() -> new PulsarWasmException( +PROCESS_METHOD_NAME + " function not found in " + super.getWasmName())); +} + +private Long callWASI(X input, + Context context, + Extern process) { +// call WASI function +final Long argumentId = getArgumentId(input, context); +ARGUMENTS.put(argumentId, new Argument<>(input, context)); +// WASI cannot easily pass Java objects like JNI, here we pass Long +// then we can get the argument by Long +WasmFunctions.consumer(super.getStore(), process.func(), WasmValType.I64) +.accept(argumentId); +ARGUMENTS.remove(argumentId); +return argumentId; +} + +protected abstract T doProcess(X input, Context context, Long argumentId); + +protected abstract Long getArgumentId(X input, Context context); + +@Override +public void initialize(Context context) { +super.getWasmExtern(INITIALIZE_METHOD_NAME) +.ifPresent(initialize -> callWASI(null, context, initialize)); +} + +@Override +public void close() { +super.getWasmExtern(CLOSE_METHOD_NAME) +.ifPresent(close -> callWASI(null, null, close)); +super.close(); +} + +protected static class Argument { +protected X input; +protected Context context; + +private Argument(X input, Context context) { +this.input = input; +this.context = context; +} +} +} +``` + +More detailed code implementation and test can be found in [here](https://github.com/apache/pulsar/pull/21975) Review Comment: It's better to extract more design details from the code implementation to this proposal. -- This is an automated message from the Apache Git Service. To
(pulsar-client-cpp) branch main updated: [feat] PIP-188 Support blue-green migration (#402)
This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git The following commit(s) were added to refs/heads/main by this push: new 543e51c [feat] PIP-188 Support blue-green migration (#402) 543e51c is described below commit 543e51c7ecd842056f93859defd23b851bfe842e Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com> AuthorDate: Tue Feb 20 19:02:31 2024 -0800 [feat] PIP-188 Support blue-green migration (#402) ### Motivation Support blue-green migration pip-188 for cpp client ### Modifications - added blue-green client logic - register the producer instance in the producers map before sending produce creation command. This is required since broker could send topic migration command in the middle of creating the producer. --- lib/BinaryProtoLookupService.cc| 1 - lib/BinaryProtoLookupService.h | 8 +- lib/ClientConnection.cc| 57 ++- lib/ClientConnection.h | 9 +- lib/ClientImpl.cc | 56 +-- lib/ClientImpl.h | 15 +- lib/HTTPLookupService.cc | 4 +- lib/HTTPLookupService.h| 6 +- lib/HandlerBase.cc | 16 +- lib/HandlerBase.h | 4 + lib/LookupService.h| 3 + lib/ProducerImpl.cc| 5 +- lib/RetryableLookupService.h | 4 + lib/ServiceNameResolver.h | 15 +- proto/PulsarApi.proto | 16 ++ run-unit-tests.sh | 3 + tests/ClientTest.cc| 17 ++- tests/LookupServiceTest.cc | 32 ++-- tests/MockClientImpl.h | 6 +- tests/PulsarFriend.h | 12 +- .../docker-compose.yml | 40 ++--- tests/extensibleLM/ExtensibleLoadManagerTest.cc| 169 ++--- tests/extensibleLM/docker-compose.yml | 4 + 23 files changed, 356 insertions(+), 146 deletions(-) diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc index 2d9ffc4..489d8a2 100644 --- a/lib/BinaryProtoLookupService.cc +++ b/lib/BinaryProtoLookupService.cc @@ -22,7 +22,6 @@ #include "ConnectionPool.h" #include "LogUtils.h" #include "NamespaceName.h" -#include "ServiceNameResolver.h" #include "TopicName.h" DECLARE_LOG_OBJECT() diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h index a3c059e..6132825 100644 --- a/lib/BinaryProtoLookupService.h +++ b/lib/BinaryProtoLookupService.h @@ -38,9 +38,9 @@ using GetSchemaPromisePtr = std::shared_ptr>; class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { public: -BinaryProtoLookupService(ServiceNameResolver& serviceNameResolver, ConnectionPool& pool, +BinaryProtoLookupService(const std::string& serviceUrl, ConnectionPool& pool, const ClientConfiguration& clientConfiguration) -: serviceNameResolver_(serviceNameResolver), +: serviceNameResolver_(serviceUrl), cnxPool_(pool), listenerName_(clientConfiguration.getListenerName()), maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()) {} @@ -54,6 +54,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { Future getSchema(const TopicNamePtr& topicName, const std::string& version) override; +ServiceNameResolver& getServiceNameResolver() override { return serviceNameResolver_; } + protected: // Mark findBroker as protected to make it accessible from test. LookupResultFuture findBroker(const std::string& address, bool authoritative, const std::string& topic, @@ -63,7 +65,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService { std::mutex mutex_; uint64_t requestIdGenerator_ = 0; -ServiceNameResolver& serviceNameResolver_; +ServiceNameResolver serviceNameResolver_; ConnectionPool& cnxPool_; std::string listenerName_; const int32_t maxLookupRedirects_; diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 00041b2..0beb739 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -403,7 +403,8 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver:: LOG_INFO(cnxString_ << "Connected to broker"); } else { LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_ -<< ", proxy: " << proxyServiceUrl_); +
Re: [PR] [feat] PIP-188 Support blue-green migration [pulsar-client-cpp]
BewareMyPower merged PR #402: URL: https://github.com/apache/pulsar-client-cpp/pull/402 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] [Bug] Cannot determine whether the message is a duplicate at this time [pulsar]
graysonzeng commented on issue #21892: URL: https://github.com/apache/pulsar/issues/21892#issuecomment-1955780645 @lhotari I haven't tested it, but I think it's not reproducible without brokerEntryMetadataInterceptors. Currently it seems that CompositeByteBuf will be generated in addBrokerEntryMetadata only when brokerEntryMetadataInterceptors are enabled. https://github.com/apache/pulsar/blob/0b6bd70b8d1e7b7cd4d82aa2e0cbfd5e0323d440/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java#L1723 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch dependabot/maven/org.apache.commons-commons-compress-1.26.0 deleted (was bebe136c8e5)
This is an automated email from the ASF dual-hosted git repository. github-bot pushed a change to branch dependabot/maven/org.apache.commons-commons-compress-1.26.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git was bebe136c8e5 Bump org.apache.commons:commons-compress from 1.21 to 1.26.0 The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
Re: [PR] Bump org.apache.commons:commons-compress from 1.21 to 1.26.0 [pulsar]
massakam closed pull request #22084: Bump org.apache.commons:commons-compress from 1.21 to 1.26.0 URL: https://github.com/apache/pulsar/pull/22084 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Bump org.apache.commons:commons-compress from 1.21 to 1.26.0 [pulsar]
dependabot[bot] commented on PR #22084: URL: https://github.com/apache/pulsar/pull/22084#issuecomment-1955734863 OK, I won't notify you again about this release, but will get in touch when a new version is available. If you'd rather skip all updates until the next major or minor version, let me know by commenting `@dependabot ignore this major version` or `@dependabot ignore this minor version`. If you change your mind, just re-open this PR and I'll resolve any conflicts on it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Bump org.apache.commons:commons-compress from 1.21 to 1.26.0 [pulsar]
massakam commented on PR #22084: URL: https://github.com/apache/pulsar/pull/22084#issuecomment-1955734766 I created another PR because the license files needed to be updated. https://github.com/apache/pulsar/pull/22086 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [fix][sec] Upgrade commons-compress to 1.26.0 [pulsar]
massakam opened a new pull request, #22086: URL: https://github.com/apache/pulsar/pull/22086 ### Motivation commons-compress 1.21 has the following vulnerability and should be upgraded to 1.26.0. https://nvd.nist.gov/vuln/detail/CVE-2024-25710 ### Verifying this change - [ ] Make sure that the change passes the CI checks. ### Documentation - [ ] `doc` - [ ] `doc-required` - [ ] `doc-not-needed` - [ ] `doc-complete` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [fix][client] fixed getProxyConnection when the topic is migrated [pulsar]
heesung-sn opened a new pull request, #22085: URL: https://github.com/apache/pulsar/pull/22085 PIP: https://github.com/apache/pulsar/pull/20748 ### Motivation When a topic is migrated to a cluster with a proxy, it needs to use the proxy in the new cluster. ### Modifications - updated getProxyConnection func to select lookup service based on redirectedClusterURI. ### Verifying this change - [x] Make sure that the change passes the CI checks. ### Does this pull request potentially affect one of the following parts: *If the box was checked, please highlight the changes* - [ ] Dependencies (add or upgrade a dependency) - [ ] The public API - [ ] The schema - [ ] The default values of configurations - [ ] The threading model - [ ] The binary protocol - [ ] The REST endpoints - [ ] The admin CLI options - [ ] The metrics - [ ] Anything that affects deployment ### Documentation - [ ] `doc` - [ ] `doc-required` - [x] `doc-not-needed` - [ ] `doc-complete` ### Matching PR in forked repository PR in forked repository: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [feat][misc] PIP-264: Implement topic lookup metrics using OpenTelemetry [pulsar]
dragosvictor commented on code in PR #22058: URL: https://github.com/apache/pulsar/pull/22058#discussion_r1496703560 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java: ## @@ -241,8 +243,15 @@ public class BrokerService implements Closeable { protected final AtomicReference lookupRequestSemaphore; protected final AtomicReference topicLoadRequestSemaphore; +@PulsarDeprecatedMetric(newMetricName = "pulsar.broker.lookup.pending.request.usage") private final ObserverGauge pendingLookupRequests; +private final ObservableLongGauge pendingLookupRequestsCounter; +private final ObservableLongGauge pendingLookupRequestsLimit; + +@PulsarDeprecatedMetric(newMetricName = "pulsar.broker.topic.load.pending.request.usage") private final ObserverGauge pendingTopicLoadRequests; +private final ObservableLongGauge pendingTopicLoadRequestsCounter; Review Comment: Kept to using a counter for this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Bump org.apache.commons:commons-compress from 1.21 to 1.26.0 [pulsar]
github-actions[bot] commented on PR #22084: URL: https://github.com/apache/pulsar/pull/22084#issuecomment-1955423832 @dependabot[bot] Please add the following content to your PR description and select a checkbox: ``` - [ ] `doc` - [ ] `doc-required` - [ ] `doc-not-needed` - [ ] `doc-complete` ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch dependabot/maven/org.apache.commons-commons-compress-1.26.0 created (now bebe136c8e5)
This is an automated email from the ASF dual-hosted git repository. github-bot pushed a change to branch dependabot/maven/org.apache.commons-commons-compress-1.26.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git at bebe136c8e5 Bump org.apache.commons:commons-compress from 1.21 to 1.26.0 No new revisions were added by this update.
[PR] Bump org.apache.commons:commons-compress from 1.21 to 1.26.0 [pulsar]
dependabot[bot] opened a new pull request, #22084: URL: https://github.com/apache/pulsar/pull/22084 Bumps org.apache.commons:commons-compress from 1.21 to 1.26.0. [![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=org.apache.commons:commons-compress=maven=1.21=1.26.0)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores) Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting `@dependabot rebase`. [//]: # (dependabot-automerge-start) [//]: # (dependabot-automerge-end) --- Dependabot commands and options You can trigger Dependabot actions by commenting on this PR: - `@dependabot rebase` will rebase this PR - `@dependabot recreate` will recreate this PR, overwriting any edits that have been made to it - `@dependabot merge` will merge this PR after your CI passes on it - `@dependabot squash and merge` will squash and merge this PR after your CI passes on it - `@dependabot cancel merge` will cancel a previously requested merge and block automerging - `@dependabot reopen` will reopen this PR if it is closed - `@dependabot close` will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually - `@dependabot show ignore conditions` will show all of the ignore conditions of the specified dependency - `@dependabot ignore this major version` will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself) - `@dependabot ignore this minor version` will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself) - `@dependabot ignore this dependency` will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself) You can disable automated security fix PRs for this repo from the [Security Alerts page](https://github.com/apache/pulsar/network/alerts). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Bump ip from 2.0.0 to 2.0.1 [pulsar-site]
github-actions[bot] commented on PR #793: URL: https://github.com/apache/pulsar-site/pull/793#issuecomment-1955226974 @dependabot[bot] Please add the following content to your PR description and select a checkbox: ``` - [ ] `doc` - [ ] `doc-required` - [ ] `doc-not-needed` - [ ] `doc-complete` ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar-site) branch dependabot/npm_and_yarn/ip-2.0.1 created (now 1e96b6d397f4)
This is an automated email from the ASF dual-hosted git repository. github-bot pushed a change to branch dependabot/npm_and_yarn/ip-2.0.1 in repository https://gitbox.apache.org/repos/asf/pulsar-site.git at 1e96b6d397f4 Bump ip from 2.0.0 to 2.0.1 No new revisions were added by this update.
[PR] Bump ip from 2.0.0 to 2.0.1 [pulsar-site]
dependabot[bot] opened a new pull request, #793: URL: https://github.com/apache/pulsar-site/pull/793 Bumps [ip](https://github.com/indutny/node-ip) from 2.0.0 to 2.0.1. Commits https://github.com/indutny/node-ip/commit/3b0994a74eca51df01f08c40d6a65ba0e1845d04;>3b0994a 2.0.1 https://github.com/indutny/node-ip/commit/32f468f1245574785ec080705737a579be1223aa;>32f468f lib: fixed CVE-2023-42282 and added unit test See full diff in https://github.com/indutny/node-ip/compare/v2.0.0...v2.0.1;>compare view [![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=ip=npm_and_yarn=2.0.0=2.0.1)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores) Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting `@dependabot rebase`. [//]: # (dependabot-automerge-start) [//]: # (dependabot-automerge-end) --- Dependabot commands and options You can trigger Dependabot actions by commenting on this PR: - `@dependabot rebase` will rebase this PR - `@dependabot recreate` will recreate this PR, overwriting any edits that have been made to it - `@dependabot merge` will merge this PR after your CI passes on it - `@dependabot squash and merge` will squash and merge this PR after your CI passes on it - `@dependabot cancel merge` will cancel a previously requested merge and block automerging - `@dependabot reopen` will reopen this PR if it is closed - `@dependabot close` will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually - `@dependabot show ignore conditions` will show all of the ignore conditions of the specified dependency - `@dependabot ignore this major version` will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself) - `@dependabot ignore this minor version` will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself) - `@dependabot ignore this dependency` will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself) You can disable automated security fix PRs for this repo from the [Security Alerts page](https://github.com/apache/pulsar-site/network/alerts). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] Flaky-test: ExtensibleLoadManagerImplTest.testGetMetrics [pulsar]
MMirelli commented on issue #21556: URL: https://github.com/apache/pulsar/issues/21556#issuecomment-1955127796 After some troubleshooting ([failure*.txt reproduce](https://github.com/MMirelli/pulsar-flake-troubleshooter/tree/9e7b9e2d33d9283dded0f63653d08a92902dbe09/fix-21556/data) the issue), the test seems to have some concurrency issues. From the runs I did I could gather the following stats: ``` | Metric | Runs| Counter | |:---:|:---:|:---:| | brk_sunit_state_chn_subscribe_ops_total | 1,10| handlerCounters | | brk_sunit_state_chn_owner_lookup_total | 2,3,6,9 | ownerLookUpCounters | | brk_lb_assign_broker_breakdown_total| 4,5,7,8 | breakdownCounters | ``` Where, for example, the first row reads: runs 1 and 10 showed an issue with metric named `brk_sunit_state_chn_subscribe_ops_total`, which is stored in `handlerCounters`. It seems that most of the issues might be caused by the fact that we are not using concurrent collections and / or lack of synchronization. As they are metrics we may anyway want to skip synchronization, so I am unsure whether it makes sense to put the time to fix (all) these issues. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [feat][misc] PIP-264: Implement topic lookup metrics using OpenTelemetry [pulsar]
dragosvictor commented on code in PR #22058: URL: https://github.com/apache/pulsar/pull/22058#discussion_r1496530720 ## pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java: ## @@ -178,6 +188,49 @@ public void testMultipleBrokerLookup() throws Exception { doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class)); loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1)); +var metricReader = pulsarTestContext.getOpenTelemetryMetricReader(); Review Comment: > I like the `CountDownLatch` recommendation, I agree it will improve readability here. Just realized that this wouldn't work either, since we want to validate that the metric is only updated once the semaphore releases a permit. It cannot be intercepted on the main testing thread either, since it's happening during the direct call to `Consumer.subscribe` within the test. We'd end up with less readable code if we were to pursue that idea. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [feat][misc] PIP-264: Implement topic lookup metrics using OpenTelemetry [pulsar]
dragosvictor commented on code in PR #22058: URL: https://github.com/apache/pulsar/pull/22058#discussion_r1496530720 ## pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java: ## @@ -178,6 +188,49 @@ public void testMultipleBrokerLookup() throws Exception { doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class)); loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1)); +var metricReader = pulsarTestContext.getOpenTelemetryMetricReader(); Review Comment: > I like the `CountDownLatch` recommendation, I agree it will improve readability here. Just realized that this wouldn't work either, since we want to validate that the metric is only updated once we semaphore releases a permit. It cannot be intercepted on the main testing thread either, since it's happening during the direct call to `Consumer.subscribe` within the test. We'd end up with less readable code if we were to pursue that idea. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [feat][misc] PIP-264: Implement topic lookup metrics using OpenTelemetry [pulsar]
dragosvictor commented on code in PR #22058: URL: https://github.com/apache/pulsar/pull/22058#discussion_r1496322969 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java: ## @@ -205,16 +236,21 @@ public CompletableFuture> getBrokerServiceUrlAsync(TopicN }); future.thenAccept(optResult -> { -lookupLatency.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); +var latencyNs = System.nanoTime() - startTime; +lookupLatency.observe(latencyNs, TimeUnit.NANOSECONDS); + lookupLatencyHistogram.record(MetricsUtil.convertToSeconds(latencyNs, TimeUnit.NANOSECONDS)); Review Comment: The problem is that they'd end up being 0 most of the time. If we're to standardize on seconds being the unit of time (as OpenTelemetry recommends, and I think we should), these numbers must be doubles, but `TimeUnit.toSeconds` 'speaks' `long` only, and we'd lose virtually all precision when the numbers are small enough. I'll add a comment to the utility class to explain this decision. ## pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java: ## @@ -312,6 +313,8 @@ public PulsarService(ServiceConfiguration config, TransactionBatchedWriteValidator.validate(config); this.config = config; +this.openTelemetry = new PulsarBrokerOpenTelemetry(config); Review Comment: I would, but it's being replaced in the tests: https://github.com/apache/pulsar/pull/22058/files#diff-f6308e8022fc0713d66fa883d166e7b9189865b3c8ff9b79aaccaf97d69f2f25R56-R57. ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java: ## @@ -403,15 +411,41 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws this.defaultServerBootstrap = defaultServerBootstrap(); this.pendingLookupRequests = ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-") -.supplier(() -> pulsar.getConfig().getMaxConcurrentLookupRequest() -- lookupRequestSemaphore.get().availablePermits()) +.supplier(this::getPendingLookupRequest) .register(); +this.pendingLookupRequestsCounter = pulsar.getOpenTelemetry().getMeter() +.gaugeBuilder("pulsar.broker.lookup.pending.request.usage") +.ofLongs() +.setDescription("The number of pending lookup requests in the broker. " ++ "When it reaches threshold \"maxConcurrentLookupRequest\" defined in broker.conf, " ++ "new requests are rejected.") +.buildWithCallback(measurement -> measurement.record(getPendingLookupRequest())); +this.pendingLookupRequestsLimit = pulsar.getOpenTelemetry().getMeter() +.gaugeBuilder("pulsar.broker.lookup.pending.request.limit") +.ofLongs() +.setDescription("The maximum number of pending lookup requests in the broker. " ++ "Equal to \"maxConcurrentLookupRequest\" defined in broker.conf.") +.buildWithCallback( +measurement -> measurement.record(pulsar.getConfig().getMaxConcurrentLookupRequest())); this.pendingTopicLoadRequests = ObserverGauge.build( -"pulsar_broker_topic_load_pending_requests", "-") -.supplier(() -> pulsar.getConfig().getMaxConcurrentTopicLoadRequest() -- topicLoadRequestSemaphore.get().availablePermits()) +"pulsar_broker_topic_load_pending_requests", "-") +.supplier(this::getPendingTopicLoadRequests) .register(); +this.pendingTopicLoadRequestsCounter = pulsar.getOpenTelemetry().getMeter() +.gaugeBuilder("pulsar.broker.topic.load.pending.request.usage") Review Comment: Name changes sound good! Note that the same can be said about the lookup request, as it is being used in requests other than topic lookup: [handlePartitionMetadataRequest](https://github.com/apache/pulsar/blob/0b6bd70b8d1e7b7cd4d82aa2e0cbfd5e0323d440/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L579C20-L579C50) , [handleGetTopicsOfNamespace](https://github.com/apache/pulsar/blob/0b6bd70b8d1e7b7cd4d82aa2e0cbfd5e0323d440/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L2391C20-L2391C46) and [handleCommandWatchTopicList](https://github.com/apache/pulsar/blob/0b6bd70b8d1e7b7cd4d82aa2e0cbfd5e0323d440/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L3036C20-L3036C47). Perhaps we can rename it to `pulsar.broker.topic.lookup.operation.pending.[usage,limit]`? ## pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java: ## @@
[I] [Bug] Issue in using relational db mysql and postgres as source for pulsar io topics [pulsar]
abhinavisin opened a new issue, #22083: URL: https://github.com/apache/pulsar/issues/22083 ### Search before asking - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar. ### Version Ubuntu 18.04 Pulsar 3.0.1 Mysql - Server version: 5.7.25-log MySQL Community Server (GPL) Postgres - 14.10 ### Minimal reproduce step Step 1. Follow the steps documented for bringing CDC connectors outlined in example https://pulsar.apache.org/docs/3.0.x/io-cdc-debezium/ Docker run mysql instance Step 2. Start a standalone pulsar instance bin/pulsar standalone Step 3. Start the Pulsar Debezium connector in local run mode Attached yaml file (debezium connector config) and logs of execution Step 4. No topics created for any table from "inventory" schema Only offset and history topics are created in [pulsar-debezium-mysql.log](https://github.com/apache/pulsar/files/14351115/pulsar-debezium-mysql.log) pulsar [debezium-mysql-source-config.txt](https://github.com/apache/pulsar/files/14351137/debezium-mysql-source-config.txt) Above was repeated for postgres and same results. **However if using pulsar 2.6.0 all topics and tables are created successfully.** ### What did you expect to see? No topic created for any table of the mysql and postgres in pulsar 3.0.1. Only following topics created for mysql table- persistent://public/default/offset-topic persistent://public/default/history-topic ### What did you see instead? Subscribe to the topic sub-products for the inventory.products table and get change data from relational db. ### Anything else? _No response_ ### Are you willing to submit a PR? - [ ] I'm willing to submit a PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][Offload] fix indexEntries NullPointerException error [pulsar]
lhotari commented on PR #22035: URL: https://github.com/apache/pulsar/pull/22035#issuecomment-1955057667 > > > > > The `close` method maybe be called more than once, so cause the issue. I think we can just let `indexEntries=null`, no need to `clear` it > > > > > > > > > > > > Wouldn't that be a problem if the object instance gets recycled multiple times? > > > > > > > > > maybe > > > > There have been bugs in the past with recycled objects that are caused by releasing the object multiple times. > > yes, but for this patch, it's ok to fix it like this, right? I doubt that it's correct. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [Release 0.5.3] Update latest version in README.adoc to 0.5.3 [pulsar-client-reactive]
lhotari merged PR #162: URL: https://github.com/apache/pulsar-client-reactive/pull/162 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar-client-reactive) branch main updated: [Release 0.5.3] Update latest version in README.adoc to 0.5.3 (#162)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git The following commit(s) were added to refs/heads/main by this push: new 6221cec [Release 0.5.3] Update latest version in README.adoc to 0.5.3 (#162) 6221cec is described below commit 6221cecc4a18619f869d65c3888f41184940a6f4 Author: Chris Bono AuthorDate: Tue Feb 20 14:22:09 2024 -0600 [Release 0.5.3] Update latest version in README.adoc to 0.5.3 (#162) Co-authored-by: onobc --- README.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.adoc b/README.adoc index 8cdc631..aa7f879 100644 --- a/README.adoc +++ b/README.adoc @@ -19,7 +19,7 @@ = Reactive client for Apache Pulsar :github: https://github.com/apache/pulsar-client-reactive -:latest_version: 0.5.2 +:latest_version: 0.5.3 Reactive client for Apache Pulsar which is compatible with the Reactive Streams specification. This uses Project Reactor as the Reactive Streams implementation.
(pulsar-client-reactive) branch main updated: [Release 0.5.3] Update next snapshot version to 0.5.4-SNAPSHOT (#163)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git The following commit(s) were added to refs/heads/main by this push: new c39a5f8 [Release 0.5.3] Update next snapshot version to 0.5.4-SNAPSHOT (#163) c39a5f8 is described below commit c39a5f8dd368daf0c99e06707a09c9777f67e47f Author: Chris Bono AuthorDate: Tue Feb 20 13:34:27 2024 -0600 [Release 0.5.3] Update next snapshot version to 0.5.4-SNAPSHOT (#163) Co-authored-by: onobc --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index f08e395..d0e9728 100644 --- a/gradle.properties +++ b/gradle.properties @@ -17,4 +17,4 @@ # under the License. # -version=0.5.3 +version=0.5.4-SNAPSHOT
Re: [PR] [Release 0.5.3] Update next snapshot version to 0.5.4-SNAPSHOT [pulsar-client-reactive]
lhotari merged PR #163: URL: https://github.com/apache/pulsar-client-reactive/pull/163 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][broker] Set ServiceUnitStateChannel topic compaction threshold explicitly, improve getOwnerAsync, and fix other bugs [pulsar]
heesung-sn commented on code in PR #22064: URL: https://github.com/apache/pulsar/pull/22064#discussion_r1496263801 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java: ## @@ -894,14 +920,20 @@ private List getIgnoredCommandMetrics(String advertisedBrokerAddress) { return List.of(metric); } -private void monitor() { +@VisibleForTesting +protected void monitor() { try { initWaiter.await(); // Monitor role // Periodically check the role in case ZK watcher fails. var isChannelOwner = serviceUnitStateChannel.isChannelOwner(); if (isChannelOwner) { +// System topic config might fail due to the race condition +// with topic policy init(Topic policies cache have not init). +if (!configuredSystemTopics) { Review Comment: > Why this change can fix this issue? I see that topic policy init failure `TopicPoliciesCacheNotInitException` in the below code when this LB is trying to set the compaction threshold. So, I think it will be fool-proof to retry setting compaction threshold policy upon such errors(until the policy cache is fully initialized). https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L234 > Can you enable the topic-level policies on the integration test( Sure. I can do this. ## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java: ## @@ -150,9 +156,34 @@ public synchronized void init() throws IOException { start(); } -private synchronized void validateTableViewStart() { -if (tableView == null) { -throw new IllegalStateException("table view has not been started"); +private void validateProducer() { +if (producer == null || !producer.isConnected()) { +try { +if (producer != null) { +producer.close(); +} +producer = null; +startProducer(); +log.info("Restarted producer on {}", topic); +} catch (Exception e) { +log.error("Failed to restart producer on {}", topic, e); +throw new RuntimeException(e); +} +} +} + +private void validateTableView() { +if (tableView == null || System.currentTimeMillis() - tableViewLastUpdateTimestamp +> ((long) conf.getLoadBalancerReportUpdateMaxIntervalMinutes()) * 60 * 1000 * 2) { +tableViewLastUpdateTimestamp = 0; +try { +closeTableView(); +startTableView(); +log.info("Restarted tableview on {}", topic); Review Comment: updated. ## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java: ## @@ -575,7 +601,7 @@ private CompletableFuture> dedupeLookupRequest( if (ex != null) { assignCounter.incrementFailure(); } -lookupRequests.remove(key, newFutureCreated.getValue()); +lookupRequests.remove(key); Review Comment: I think this change is fool-proof in case the lookupRequests values are replaced just in case. ## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java: ## @@ -150,9 +156,34 @@ public synchronized void init() throws IOException { start(); } -private synchronized void validateTableViewStart() { -if (tableView == null) { -throw new IllegalStateException("table view has not been started"); +private void validateProducer() { +if (producer == null || !producer.isConnected()) { +try { +if (producer != null) { +producer.close(); +} +producer = null; +startProducer(); +log.info("Restarted producer on {}", topic); +} catch (Exception e) { +log.error("Failed to restart producer on {}", topic, e); +throw new RuntimeException(e); +} +} +} + +private void validateTableView() { +if (tableView == null || System.currentTimeMillis() - tableViewLastUpdateTimestamp +> ((long) conf.getLoadBalancerReportUpdateMaxIntervalMinutes()) * 60 * 1000 * 2) { Review Comment: done. ## pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java: ## @@ -304,8 +304,14 @@ private void readTailMessages(Reader reader) { log.error("Reader {} was
Re: [PR] [fix][broker] Support running docker container with gid != 0 [pulsar]
codecov-commenter commented on PR #22081: URL: https://github.com/apache/pulsar/pull/22081#issuecomment-1954783138 ## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/22081?src=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) Report All modified and coverable lines are covered by tests :white_check_mark: > Comparison is base [(`0b6bd70`)](https://app.codecov.io/gh/apache/pulsar/commit/0b6bd70b8d1e7b7cd4d82aa2e0cbfd5e0323d440?el=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) 73.57% compared to head [(`912d41e`)](https://app.codecov.io/gh/apache/pulsar/pull/22081?src=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) 73.65%. Additional details and impacted files [![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/22081/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/22081?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) ```diff @@ Coverage Diff @@ ## master #22081 +/- ## + Coverage 73.57% 73.65% +0.07% - Complexity3255332583 +30 Files 1874 1874 Lines139252 139252 Branches 1526015260 + Hits 102451 102560 +109 + Misses2887728782 -95 + Partials 7924 7910 -14 ``` | [Flag](https://app.codecov.io/gh/apache/pulsar/pull/22081/flags?src=pr=flags_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | Coverage Δ | | |---|---|---| | [inttests](https://app.codecov.io/gh/apache/pulsar/pull/22081/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `24.67% <ø> (-0.16%)` | :arrow_down: | | [systests](https://app.codecov.io/gh/apache/pulsar/pull/22081/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `24.42% <ø> (-0.04%)` | :arrow_down: | | [unittests](https://app.codecov.io/gh/apache/pulsar/pull/22081/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) | `72.90% <ø> (+0.07%)` | :arrow_up: | Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more. [see 80 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/22081/indirect-changes?src=pr=tree-more_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [Release 0.5.3] Update next snapshot version to 0.5.4-SNAPSHOT [pulsar-client-reactive]
onobc opened a new pull request, #163: URL: https://github.com/apache/pulsar-client-reactive/pull/163 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix] [broker] Expire messages according to ledger close time to avoid client clock skew [pulsar]
dao-jun commented on PR #21940: URL: https://github.com/apache/pulsar/pull/21940#issuecomment-1954763943 seems there is a test keeps failing, please check -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [Release 0.5.3] Update latest version in README.adoc to 0.5.3 [pulsar-client-reactive]
onobc opened a new pull request, #162: URL: https://github.com/apache/pulsar-client-reactive/pull/162 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][Offload] fix indexEntries NullPointerException error [pulsar]
Technoboy- commented on PR #22035: URL: https://github.com/apache/pulsar/pull/22035#issuecomment-1954678062 > > > > The `close` method maybe be called more than once, so cause the issue. I think we can just let `indexEntries=null`, no need to `clear` it > > > > > > > > > Wouldn't that be a problem if the object instance gets recycled multiple times? > > > > > > maybe > > There have been bugs in the past with recycled objects that are caused by releasing the object multiple times. yes, but for this patch, it's ok to fix it like this, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [feat][misc] PIP-264: Implement topic lookup metrics using OpenTelemetry [pulsar]
Technoboy- commented on code in PR #22058: URL: https://github.com/apache/pulsar/pull/22058#discussion_r1496197624 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java: ## @@ -205,16 +236,21 @@ public CompletableFuture> getBrokerServiceUrlAsync(TopicN }); future.thenAccept(optResult -> { -lookupLatency.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); +var latencyNs = System.nanoTime() - startTime; +lookupLatency.observe(latencyNs, TimeUnit.NANOSECONDS); + lookupLatencyHistogram.record(MetricsUtil.convertToSeconds(latencyNs, TimeUnit.NANOSECONDS)); Review Comment: +1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Sync data/team.js with https://whimsy.apache.org/roster/committee/pulsar.json [pulsar-site]
lhotari merged PR #792: URL: https://github.com/apache/pulsar-site/pull/792 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar-site) branch main updated: Sync data/team.js with https://whimsy.apache.org/roster/committee/pulsar.json (#792)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-site.git The following commit(s) were added to refs/heads/main by this push: new aaaffa8d6741 Sync data/team.js with https://whimsy.apache.org/roster/committee/pulsar.json (#792) aaaffa8d6741 is described below commit aaaffa8d674107021fd7f8bea7c880ea69f074fd Author: Lari Hotari AuthorDate: Tue Feb 20 18:40:05 2024 +0200 Sync data/team.js with https://whimsy.apache.org/roster/committee/pulsar.json (#792) --- data/team.js | 12 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/data/team.js b/data/team.js index ed9ffe074b9b..acd41b2f8c50 100644 --- a/data/team.js +++ b/data/team.js @@ -20,6 +20,10 @@ module.exports = { "name": "Hang Chen", "apacheId": "chenhang" }, +{ + "name": "David Jensen", + "apacheId": "djensen" +}, { "name": "Enrico Olivelli", "apacheId": "eolivelli" @@ -174,6 +178,10 @@ module.exports = { "name": "Aloys Zhang", "apacheId": "aloyszhang" }, +{ + "name": "Asaf Mesika", + "apacheId": "amesika" +}, { "name": "Andrey Yegorov", "apacheId": "ayegorov" @@ -202,10 +210,6 @@ module.exports = { "name": "Dezhi Liu", "apacheId": "dezhiliu" }, -{ - "name": "David Jensen", - "apacheId": "djensen" -}, { "name": "Yuri Mizushima", "apacheId": "equanz"
[PR] Sync data/team.js with https://whimsy.apache.org/roster/committee/pulsar.json [pulsar-site]
lhotari opened a new pull request, #792: URL: https://github.com/apache/pulsar-site/pull/792 Process used to update `data/team.js` file: 1. Logged in to https://whimsy.apache.org/roster/committee/pulsar with browser 2. Appended ".json" to URL so that browser goes to https://whimsy.apache.org/roster/committee/pulsar.json 3. Clicked "Save as..." and stored the JSON as ~/Downloads/pulsar.json 4. Ran this command in a bash shell: `{ echo -n "module.exports = " && cat ~/Downloads/pulsar.json | jq '{"pmc": [.roster| to_entries | sort_by(.key) | .[] | select(.value.role|startswith("PMC")) | {"name":.value.name, "apacheId": .key}], "committers": [.roster| to_entries | sort_by(.key) | .[] | select(.value.role=="Committer") | {"name":.value.name, "apacheId": .key}]}' } | perl -pe 's/$/;\n/ if eof' > data/team.js` - [ ] `doc` - [ ] `doc-required` - [x] `doc-not-needed` - [ ] `doc-complete` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [fix][broker] Support running docker container with gid != 0 [pulsar]
lhotari opened a new pull request, #22081: URL: https://github.com/apache/pulsar/pull/22081 ### Motivation Currently Pulsar's docker image must be run with gid=0. There are environments where the group id 0 is prohibited by default. One example is Tanzu Kubernetes Grid <=1.24 where a default Pod Security Policy called `vmware-system-restricted` is used. That PSP contains this type of rule: ```yaml supplementalGroups: rule: MustRunAs ranges: - min: 1 max: 65535 runAsUser: rule: MustRunAsNonRoot fsGroup: rule: MustRunAs ranges: - min: 1 max: 65535 ``` In this case, it's not possible to use Pulsar's docker image since Pulsar needs write access to a few directories. ### Modifications change the owner of the writable directories to user id 1. This will allow Tanzu to work with this type of securityContext for each Pulsar component (Broker, Zookeeper, Bookkeeper) ``` securityContext: runAsNonRoot: true runAsGroup: 10001 fsGroup: 10001 runAsUser: 1 ``` ### Documentation - [ ] `doc` - [ ] `doc-required` - [x] `doc-not-needed` - [ ] `doc-complete` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
svn commit: r67458 - /dev/pulsar/pulsar-client-reactive-0.5.3-candidate-1/ /release/pulsar/pulsar-client-reactive-0.5.3/
Author: lhotari Date: Tue Feb 20 15:35:32 2024 New Revision: 67458 Log: Release Reactive client for Apache Pulsar 0.5.3 Added: release/pulsar/pulsar-client-reactive-0.5.3/ - copied from r67457, dev/pulsar/pulsar-client-reactive-0.5.3-candidate-1/ Removed: dev/pulsar/pulsar-client-reactive-0.5.3-candidate-1/
(pulsar-client-reactive) annotated tag v0.5.3 updated (8c8d087 -> 1d8c34e)
This is an automated email from the ASF dual-hosted git repository. onobc pushed a change to annotated tag v0.5.3 in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git *** WARNING: tag v0.5.3 was modified! *** from 8c8d087 (commit) to 1d8c34e (tag) tagging 8c8d087abaa04c413150d6931451996e9fad8c2c (commit) replaces v0.5.2 by onobc on Tue Feb 20 09:15:29 2024 -0600 - Log - Release v0.5.3 -BEGIN PGP SIGNATURE- iQJFBAABCgAvFiEEuMw+Nf/n/RvndvFepQBgfx6cBFcFAmXUwhERHG9ub2JjQGFw YWNoZS5vcmcACgkQpQBgfx6cBFdNnRAAiAc3jDGamVvZ4Z4BNDMph3Q298wzzVZ+ vIhHXb1PUy6aOg6dKOVWJIA6xvVYMf/YumhRZzwDESFtodSJDvjXvGwG7cjUQaN3 Gw3vNn6dKckxRYLVatl+q1bElDT6kn7SxyY5JnkqzllnBPBboyfpLEdjU4GRsRqh a0JbXLOhWoEwPu+djxbnZiMV9rmnPH4wDHVaJuDP/T3qt9/GXuJNPXjVr5PqT4Yz zmtCKlahsLZ97VjqyQov6a7TDbtxgRzy79FxF4B1a3ICUlzmElR9UcY/7XCorPEv hDMFbQu+xBXQ12pMhW+r2K8hJph4ODZpcCkg4nQ7jEw/eh1QWxCsy3bXqfwnMcJB 85mBAEkfwymF4kr1U+8SlXFZqNAU0VFJvFdg6hW8ZmyhAZsFmWp+lBE8QFrpGQSz CXb98ei3mvK/+KheOWxYVxcZgSMHGWxf0fZME5CLt6t8dTbAkbR8mkXBaiL3pKU9 smP5SsKPbhu35z1iya9F5eON0M6w9IYVs4PYG0eiL4jv6cWSVPR+ADMO83mdcTWX H/LQUZoQJib99isx0jjNiFyEznb/Mp1k2ycEv0Ocp/H3B5hx2rIHsbJqjvFV+ivh p8uNfDpF2egST3aw0erfdj5wX9UD3mqwKhjNNeV1Yjxfxh47mW8cXwc3XDaJ2U+w 3kOYJWXDaYs= =3qFw -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
[I] [Question] AWS Lambda [pulsar-client-node]
jonasgroendahl opened a new issue, #367: URL: https://github.com/apache/pulsar-client-node/issues/367 Has anyone managed to get this working in an AWS lambda environment? Perhaps as a AWS lambda layer? I'm very curious since I'm struggling at the moment how to approach this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][broker] fix broker unackmessages become a negative number [pulsar]
poorbarcode commented on PR #17003: URL: https://github.com/apache/pulsar/pull/17003#issuecomment-1954332047 Just explain the issue that the current PR fixed - the issue may lead `unackedMessages` of the consumer being larger than the correct value. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix] [broker] Expire messages according to ledger close time to avoid client clock skew [pulsar]
codelipenghui commented on code in PR #21940: URL: https://github.com/apache/pulsar/pull/21940#discussion_r1495854131 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java: ## @@ -81,9 +81,9 @@ public boolean expireMessages(int messageTTLInSeconds) { if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) { log.info("[{}][{}] Starting message expiry check, ttl= {} seconds", topicName, subName, messageTTLInSeconds); -if (checkExpiryByLedgerClosureTime(cursor, messageTTLInSeconds)) { -return true; -} +// First filter the entire Ledger reached TTL based on the Ledger closing time to avoid client clock skew +checkExpiryByLedgerClosureTime(cursor, messageTTLInSeconds); Review Comment: It looks like the returned value is not used by any places. Do we need to change it to `void`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix] Update release docker image in release process [pulsar-site]
RobertIndie merged PR #745: URL: https://github.com/apache/pulsar-site/pull/745 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar-site) branch main updated: [fix] Update release docker image in release process (#745)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-site.git The following commit(s) were added to refs/heads/main by this push: new af2fdb6a147c [fix] Update release docker image in release process (#745) af2fdb6a147c is described below commit af2fdb6a147c2afa53f921c7a158a282cd8e1fea Author: Zike Yang AuthorDate: Tue Feb 20 21:25:14 2024 +0800 [fix] Update release docker image in release process (#745) PIP: https://github.com/apache/pulsar/pull/21872 The current release docker images process doesn't work for Pulsar above versions 3.0.0. Pulsar has added arm and amd arch supports for the docker image. If we use the original command to push the image, it will push only one arch. We recommend use tools like regctl to push images. For the latest tag of the pulsar image, this PR proposes to use the last feature release version or the patch release of the last feature release as the `latest` tag. - Co-authored-by: Penghui Li --- contribute/release-process.md | 18 +- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/contribute/release-process.md b/contribute/release-process.md index 12a25fb74700..1583bf56d270 100644 --- a/contribute/release-process.md +++ b/contribute/release-process.md @@ -319,20 +319,20 @@ Promote the Maven staging repository for release. Login to `https://repository.a ### Release Docker images -Copy the approved candidate docker images from your personal account to apachepulsar org. +Please ensure that the regctl tools have been properly installed. They can be obtained from the following link: https://github.com/regclient/regclient/blob/main/docs/install.md + +Copy the approved candidate Docker images from your personal account to the apachepulsar organization: ```bash -PULSAR_VERSION=2.x.x +PULSAR_VERSION=3.x.x OTHER_DOCKER_USER=otheruser -for image in pulsar pulsar-all pulsar-grafana pulsar-standalone; do -docker pull "${OTHER_DOCKER_USER}/$image:${PULSAR_VERSION}" && { - docker tag "${OTHER_DOCKER_USER}/$image:${PULSAR_VERSION}" "apachepulsar/$image:${PULSAR_VERSION}" - echo "Pushing apachepulsar/$image:${PULSAR_VERSION}" - docker push "apachepulsar/$image:${PULSAR_VERSION}" -} -done +CANDIDATE_TAG=3.x.x-80fb390 +regctl image copy ${OTHER_DOCKER_USER}/pulsar:${CANDIDATE_TAG} apachepulsar/pulsar:${PULSAR_VERSION} +regctl image copy ${OTHER_DOCKER_USER}/pulsar-all:${CANDIDATE_TAG} apachepulsar/pulsar-all:${PULSAR_VERSION} ``` +If this release is a feature release or a patch release of the last feature release, you should also push these images to the `latest` tag. + If you don't have the permission, you can ask someone with access to apachepulsar org to do that. ### Update project version
Re: [PR] [fix][broker] Set ServiceUnitStateChannel topic compaction threshold explicitly, improve getOwnerAsync, and fix other bugs [pulsar]
Technoboy- commented on code in PR #22064: URL: https://github.com/apache/pulsar/pull/22064#discussion_r1495725504 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java: ## @@ -150,9 +156,34 @@ public synchronized void init() throws IOException { start(); } -private synchronized void validateTableViewStart() { -if (tableView == null) { -throw new IllegalStateException("table view has not been started"); +private void validateProducer() { +if (producer == null || !producer.isConnected()) { +try { +if (producer != null) { +producer.close(); +} +producer = null; +startProducer(); +log.info("Restarted producer on {}", topic); +} catch (Exception e) { +log.error("Failed to restart producer on {}", topic, e); +throw new RuntimeException(e); +} +} +} + +private void validateTableView() { +if (tableView == null || System.currentTimeMillis() - tableViewLastUpdateTimestamp +> ((long) conf.getLoadBalancerReportUpdateMaxIntervalMinutes()) * 60 * 1000 * 2) { Review Comment: we can define a static field -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][broker] Set ServiceUnitStateChannel topic compaction threshold explicitly, improve getOwnerAsync, and fix other bugs [pulsar]
Technoboy- commented on code in PR #22064: URL: https://github.com/apache/pulsar/pull/22064#discussion_r1495704203 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java: ## @@ -575,7 +601,7 @@ private CompletableFuture> dedupeLookupRequest( if (ex != null) { assignCounter.incrementFailure(); } -lookupRequests.remove(key, newFutureCreated.getValue()); +lookupRequests.remove(key); Review Comment: why modify here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] Support ack timeout [pulsar-client-go]
imqishi commented on issue #217: URL: https://github.com/apache/pulsar-client-go/issues/217#issuecomment-1954033612 > The ack timeout referred here is not the timeout on the ack operation, rather the timeout on the application processing a message. > > Regardless, it was a mistake to add in Java API, and that’s why we didn’t add it in the new Go API. > > Negative acks are better because they convey the application intention on the outcome of processing a message. > > There’s no context or timeout on the ack/nack operations because these happen in background. > > If these fail, the message will be resent and the application will have the chance to ack/nack again. hi, I want to know if there's no ack-timeout and consume exit without nack, when will the message be resent? If there's situation a long process message cause multiple retry? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar-site) branch add-push-image created (now 8fbe1bce6d07)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to branch add-push-image in repository https://gitbox.apache.org/repos/asf/pulsar-site.git at 8fbe1bce6d07 Remove statement for the `lts` tag which has been proposed thorugh the PIP This branch includes the following new commits: new 8fbe1bce6d07 Remove statement for the `lts` tag which has been proposed thorugh the PIP The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
(pulsar-site) 01/01: Remove statement for the `lts` tag which has been proposed thorugh the PIP
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch add-push-image in repository https://gitbox.apache.org/repos/asf/pulsar-site.git commit 8fbe1bce6d078e5d09441d635e99741fc5ec8d65 Author: Zike Yang AuthorDate: Tue Feb 20 18:41:44 2024 +0800 Remove statement for the `lts` tag which has been proposed thorugh the PIP --- contribute/release-process.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/contribute/release-process.md b/contribute/release-process.md index d1b45ac86ae3..cc8ad9fb8b21 100644 --- a/contribute/release-process.md +++ b/contribute/release-process.md @@ -334,8 +334,6 @@ regctl image copy ${OTHER_DOCKER_USER}/pulsar-all:${CANDIDATE_TAG} apachepulsar/ If this release is a feature release or a patch release of the last feature release, you should also push these images to the `latest` tag. -If this release is a LTS release or a patch release of the last LTS release, you should also push these images to the `lts` tag. - If you don't have the permission, you can ask someone with access to apachepulsar org to do that. ### Update project version
[PR] [WIP] Support partitioned topic reader [pulsar-client-go]
RobertIndie opened a new pull request, #1178: URL: https://github.com/apache/pulsar-client-go/pull/1178 Master Issue: #1177 ### Motivation Currently, there is an issue with the reader implementation. If the reader is creating, it won't get the topic metadata from the topic. The reader can only read messages from a single topic. If the topic is a partitioned topic, the reader won't know that and will try to create a non-partition topic with the same name. And it will lead to this issue: https://github.com/apache/pulsar/issues/22032 ### Modifications - Support partitioned topic reader ### Verifying this change - [ ] Make sure that the change passes the CI checks. *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Does this pull request potentially affect one of the following parts: *If `yes` was chosen, please highlight the changes* - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API: (yes / no) - The schema: (yes / no / don't know) - The default values of configurations: (yes / no) - The wire protocol: (yes / no) ### Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][sec] Validate the user input avoiding unexpected truncation of user-controlled arithmetic data [pulsar]
liangyepianzhou closed pull request #22073: [fix][sec] Validate the user input avoiding unexpected truncation of user-controlled arithmetic data URL: https://github.com/apache/pulsar/pull/22073 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][sec] Missing JWT signature check [pulsar]
liangyepianzhou closed pull request #22075: [fix][sec] Missing JWT signature check URL: https://github.com/apache/pulsar/pull/22075 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][sec] implicit narrowing conversion in compound assignment [pulsar]
liangyepianzhou closed pull request #22074: [fix][sec] implicit narrowing conversion in compound assignment URL: https://github.com/apache/pulsar/pull/22074 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][sec] Arbitrary file access during archive extraction ("Zip Slip") [pulsar]
liangyepianzhou closed pull request #22077: [fix][sec] Arbitrary file access during archive extraction ("Zip Slip") URL: https://github.com/apache/pulsar/pull/22077 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][sec] Information exposure through a stack trace [pulsar]
liangyepianzhou closed pull request #22080: [fix][sec] Information exposure through a stack trace URL: https://github.com/apache/pulsar/pull/22080 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [fix][sec] Information exposure through a stack trace [pulsar]
liangyepianzhou opened a new pull request, #22080: URL: https://github.com/apache/pulsar/pull/22080 Fixes https://github.com/apache/pulsar/security/code-scanning/33 ### Motivation Software developers often add stack traces to error messages, as a debugging aid. Whenever that error message occurs for an end user, the developer can use the stack trace to help identify how to fix the problem. In particular, stack traces can tell the developer more about the sequence of events that led to a failure, as opposed to merely the final state of the software when the error occurred. Unfortunately, the same information can be useful to an attacker. The sequence of class names in a stack trace can reveal the structure of the application as well as any internal components it relies on. Furthermore, the error message at the top of a stack trace can include information such as server-side file names and SQL code that the application relies on, allowing an attacker to fine-tune a subsequent injection attack. ### Modifications Send the user a more generic error message that reveals less information. Either suppress the stack trace entirely, or log it only on the server. ### Verifying this change - [ ] Make sure that the change passes the CI checks. *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Does this pull request potentially affect one of the following parts: *If the box was checked, please highlight the changes* - [ ] Dependencies (add or upgrade a dependency) - [ ] The public API - [ ] The schema - [ ] The default values of configurations - [ ] The threading model - [ ] The binary protocol - [ ] The REST endpoints - [ ] The admin CLI options - [ ] The metrics - [ ] Anything that affects deployment ### Documentation - [ ] `doc` - [ ] `doc-required` - [x] `doc-not-needed` - [ ] `doc-complete` ### Matching PR in forked repository PR in forked repository: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][sec] Regular expression injection [pulsar]
lhotari closed pull request #22079: [fix][sec] Regular expression injection URL: https://github.com/apache/pulsar/pull/22079 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][sec] Regular expression injection [pulsar]
lhotari commented on PR #22079: URL: https://github.com/apache/pulsar/pull/22079#issuecomment-1953837658 this duplicates #22057. Please check the discussion on that PR. I'll close this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [fix][sec] Regular expression injection [pulsar]
liangyepianzhou opened a new pull request, #22079: URL: https://github.com/apache/pulsar/pull/22079 Fixes https://github.com/apache/pulsar/security/code-scanning/26, https://github.com/apache/pulsar/security/code-scanning/25, https://github.com/apache/pulsar/security/code-scanning/24, https://github.com/apache/pulsar/security/code-scanning/23 ### Motivation Constructing a regular expression with unsanitized user input is dangerous as a malicious user may be able to modify the meaning of the expression. In particular, such a user may be able to provide a regular expression fragment that takes exponential time in the worst case, and use that to perform a Denial of Service attack. ### Modifications use a sanitization function such as `Pattern.quote` to escape meta-characters that have special meaning. ### Verifying this change - [ ] Make sure that the change passes the CI checks. *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Does this pull request potentially affect one of the following parts: *If the box was checked, please highlight the changes* - [ ] Dependencies (add or upgrade a dependency) - [ ] The public API - [ ] The schema - [ ] The default values of configurations - [ ] The threading model - [ ] The binary protocol - [ ] The REST endpoints - [ ] The admin CLI options - [ ] The metrics - [ ] Anything that affects deployment ### Documentation - [ ] `doc` - [ ] `doc-required` - [x] `doc-not-needed` - [ ] `doc-complete` ### Matching PR in forked repository PR in forked repository: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[I] Multi-topics reader support [pulsar-client-go]
RobertIndie opened a new issue, #1177: URL: https://github.com/apache/pulsar-client-go/issues/1177 **Is your feature request related to a problem? Please describe.** Currently, there is an issue with the reader implementation. If the reader is creating, it won't get the topic metadata from the topic. The reader can only read messages from a single topic. If the topic is a partitioned topic, the reader won't know that and will try to create a non-partition topic with the same name. And it will lead to this issue: https://github.com/apache/pulsar/issues/22032 **Describe the solution you'd like** - Support multi-topics reader **Describe alternatives you've considered** No **Additional context** No -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch branch-3.2 updated: [fix][broker] Fix hash collision when using a consumer name that ends with a number (#22053)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 4ffdc2d60c1 [fix][broker] Fix hash collision when using a consumer name that ends with a number (#22053) 4ffdc2d60c1 is described below commit 4ffdc2d60c1c72d38b4f286ed54e68e580a90e0c Author: Lari Hotari AuthorDate: Thu Feb 15 11:07:10 2024 +0200 [fix][broker] Fix hash collision when using a consumer name that ends with a number (#22053) --- ...ConsistentHashingStickyKeyConsumerSelector.java | 14 ++-- ...istentHashingStickyKeyConsumerSelectorTest.java | 74 +- ...ntStickyKeyDispatcherMultipleConsumersTest.java | 4 +- 3 files changed, 70 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java index ea491bd40d3..b2b2b512c8c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java @@ -39,7 +39,8 @@ import org.apache.pulsar.common.util.Murmur3_32Hash; * number of keys assigned to each consumer. */ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyConsumerSelector { - +// use NUL character as field separator for hash key calculation +private static final String KEY_SEPARATOR = "\0"; private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); // Consistent-Hash ring @@ -59,8 +60,7 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons // Insert multiple points on the hash ring for every consumer // The points are deterministically added based on the hash of the consumer name for (int i = 0; i < numberOfPoints; i++) { -String key = consumer.consumerName() + i; -int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes()); +int hash = calculateHashForConsumerAndIndex(consumer, i); hashRing.compute(hash, (k, v) -> { if (v == null) { return Lists.newArrayList(consumer); @@ -79,14 +79,18 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons } } +private static int calculateHashForConsumerAndIndex(Consumer consumer, int index) { +String key = consumer.consumerName() + KEY_SEPARATOR + index; +return Murmur3_32Hash.getInstance().makeHash(key.getBytes()); +} + @Override public void removeConsumer(Consumer consumer) { rwLock.writeLock().lock(); try { // Remove all the points that were added for this consumer for (int i = 0; i < numberOfPoints; i++) { -String key = consumer.consumerName() + i; -int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes()); +int hash = calculateHashForConsumerAndIndex(consumer, i); hashRing.compute(hash, (k, v) -> { if (v == null) { return null; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java index dbca31416bb..48311c57338 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java @@ -21,18 +21,18 @@ package org.apache.pulsar.broker.service; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; - -import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException; -import org.apache.pulsar.client.api.Range; -import org.testng.Assert; -import org.testng.annotations.Test; - import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException; +import org.apache.pulsar.client.api.Range; +import org.testng.Assert; +import org.testng.annotations.Test; @Test(groups = "broker") public class ConsistentHashingStickyKeyConsumerSelectorTest { @@ -154,17 +154,17 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest { } Map> expectedResult = new
Re: [PR] [improve][pip] PIP-336: WASM Function API [pulsar]
asafm commented on code in PR #21992: URL: https://github.com/apache/pulsar/pull/21992#discussion_r1495401560 ## pip/pip-331.md: ## @@ -0,0 +1,129 @@ +# PIP-331: WASM Function API + +# Background knowledge + +WASM(WebAssembly) bytecode is designed to be encoded in a size- and load-time-efficient binary format. WASM aims to leverage the common hardware features available on various platforms to execute in browsers at machine code speed. + +WASI(WebAssembly System Interface) provide a portable interface for applications that run within a constrained sandbox environment, which allows WASM to run in non browser environments such as Linux. It's portable and secure. + +# Motivation + +The server and client sides of the Pulsar function use protobuf for decoupling. In principle, the language supported by protobuf can be supported by the pulsar function, now Pulsar provided the java, python and golang function client, but there are still many languages that are not supported. + +Before all language adaptations are completed (and it's almost entirely certain to be impossible), users cannot write pulsar function in their familiar languages. + +# Goals + +## In Scope + +Other languages, as long as their code can be compiled into WASM bytecode (such as Rust/golang/C++), users can use these languages to write pulsar function. + +## Out of Scope + +All existing abilities of the Java pulsar function client are not reimplemented, the WASM Pulsar functions is under the Java Pulsar functions. + +Due to the strict requirements of WASM on parameter types and for simplicity reasons, types other than `java.lang.Long` are not used as parameters or return value. + +# High Level Design + +```mermaid +flowchart LR; + +subgraph develop +direction TB +SourceCode ==> |"CompileToWASM"| WasmFile ==> |"RenameFile"| MoveToTheResourceDirectory ==> UnitTest +end + +subgraph runtime +direction TB +PulsarFunctionJava ==> |"LoadFromResource"| TheWasmFile ==> |"Invoke"| TheSourceCode +end + +develop --> runtime +``` + +# Detailed Design + +## Design & Implementation Details + +1. add `WasmLoader` to load WASM file and provide the WASM function to java, also provide the java function to WASM if we need. + +2. add `AbstractWasmFunction` and `AbstractWasmWindowFunction` as the core interface of the WASM function api. + +```java +public abstract class AbstractWasmFunction extends WasmLoader implements Function { Review Comment: Well Go have no inheritence - and I'm pretty sure they have a similar need. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve][pip] PIP-336: WASM Function API [pulsar]
asafm commented on code in PR #21992: URL: https://github.com/apache/pulsar/pull/21992#discussion_r1495399544 ## pip/pip-331.md: ## @@ -0,0 +1,129 @@ +# PIP-331: WASM Function API + +# Background knowledge + +WASM(WebAssembly) bytecode is designed to be encoded in a size- and load-time-efficient binary format. WASM aims to leverage the common hardware features available on various platforms to execute in browsers at machine code speed. + +WASI(WebAssembly System Interface) provide a portable interface for applications that run within a constrained sandbox environment, which allows WASM to run in non browser environments such as Linux. It's portable and secure. + +# Motivation + +The server and client sides of the Pulsar function use protobuf for decoupling. In principle, the language supported by protobuf can be supported by the pulsar function, now Pulsar provided the java, python and golang function client, but there are still many languages that are not supported. + +Before all language adaptations are completed (and it's almost entirely certain to be impossible), users cannot write pulsar function in their familiar languages. + +# Goals + +## In Scope + +Other languages, as long as their code can be compiled into WASM bytecode (such as Rust/golang/C++), users can use these languages to write pulsar function. + +## Out of Scope + +All existing abilities of the Java pulsar function client are not reimplemented, the WASM Pulsar functions is under the Java Pulsar functions. + +Due to the strict requirements of WASM on parameter types and for simplicity reasons, types other than `java.lang.Long` are not used as parameters or return value. + +# High Level Design + +```mermaid +flowchart LR; + +subgraph develop +direction TB +SourceCode ==> |"CompileToWASM"| WasmFile ==> |"RenameFile"| MoveToTheResourceDirectory ==> UnitTest +end + +subgraph runtime +direction TB +PulsarFunctionJava ==> |"LoadFromResource"| TheWasmFile ==> |"Invoke"| TheSourceCode +end + +develop --> runtime +``` + +# Detailed Design + +## Design & Implementation Details + +1. add `WasmLoader` to load WASM file and provide the WASM function to java, also provide the java function to WASM if we need. + +2. add `AbstractWasmFunction` and `AbstractWasmWindowFunction` as the core interface of the WASM function api. + +```java +public abstract class AbstractWasmFunction extends WasmLoader implements Function { + +private static final String PROCESS_METHOD_NAME = "process"; + +protected static final String INITIALIZE_METHOD_NAME = "initialize"; + +protected static final String CLOSE_METHOD_NAME = "close"; + +protected static final Map> ARGUMENTS = new ConcurrentHashMap<>(); + +@Override +public T process(X input, Context context) { +return super.getWasmExtern(PROCESS_METHOD_NAME) +.map(process -> { +Long argumentId = callWASI(input, context, process); +return doProcess(input, context, argumentId); +}) +.orElseThrow(() -> new PulsarWasmException( +PROCESS_METHOD_NAME + " function not found in " + super.getWasmName())); +} + +private Long callWASI(X input, + Context context, + Extern process) { +// call WASI function +final Long argumentId = getArgumentId(input, context); +ARGUMENTS.put(argumentId, new Argument<>(input, context)); Review Comment: Serialization sounds slow, as it should be done per message, no? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch branch-3.2 updated: [fix] [broker] Fix can not subscribe partitioned topic with a suffix-matched regexp (#22025)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 569386640ab [fix] [broker] Fix can not subscribe partitioned topic with a suffix-matched regexp (#22025) 569386640ab is described below commit 569386640ab6205781e8afa89a57fb539292fcec Author: fengyubiao AuthorDate: Mon Feb 19 16:43:39 2024 +0800 [fix] [broker] Fix can not subscribe partitioned topic with a suffix-matched regexp (#22025) --- .../pulsar/broker/resources/TopicResources.java| 3 ++ .../pulsar/broker/namespace/NamespaceService.java | 3 ++ .../broker/service/PulsarCommandSenderImpl.java| 6 .../pulsar/broker/service/TopicListService.java| 22 -- .../client/impl/PatternTopicsConsumerImplTest.java | 34 ++ .../apache/pulsar/client/api/ConsumerBuilder.java | 8 ++--- .../client/impl/MultiTopicsConsumerImpl.java | 10 +-- .../impl/PatternMultiTopicsConsumerImpl.java | 24 +-- .../pulsar/client/impl/TopicListWatcher.java | 4 ++- .../impl/conf/ConsumerConfigurationData.java | 2 +- .../apache/pulsar/common/protocol/Commands.java| 7 + 11 files changed, 104 insertions(+), 19 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java index 840ced0a1c1..0963f25c3d3 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java @@ -50,6 +50,9 @@ public class TopicResources { store.registerListener(this::handleNotification); } +/*** + * List persistent topics names under a namespace, the topic name contains the partition suffix. + */ public CompletableFuture> listPersistentTopicsAsync(NamespaceName ns) { String path = MANAGED_LEDGER_PATH + "/" + ns + "/persistent"; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index d8c3fd169a2..b55eda150af 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1432,6 +1432,9 @@ public class NamespaceService implements AutoCloseable { }); } +/*** + * List persistent topics names under a namespace, the topic name contains the partition suffix. + */ public CompletableFuture> getListOfPersistentTopics(NamespaceName namespaceName) { return pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(namespaceName); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java index dd74fc4e71e..105650caaaf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java @@ -356,12 +356,18 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender { writeAndFlush(outBuf); } +/*** + * @param topics topic names which are matching, the topic name contains the partition suffix. + */ @Override public void sendWatchTopicListSuccess(long requestId, long watcherId, String topicsHash, List topics) { BaseCommand command = Commands.newWatchTopicListSuccess(requestId, watcherId, topicsHash, topics); interceptAndWriteCommand(command); } +/*** + * {@inheritDoc} + */ @Override public void sendWatchTopicListUpdate(long watcherId, List newTopics, List deletedTopics, String topicsHash) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java index 7aa50057d73..aea5b9fc65b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java @@ -31,6 +31,7 @@ import org.apache.pulsar.broker.resources.TopicResources; import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose; import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.topics.TopicList; import
Re: [PR] [improve][pip] PIP-336: WASM Function API [pulsar]
asafm commented on code in PR #21992: URL: https://github.com/apache/pulsar/pull/21992#discussion_r1495396614 ## pip/pip-331.md: ## @@ -0,0 +1,129 @@ +# PIP-331: WASM Function API + +# Background knowledge + +WASM(WebAssembly) bytecode is designed to be encoded in a size- and load-time-efficient binary format. WASM aims to leverage the common hardware features available on various platforms to execute in browsers at machine code speed. + +WASI(WebAssembly System Interface) provide a portable interface for applications that run within a constrained sandbox environment, which allows WASM to run in non browser environments such as Linux. It's portable and secure. + +# Motivation + +The server and client sides of the Pulsar function use protobuf for decoupling. In principle, the language supported by protobuf can be supported by the pulsar function, now Pulsar provided the java, python and golang function client, but there are still many languages that are not supported. + +Before all language adaptations are completed (and it's almost entirely certain to be impossible), users cannot write pulsar function in their familiar languages. + +# Goals + +## In Scope + +Other languages, as long as their code can be compiled into WASM bytecode (such as Rust/golang/C++), users can use these languages to write pulsar function. + +## Out of Scope + +All existing abilities of the Java pulsar function client are not reimplemented, the WASM Pulsar functions is under the Java Pulsar functions. + +Due to the strict requirements of WASM on parameter types and for simplicity reasons, types other than `java.lang.Long` are not used as parameters or return value. + +# High Level Design + +```mermaid +flowchart LR; + +subgraph develop +direction TB +SourceCode ==> |"CompileToWASM"| WasmFile ==> |"RenameFile"| MoveToTheResourceDirectory ==> UnitTest +end + +subgraph runtime +direction TB +PulsarFunctionJava ==> |"LoadFromResource"| TheWasmFile ==> |"Invoke"| TheSourceCode +end + +develop --> runtime +``` + +# Detailed Design + +## Design & Implementation Details + +1. add `WasmLoader` to load WASM file and provide the WASM function to java, also provide the java function to WASM if we need. Review Comment: Then you need to mention that and expand on it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch branch-3.2 updated: [improve][broker] Do not retain the data in the system topic (#22022)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new c4482dd0a18 [improve][broker] Do not retain the data in the system topic (#22022) c4482dd0a18 is described below commit c4482dd0a18ab8333fc0b8228b378f6ef0f264bc Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com> AuthorDate: Tue Feb 6 15:28:43 2024 +0800 [improve][broker] Do not retain the data in the system topic (#22022) ### Motivation For some use case, the users need to store all the messages even though these message are acked by all subscription. So they set the retention policy of the namespace to infinite retention (setting both time and size limits to `-1`). But the data in the system topic does not need for infinite retention. ### Modifications For system topics, do not retain messages that have already been acknowledged. --- .../pulsar/broker/service/BrokerService.java | 15 +-- .../pulsar/compaction/CompactionRetentionTest.java | 48 ++ 2 files changed, 59 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 13bce3f67df..0a9d100bf7b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1771,10 +1771,17 @@ public class BrokerService implements Closeable { } if (retentionPolicies == null) { -retentionPolicies = policies.map(p -> p.retention_policies).orElseGet( -() -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(), -serviceConfig.getDefaultRetentionSizeInMB()) -); +if (SystemTopicNames.isSystemTopic(topicName)) { +if (log.isDebugEnabled()) { +log.debug("{} Disable data retention policy for system topic.", topicName); +} +retentionPolicies = new RetentionPolicies(0, 0); +} else { +retentionPolicies = policies.map(p -> p.retention_policies).orElseGet( +() -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(), + serviceConfig.getDefaultRetentionSizeInMB()) +); +} } ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java index 055c595fbfe..98bf2b819c2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -45,9 +46,13 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.awaitility.Awaitility; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -212,6 +217,49 @@ public class CompactionRetentionTest extends MockedPulsarServiceBaseTest { ); } +@Test +public void testRetentionPolicesForSystemTopic() throws Exception { +String namespace = "my-tenant/my-ns"; +String topicPrefix = "persistent://" + namespace + "/"; +admin.namespaces().setRetention(namespace, new RetentionPolicies(-1, -1)); +// Check event topics and transaction internal topics. +for (String eventTopic : SystemTopicNames.EVENTS_TOPIC_NAMES) { +checkSystemTopicRetentionPolicy(topicPrefix + eventTopic); +} +
Re: [PR] [improve][pip] PIP-336: WASM Function API [pulsar]
asafm commented on code in PR #21992: URL: https://github.com/apache/pulsar/pull/21992#discussion_r1495396150 ## pip/pip-331.md: ## @@ -0,0 +1,129 @@ +# PIP-331: WASM Function API + +# Background knowledge + +WASM(WebAssembly) bytecode is designed to be encoded in a size- and load-time-efficient binary format. WASM aims to leverage the common hardware features available on various platforms to execute in browsers at machine code speed. + +WASI(WebAssembly System Interface) provide a portable interface for applications that run within a constrained sandbox environment, which allows WASM to run in non browser environments such as Linux. It's portable and secure. + +# Motivation + +The server and client sides of the Pulsar function use protobuf for decoupling. In principle, the language supported by protobuf can be supported by the pulsar function, now Pulsar provided the java, python and golang function client, but there are still many languages that are not supported. + +Before all language adaptations are completed (and it's almost entirely certain to be impossible), users cannot write pulsar function in their familiar languages. + +# Goals + +## In Scope + +Other languages, as long as their code can be compiled into WASM bytecode (such as Rust/golang/C++), users can use these languages to write pulsar function. + +## Out of Scope + +All existing abilities of the Java pulsar function client are not reimplemented, the WASM Pulsar functions is under the Java Pulsar functions. + +Due to the strict requirements of WASM on parameter types and for simplicity reasons, types other than `java.lang.Long` are not used as parameters or return value. Review Comment: I think there is a problem here. I can't learn about this feature you mentioned from the documentation of wasmtime-java since it's non-existent, and the scarce documentation in this PIP doesn't help. How can this be maintained by other people going forward? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch branch-3.2 updated: [fix] [broker] Subscription stuck due to called Admin API analyzeSubscriptionBacklog (#22019)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 9e4ebdbacd1 [fix] [broker] Subscription stuck due to called Admin API analyzeSubscriptionBacklog (#22019) 9e4ebdbacd1 is described below commit 9e4ebdbacd17216f65006e8bacdb0265101cb3b5 Author: fengyubiao AuthorDate: Mon Feb 19 00:04:10 2024 +0800 [fix] [broker] Subscription stuck due to called Admin API analyzeSubscriptionBacklog (#22019) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 29 +++-- .../service/persistent/PersistentSubscription.java | 30 +++--- .../apache/pulsar/broker/admin/AdminApi2Test.java | 29 + .../admin/AnalyzeBacklogSubscriptionTest.java | 18 ++--- .../common/util/collections/BitSetRecyclable.java | 8 ++ 5 files changed, 99 insertions(+), 15 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 7e930e711ec..da013c07313 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -196,11 +196,11 @@ public class ManagedCursorImpl implements ManagedCursor { position.ackSet = null; return position; }; -private final RangeSetWrapper individualDeletedMessages; +protected final RangeSetWrapper individualDeletedMessages; // Maintain the deletion status for batch messages // (ledgerId, entryId) -> deletion indexes -private final ConcurrentSkipListMap batchDeletedIndexes; +protected final ConcurrentSkipListMap batchDeletedIndexes; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private RateLimiter markDeleteLimiter; @@ -3617,4 +3617,29 @@ public class ManagedCursorImpl implements ManagedCursor { public ManagedLedgerConfig getConfig() { return config; } + +/*** + * Create a non-durable cursor and copy the ack stats. + */ +public ManagedCursor duplicateNonDurableCursor(String nonDurableCursorName) throws ManagedLedgerException { +NonDurableCursorImpl newNonDurableCursor = +(NonDurableCursorImpl) ledger.newNonDurableCursor(getMarkDeletedPosition(), nonDurableCursorName); +if (individualDeletedMessages != null) { +this.individualDeletedMessages.forEach(range -> { +newNonDurableCursor.individualDeletedMessages.addOpenClosed( +range.lowerEndpoint().getLedgerId(), +range.lowerEndpoint().getEntryId(), +range.upperEndpoint().getLedgerId(), +range.upperEndpoint().getEntryId()); +return true; +}); +} +if (batchDeletedIndexes != null) { +for (Map.Entry entry : this.batchDeletedIndexes.entrySet()) { +BitSetRecyclable copiedBitSet = BitSetRecyclable.valueOf(entry.getValue()); +newNonDurableCursor.batchDeletedIndexes.put(entry.getKey(), copiedBitSet); +} +} +return newNonDurableCursor; +} } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index dc79146110f..e5d90bf0ef4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.TreeMap; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -514,9 +515,15 @@ public class PersistentSubscription extends AbstractSubscription implements Subs return "Null"; } -@Override public CompletableFuture analyzeBacklog(Optional position) { - +final ManagedLedger managedLedger = topic.getManagedLedger(); +final String newNonDurableCursorName = "analyze-backlog-" + UUID.randomUUID(); +ManagedCursor newNonDurableCursor; +try { +newNonDurableCursor = ((ManagedCursorImpl) cursor).duplicateNonDurableCursor(newNonDurableCursorName); +} catch (ManagedLedgerException e) { +return CompletableFuture.failedFuture(e); +} long start = System.currentTimeMillis(); if
(pulsar) branch branch-3.2 updated: [improve] [broker] Do not print an Error log when responding to `HTTP-404` when calling `Admin API` and the topic does not exist. (#21995)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 0fbb7fd1b2a [improve] [broker] Do not print an Error log when responding to `HTTP-404` when calling `Admin API` and the topic does not exist. (#21995) 0fbb7fd1b2a is described below commit 0fbb7fd1b2ab15d00ca248c80a90edce4365cb8e Author: fengyubiao AuthorDate: Sun Feb 18 15:46:52 2024 +0800 [improve] [broker] Do not print an Error log when responding to `HTTP-404` when calling `Admin API` and the topic does not exist. (#21995) --- .../apache/pulsar/broker/admin/AdminResource.java | 4 + .../broker/admin/impl/PersistentTopicsBase.java| 88 +++--- .../broker/admin/impl/SchemasResourceBase.java | 2 +- .../broker/admin/v2/NonPersistentTopics.java | 6 +- .../pulsar/broker/admin/v2/PersistentTopics.java | 36 - .../pulsar/broker/admin/v3/Transactions.java | 12 +-- 6 files changed, 75 insertions(+), 73 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 1526ae18a90..2ceec189975 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -834,6 +834,10 @@ public abstract class AdminResource extends PulsarWebResource { == Status.NOT_FOUND.getStatusCode(); } +protected static boolean isNot307And404Exception(Throwable ex) { +return !isRedirectException(ex) && !isNotFoundException(ex); +} + protected static String getTopicNotFoundErrorMessage(String topic) { return String.format("Topic %s not found", topic); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 379d6675b57..0cdb140c7c3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -874,7 +874,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to get partitioned metadata while unloading topic {}", clientAppId(), topicName, ex); } @@ -884,7 +884,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { + if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to validate the global namespace ownership while unloading topic {}", clientAppId(), topicName, ex); } @@ -1052,7 +1052,7 @@ public class PersistentTopicsBase extends AdminResource { })) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. -if (!isRedirectException(ex)) { +if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, ex); } resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -1074,7 +1074,7 @@ public class PersistentTopicsBase extends AdminResource { })) .exceptionally(ex -> { // If the exception is not redirect exception we need to log it. -if (!isRedirectException(ex)) { +if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to unload tc {},{}", clientAppId(), topicName.getPartitionIndex(), ex); } @@ -1176,7 +1176,7 @@ public class PersistentTopicsBase extends AdminResource { } }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. -if (!isRedirectException(ex)) { +if (!isNot307And404Exception(ex)) { log.error("[{}] Failed to get partitioned topic
(pulsar) branch branch-3.2 updated: [fix] [broker] add timeout for health check read. (#21990)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 1be9692a6f4 [fix] [broker] add timeout for health check read. (#21990) 1be9692a6f4 is described below commit 1be9692a6f46a23bf9bdecdefe7f7a25d47a16c6 Author: thetumbled <52550727+thetumb...@users.noreply.github.com> AuthorDate: Tue Jan 30 23:40:09 2024 +0800 [fix] [broker] add timeout for health check read. (#21990) --- .../pulsar/broker/admin/impl/BrokersBase.java | 13 - .../broker/admin/AdminApiHealthCheckTest.java | 63 ++ 2 files changed, 75 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index f056b18f3f1..61b354610ac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -26,6 +26,7 @@ import io.swagger.annotations.ApiResponses; import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -34,6 +35,7 @@ import java.util.Map; import java.util.Objects; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; @@ -80,6 +82,12 @@ public class BrokersBase extends AdminResource { // log a full thread dump when a deadlock is detected in healthcheck once every 10 minutes // to prevent excessive logging private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 60L; +// there is a timeout of 60 seconds default in the client(readTimeoutMs), so we need to set the timeout +// a bit shorter than 60 seconds to avoid the client timeout exception thrown before the server timeout exception. +// or we can't propagate the server timeout exception to the client. +private static final Duration HEALTH_CHECK_READ_TIMEOUT = Duration.ofSeconds(58); +private static final TimeoutException HEALTH_CHECK_TIMEOUT_EXCEPTION = +FutureUtil.createTimeoutException("Timeout", BrokersBase.class, "healthCheckRecursiveReadNext(...)"); private volatile long threadDumpLoggedTimestamp; @GET @@ -434,7 +442,10 @@ public class BrokersBase extends AdminResource { }); throw FutureUtil.wrapToCompletionException(createException); }).thenCompose(reader -> producer.sendAsync(messageStr) -.thenCompose(__ -> healthCheckRecursiveReadNext(reader, messageStr)) +.thenCompose(__ -> FutureUtil.addTimeoutHandling( + healthCheckRecursiveReadNext(reader, messageStr), +HEALTH_CHECK_READ_TIMEOUT, pulsar().getBrokerService().executor(), +() -> HEALTH_CHECK_TIMEOUT_EXCEPTION)) .whenComplete((__, ex) -> { closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName) .whenComplete((unused, innerEx) -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java index a780f889de8..357422b11f6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java @@ -23,6 +23,7 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import java.lang.management.ManagementFactory; import java.lang.management.ThreadMXBean; +import java.lang.reflect.Field; import java.time.Duration; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -31,13 +32,21 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.MessageId; +import
(pulsar) branch branch-3.2 updated: [fix][client] Fix ConsumerBuilderImpl#subscribe silent stuck when using pulsar-client:3.0.x with jackson-annotations prior to 2.12.0 (#21985)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 07f2586a23f [fix][client] Fix ConsumerBuilderImpl#subscribe silent stuck when using pulsar-client:3.0.x with jackson-annotations prior to 2.12.0 (#21985) 07f2586a23f is described below commit 07f2586a23ffba765128d2dee874faf7a6dc8713 Author: 萧易客 AuthorDate: Tue Jan 30 19:20:29 2024 +0800 [fix][client] Fix ConsumerBuilderImpl#subscribe silent stuck when using pulsar-client:3.0.x with jackson-annotations prior to 2.12.0 (#21985) ### Motivation In summary, `jackson-annotations:2.12.0` or later is now required for `pulsar-client 3.0.x`, and this also applies to versions `3.1.x` and `3.2.x`. Otherwise, `ConsumerBuilderImpl#subscribe` may become stuck without displaying any error message. ### Modifications Modify the `whenComplete` to a combination of `thenAccept` and `exceptionally`. The modification is harmless. --- .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 14 +++--- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index baabaf67070..84504b632ad 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -1025,13 +1025,13 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { private void subscribeTopicPartitions(CompletableFuture subscribeResult, String topicName, int numPartitions, boolean createIfDoesNotExist) { -client.preProcessSchemaBeforeSubscribe(client, schema, topicName).whenComplete((schema, cause) -> { -if (null == cause) { -doSubscribeTopicPartitions(schema, subscribeResult, topicName, numPartitions, createIfDoesNotExist); -} else { -subscribeResult.completeExceptionally(cause); -} -}); +client.preProcessSchemaBeforeSubscribe(client, schema, topicName) +.thenAccept(schema -> { +doSubscribeTopicPartitions(schema, subscribeResult, topicName, numPartitions, createIfDoesNotExist); +}).exceptionally(cause -> { +subscribeResult.completeExceptionally(cause); +return null; +}); } private void doSubscribeTopicPartitions(Schema schema,
Re: [PR] [improve][pip] PIP-336: WASM Function API [pulsar]
asafm commented on code in PR #21992: URL: https://github.com/apache/pulsar/pull/21992#discussion_r1495389552 ## pip/pip-331.md: ## @@ -0,0 +1,129 @@ +# PIP-331: WASM Function API + +# Background knowledge + +WASM(WebAssembly) bytecode is designed to be encoded in a size- and load-time-efficient binary format. WASM aims to leverage the common hardware features available on various platforms to execute in browsers at machine code speed. + +WASI(WebAssembly System Interface) provide a portable interface for applications that run within a constrained sandbox environment, which allows WASM to run in non browser environments such as Linux. It's portable and secure. + +# Motivation + +The server and client sides of the Pulsar function use protobuf for decoupling. In principle, the language supported by protobuf can be supported by the pulsar function, now Pulsar provided the java, python and golang function client, but there are still many languages that are not supported. + +Before all language adaptations are completed (and it's almost entirely certain to be impossible), users cannot write pulsar function in their familiar languages. + +# Goals + +## In Scope + +Other languages, as long as their code can be compiled into WASM bytecode (such as Rust/golang/C++), users can use these languages to write pulsar function. + +## Out of Scope + +All existing abilities of the Java pulsar function client are not reimplemented, the WASM Pulsar functions is under the Java Pulsar functions. Review Comment: Both repos you mentioned have ~100 stars. The wasmtime repo you mentioned doesn't mention Java as a supported language. Doesn't sound like a solid ground to stand upon no? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch branch-3.2 updated: [improve] [proxy] Add a check for brokerServiceURL that does not support multi uri yet (#21972)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 6aae7f63285 [improve] [proxy] Add a check for brokerServiceURL that does not support multi uri yet (#21972) 6aae7f63285 is described below commit 6aae7f63285863d1077530f007d1f25f9f4c3e50 Author: fengyubiao AuthorDate: Tue Jan 30 19:34:01 2024 +0800 [improve] [proxy] Add a check for brokerServiceURL that does not support multi uri yet (#21972) ### Motivation At the beginning of the design, these two configurations(`brokerServiceURL & brokerServiceURLTLS`) do not support setting multiple broker addresses, which should instead be set to a “discovery service provider.” see: https://github.com/apache/pulsar/pull/1002 and https://github.com/apache/pulsar/pull/14682 Users will get the below error if they set A to a multi-broker URLs ``` "2024-01-09 00:20:10,261 -0800 [pulsar-proxy-io-4-7] WARN io.netty.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception. java.lang.IllegalArgumentException: port out of range:-1 at java.net.InetSocketAddress.checkPort(InetSocketAddress.java:143) ~[?:?] at java.net.InetSocketAddress.createUnresolved(InetSocketAddress.java:254) ~[?:?] at org.apache.pulsar.proxy.server.LookupProxyHandler.getAddr(LookupProxyHandler.java:432) ~[org.apache.pulsar-pulsar-proxy-2.9.0.jar:2.9.0] at org.apache.pulsar.proxy.server.LookupProxyHandler.handleGetSchema(LookupProxyHandler.java:357) ~[org.apache.pulsar-pulsar-proxy-2.9.0.jar:2.9.0] at org.apache.pulsar.proxy.server.ProxyConnection.handleGetSchema(ProxyConnection.java:463) ~[org.apache.pulsar-pulsar-proxy-2.9.0.jar:2.9.0] at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:326) ~[io.streamnative-pulsar-common-2.9.2.12.jar:2.9.2.12] at org.apache.pulsar.proxy.server.ProxyConnection.channelRead(ProxyConnection.java:221) ~[org.apache.pulsar-pulsar-proxy-2.9.0.jar:2.9.0] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final] at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) ~[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final] at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299) ~[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final] at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372) ~[io.netty-netty-handler-4.1.74.Final.jar:4.1.74.Final] at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1246) ~[io.netty-netty-handler-4.1.74.Final.jar:4.1.74.Final] at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1286) ~[io.netty-netty-handler-4.1.74.Final.jar:4.1.74.Final] at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510) ~[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final] at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449) ~[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final] at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) ~[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
(pulsar) branch branch-3.2 updated: [fix][broker] Fix schema deletion error when deleting a partitioned topic with many partitions and schema (#21977)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 7462f669e9e [fix][broker] Fix schema deletion error when deleting a partitioned topic with many partitions and schema (#21977) 7462f669e9e is described below commit 7462f669e9e9f64db5fbc5e39f535bf2d33c2223 Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com> AuthorDate: Mon Jan 29 20:16:05 2024 -0800 [fix][broker] Fix schema deletion error when deleting a partitioned topic with many partitions and schema (#21977) --- .../pulsar/broker/service/BrokerService.java | 29 ++ .../service/schema/BookkeeperSchemaStorage.java| 6 +++-- .../tests/integration/schema/SchemaTest.java | 11 3 files changed, 28 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 62197900076..13bce3f67df 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -123,8 +123,6 @@ import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleC import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.SystemTopic; import org.apache.pulsar.broker.service.plugin.EntryFilterProvider; -import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; -import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; @@ -3447,22 +3445,21 @@ public class BrokerService implements Closeable { } public CompletableFuture deleteSchema(TopicName topicName) { +// delete schema at the upper level when deleting the partitioned topic. +if (topicName.isPartitioned()) { +return CompletableFuture.completedFuture(null); +} String base = topicName.getPartitionedTopicName(); String id = TopicName.get(base).getSchemaName(); -SchemaRegistryService schemaRegistryService = getPulsar().getSchemaRegistryService(); -return BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id)) -.thenCompose(schema -> { -if (schema != null) { -// It's different from `SchemasResource.deleteSchema` -// because when we delete a topic, the schema -// history is meaningless. But when we delete a schema of a topic, a new schema could be -// registered in the future. -log.info("Delete schema storage of id: {}", id); -return getPulsar().getSchemaRegistryService().deleteSchemaStorage(id); -} else { -return CompletableFuture.completedFuture(null); -} -}); +return getPulsar().getSchemaRegistryService().deleteSchemaStorage(id).whenComplete((vid, ex) -> { +if (vid != null && ex == null) { +// It's different from `SchemasResource.deleteSchema` +// because when we delete a topic, the schema +// history is meaningless. But when we delete a schema of a topic, a new schema could be +// registered in the future. +log.info("Deleted schema storage of id: {}", id); +} +}); } private CompletableFuture checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index 78e30f6fff8..c509764bf67 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -707,7 +707,8 @@ public class BookkeeperSchemaStorage implements SchemaStorage { message += " - entry=" + entryId; } boolean recoverable = rc != BKException.Code.NoSuchLedgerExistsException -&& rc != BKException.Code.NoSuchEntryException; +&& rc != BKException.Code.NoSuchEntryException +&& rc != BKException.Code.NoSuchLedgerExistsOnMetadataServerException; return new
(pulsar) branch branch-3.2 updated: [fix] [broker] Fix reader stuck when read from compacted topic with read compact mode disable (#21969)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new ea681a9d579 [fix] [broker] Fix reader stuck when read from compacted topic with read compact mode disable (#21969) ea681a9d579 is described below commit ea681a9d579868ff0efd1fd863ad97208c751706 Author: thetumbled <52550727+thetumb...@users.noreply.github.com> AuthorDate: Wed Jan 31 00:11:07 2024 +0800 [fix] [broker] Fix reader stuck when read from compacted topic with read compact mode disable (#21969) --- .../apache/pulsar/broker/service/ServerCnx.java| 32 +- .../compaction/GetLastMessageIdCompactedTest.java | 27 ++ 2 files changed, 52 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 9f2b98aeb40..0d9b5ea73e0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2170,7 +2170,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { (PositionImpl) markDeletePosition, partitionIndex, requestId, -consumer.getSubscription().getName()); +consumer.getSubscription().getName(), +consumer.readCompacted()); }).exceptionally(e -> { writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), ServerError.UnknownError, "Failed to recover Transaction Buffer.")); @@ -2188,15 +2189,17 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { PositionImpl markDeletePosition, int partitionIndex, long requestId, -String subscriptionName) { +String subscriptionName, +boolean readCompacted) { PersistentTopic persistentTopic = (PersistentTopic) topic; ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); // If it's not pointing to a valid entry, respond messageId of the current position. // If the compaction cursor reach the end of the topic, respond messageId from compacted ledger -CompletableFuture compactionHorizonFuture = - persistentTopic.getTopicCompactionService().getLastCompactedPosition(); +CompletableFuture compactionHorizonFuture = readCompacted +? persistentTopic.getTopicCompactionService().getLastCompactedPosition() : +CompletableFuture.completedFuture(null); compactionHorizonFuture.whenComplete((compactionHorizon, ex) -> { if (ex != null) { @@ -2205,8 +2208,22 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { return; } -if (lastPosition.getEntryId() == -1 || (compactionHorizon != null -&& lastPosition.compareTo((PositionImpl) compactionHorizon) <= 0)) { +if (lastPosition.getEntryId() == -1 || !ml.ledgerExists(lastPosition.getLedgerId())) { +// there is no entry in the original topic +if (compactionHorizon != null) { +// if readCompacted is true, we need to read the last entry from compacted topic +handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex, +markDeletePosition); +} else { +// if readCompacted is false, we need to return MessageId.earliest + writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, -1, -1, partitionIndex, -1, +markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, +markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); +} +return; +} + +if (compactionHorizon != null && lastPosition.compareTo((PositionImpl) compactionHorizon) <= 0) { handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex, markDeletePosition); return; @@ -2241,7 +2258,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { batchSizeFuture.whenComplete((batchSize, e) -> { if (e != null) { -if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) { +if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException +
(pulsar) branch branch-3.2 updated: [fix][fn] Use unified PackageManagement service to download packages (#21955)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new c980ee24b72 [fix][fn] Use unified PackageManagement service to download packages (#21955) c980ee24b72 is described below commit c980ee24b726bcf6f6f980a60d94ce37d8e73016 Author: jiangpengcheng AuthorDate: Wed Jan 31 22:18:47 2024 +0800 [fix][fn] Use unified PackageManagement service to download packages (#21955) --- .../java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index db31847f91c..fc2873d8271 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -1766,6 +1766,8 @@ public abstract class ComponentImpl implements Component { + "when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl)); } +} else if (Utils.hasPackageTypePrefix(existingPackagePath)) { +componentPackageFile = getPackageFile(existingPackagePath); } else if (uploadedInputStream != null) { componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream); } else if (!existingPackagePath.startsWith(Utils.BUILTIN)) {
(pulsar) branch branch-3.2 updated: [fix][client] Fix multi-topics consumer could receive old messages after seek (#21945)
This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 003efdd734e [fix][client] Fix multi-topics consumer could receive old messages after seek (#21945) 003efdd734e is described below commit 003efdd734ee3a373bf86bdfcd740f2c9ab83771 Author: Yunze Xu AuthorDate: Wed Jan 31 00:31:15 2024 +0800 [fix][client] Fix multi-topics consumer could receive old messages after seek (#21945) --- .../pulsar/client/impl/TopicsConsumerImplTest.java | 80 +- .../client/impl/MultiTopicsConsumerImpl.java | 66 -- 2 files changed, 125 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index 51b32c2b44e..c343ab0d6e2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.ConsumerEventListener; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageIdAdv; +import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.MessageRouter; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; @@ -57,22 +58,27 @@ import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; - import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; import java.util.Set; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.TreeSet; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -1394,4 +1400,76 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { } } +@DataProvider +public static Object[][] seekByFunction() { +return new Object[][] { +{ true }, { false } +}; +} + +@Test(timeOut = 3, dataProvider = "seekByFunction") +public void testSeekToNewerPosition(boolean seekByFunction) throws Exception { +final var topic1 = TopicName.get(newTopicName()).toString() +.replace("my-property", "public").replace("my-ns", "default"); +final var topic2 = TopicName.get(newTopicName()).toString() +.replace("my-property", "public").replace("my-ns", "default"); +@Cleanup final var producer1 = pulsarClient.newProducer(Schema.STRING).topic(topic1).create(); +@Cleanup final var producer2 = pulsarClient.newProducer(Schema.STRING).topic(topic2).create(); +producer1.send("1-0"); +producer2.send("2-0"); +producer1.send("1-1"); +producer2.send("2-1"); +final var consumer1 = pulsarClient.newConsumer(Schema.STRING) +.topics(Arrays.asList(topic1, topic2)).subscriptionName("sub") +.ackTimeout(1, TimeUnit.SECONDS) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe(); +final var timestamps = new ArrayList(); +for (int i = 0; i < 4; i++) { +timestamps.add(consumer1.receive().getPublishTime()); +} +timestamps.sort(Comparator.naturalOrder()); +final var timestamp = timestamps.get(2); +consumer1.close(); + +final Function, CompletableFuture> seekAsync = consumer -> { +final var future = seekByFunction ? consumer.seekAsync(__ -> timestamp) : consumer.seekAsync(timestamp); +assertEquals(((ConsumerBase) consumer).getIncomingMessageSize(), 0L); +assertEquals(((ConsumerBase) consumer).getTotalIncomingMessages(), 0); +assertTrue(((ConsumerBase) consumer).getUnAckedMessageTracker().isEmpty()); +return future; +}; + +@Cleanup final var consumer2 = pulsarClient.newConsumer(Schema.STRING) +.topics(Arrays.asList(topic1, topic2)).subscriptionName("sub-2") +.ackTimeout(1, TimeUnit.SECONDS) +
Re: [PR] [feat] PIP-188 Support blue-green migration [pulsar-client-cpp]
BewareMyPower commented on code in PR #402: URL: https://github.com/apache/pulsar-client-cpp/pull/402#discussion_r1495363450 ## lib/HandlerBase.h: ## @@ -52,6 +52,7 @@ class HandlerBase : public std::enable_shared_from_this { ClientConnectionWeakPtr getCnx() const; void setCnx(const ClientConnectionPtr& cnx); void resetCnx() { setCnx(nullptr); } +void setRedirectedClusterURI(const std::string serviceUrl) { redirectedClusterURI_ = serviceUrl; } Review Comment: Use `const std::string&` as the parameter type. ## lib/HandlerBase.h: ## @@ -145,6 +146,8 @@ class HandlerBase : public std::enable_shared_from_this { mutable std::mutex connectionMutex_; std::atomic reconnectionPending_; ClientConnectionWeakPtr connection_; +std::string redirectedClusterURI_; Review Comment: This field could be accessed in different threads. You should use a mutex to protect it. ## lib/ProducerImpl.cc: ## @@ -144,7 +144,11 @@ Future ProducerImpl::connectionOpened(const ClientConnectionPtr& c return promise.getFuture(); } +LOG_DEBUG("Creating producer for topic:" << topic() << ", producerName:" << producerName_ << " on " Review Comment: Maybe we can use `LOG_INFO` like Java client? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org