Re: [PR] [fix][broker] Support running docker container with gid != 0 [pulsar]

2024-02-20 Thread via GitHub


lhotari commented on PR #22081:
URL: https://github.com/apache/pulsar/pull/22081#issuecomment-1956079740

   I created a separate issue about the issue with `readOnlyRootFilesystem`, 
#22088. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[I] [Bug] Pulsar's containers fail to start in k8s with readOnlyRootFilesystem pod/container security context [pulsar]

2024-02-20 Thread via GitHub


lhotari opened a new issue, #22088:
URL: https://github.com/apache/pulsar/issues/22088

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Version
   
   any, for example 3.0.2
   
   ### Minimal reproduce step
   
   ```
   ❯ docker run --user 1:10001 -e advertisedAddress=foobar --rm --read-only 
apachepulsar/pulsar:3.0.2  sh \
   -c "bin/apply-config-from-env.py \
   conf/standalone.conf && \
   bin/pulsar standalone"
   [conf/standalone.conf] Applying config advertisedAddress = foobar
   Traceback (most recent call last):
 File "/pulsar/bin/apply-config-from-env.py", line 104, in 
   f = open(conf_filename, 'w')
   OSError: [Errno 30] Read-only file system: 'conf/standalone.conf'
   ```
   
   ### What did you expect to see?
   
   I should be possible to run Pulsar container with read only root filesystem.
   
   ### What did you see instead?
   
   `OSError: [Errno 30] Read-only file system` error
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][broker] Support running docker container with gid != 0 [pulsar]

2024-02-20 Thread via GitHub


lhotari commented on PR #22081:
URL: https://github.com/apache/pulsar/pull/22081#issuecomment-1956071168

   Good way to reproduce the issue with the current container (before applying 
this fix):
   ```
   ❯ docker run --user 1:10001 -e advertisedAddress=foobar --rm -it 
apachepulsar/pulsar:3.0.2  sh \
   -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar 
standalone"
   [conf/standalone.conf] Applying config advertisedAddress = foobar
   Traceback (most recent call last):
 File "/pulsar/bin/apply-config-from-env.py", line 104, in 
   f = open(conf_filename, 'w')
   PermissionError: [Errno 13] Permission denied: 'conf/standalone.conf'
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][sec] Upgrade commons-compress to 1.26.0 [pulsar]

2024-02-20 Thread via GitHub


lhotari commented on PR #22086:
URL: https://github.com/apache/pulsar/pull/22086#issuecomment-1956062185

   > `commons-compress` v1.21 has no dependencies on other libraries, but 
v1.26.0 has dependencies on `commons-io` and `commons-lang3`, which seems to 
cause `JavaInstanceDepsTest` to fail.
   > 
   > ```
   > [INFO] +- org.apache.commons:commons-compress:jar:1.26.0:compile
   > [INFO] |  +- commons-io:commons-io:jar:2.15.1:compile
   > [INFO] |  \- org.apache.commons:commons-lang3:jar:3.14.0:compile
   > ```
   > 
   > Therefore, I modified `JavaInstanceDepsTest` so that the test passes even 
if `java-instance.jar` includes classes from these libraries. Please let me 
know if this change is incorrect.
   
   @massakam Makes sense. Thanks for addressing this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][sec] Upgrade commons-compress to 1.26.0 [pulsar]

2024-02-20 Thread via GitHub


massakam commented on PR #22086:
URL: https://github.com/apache/pulsar/pull/22086#issuecomment-1956060456

   `commons-compress` v1.21 has no dependencies on other libraries, but v1.26.0 
has dependencies on `commons-io` and `commons-lang3`, which seems to cause 
`JavaInstanceDepsTest` to fail.
   ```
   [INFO] +- org.apache.commons:commons-compress:jar:1.26.0:compile
   [INFO] |  +- commons-io:commons-io:jar:2.15.1:compile
   [INFO] |  \- org.apache.commons:commons-lang3:jar:3.14.0:compile
   ```
   
   Therefore, I modified `JavaInstanceDepsTest` so that the test passes even if 
`java-instance.jar` includes classes from these libraries. Please let me know 
if this change is incorrect.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] [Bug] Issue in using relational db mysql and postgres as source for pulsar io topics [pulsar]

2024-02-20 Thread via GitHub


lhotari commented on issue #22083:
URL: https://github.com/apache/pulsar/issues/22083#issuecomment-1956054877

   > Pulsar 3.0.1
   
   Do you have a chance to test with Pulsar 3.0.2 since that's the latest 
released 3.0.x version. Just to make sure that it's not already fixed. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][broker] Support running docker container with gid != 0 [pulsar]

2024-02-20 Thread via GitHub


lhotari merged PR #22081:
URL: https://github.com/apache/pulsar/pull/22081


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch master updated: [fix][broker] Support running docker container with gid != 0 (#22081)

2024-02-20 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 4097ddd5e8c [fix][broker] Support running docker container with gid != 
0 (#22081)
4097ddd5e8c is described below

commit 4097ddd5e8c4fae4d95c939222341e5ad5dd6d20
Author: Lari Hotari 
AuthorDate: Wed Feb 21 09:34:29 2024 +0200

[fix][broker] Support running docker container with gid != 0 (#22081)
---
 docker/pulsar/Dockerfile | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/docker/pulsar/Dockerfile b/docker/pulsar/Dockerfile
index 4e5885ce55d..6a0dc0100e7 100644
--- a/docker/pulsar/Dockerfile
+++ b/docker/pulsar/Dockerfile
@@ -36,10 +36,14 @@ COPY scripts/install-pulsar-client.sh /pulsar/bin
 
 # The final image needs to give the root group sufficient permission for 
Pulsar components
 # to write to specific directories within /pulsar
+# The ownership is changed to uid 1 to allow using a different root group. 
This is necessary when running the
+# container when gid=0 is prohibited. In that case, the container must be run 
with uid 1 with
+# any group id != 0 (for example 10001).
 # The file permissions are preserved when copying files from this builder 
image to the target image.
 RUN for SUBDIRECTORY in conf data download logs; do \
  [ -d /pulsar/$SUBDIRECTORY ] || mkdir /pulsar/$SUBDIRECTORY; \
- chmod -R g+w /pulsar/$SUBDIRECTORY; \
+ chmod -R ug+w /pulsar/$SUBDIRECTORY; \
+ chown -R 1:0 /pulsar/$SUBDIRECTORY; \
  done
 
 ### Create 2nd stage from Ubuntu image



Re: [I] [Bug] PIP-307 changes don't currently cover advertised listeners support (PIP-61/PIP-95) [pulsar]

2024-02-20 Thread via GitHub


lhotari commented on issue #22061:
URL: https://github.com/apache/pulsar/issues/22061#issuecomment-1956048894

   There's also a need for SNI proxy support to cover all connectivity options. 
I think that it is commonly used together with Pulsar's SNI proxy feature.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][client] fixed getProxyConnection when the topic is migrated [pulsar]

2024-02-20 Thread via GitHub


lhotari commented on PR #22085:
URL: https://github.com/apache/pulsar/pull/22085#issuecomment-1956044155

   Since this is touching this area, I'm reminding about #22061 . :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][sec] Upgrade commons-compress to 1.26.0 [pulsar]

2024-02-20 Thread via GitHub


lhotari commented on PR #22086:
URL: https://github.com/apache/pulsar/pull/22086#issuecomment-1956041194

   JavaInstanceDepsTest seems to be failing. Please take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] [txn] Get previous position by managed ledger. [pulsar]

2024-02-20 Thread via GitHub


codecov-commenter commented on PR #22024:
URL: https://github.com/apache/pulsar/pull/22024#issuecomment-1956034191

   ## 
[Codecov](https://app.codecov.io/gh/apache/pulsar/pull/22024?src=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 Report
   All modified and coverable lines are covered by tests :white_check_mark:
   > Comparison is base 
[(`825e997`)](https://app.codecov.io/gh/apache/pulsar/commit/825e997216dabe23a6dde0945ef769bbda0558e4?el=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 73.57% compared to head 
[(`b9b8913`)](https://app.codecov.io/gh/apache/pulsar/pull/22024?src=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 73.54%.
   > Report is 2 commits behind head on master.
   
   Additional details and impacted files
   
   
   [![Impacted file tree 
graph](https://app.codecov.io/gh/apache/pulsar/pull/22024/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/22024?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
   
   ```diff
   @@ Coverage Diff  @@
   ## master   #22024  +/-   ##
   
   - Coverage 73.57%   73.54%   -0.03% 
   + Complexity3257532049 -526 
   
 Files  1874 1874  
 Lines139252   139252  
 Branches  1526015260  
   
   - Hits 102454   102413  -41 
   - Misses2888028916  +36 
   - Partials   7918 7923   +5 
   ```
   
   | 
[Flag](https://app.codecov.io/gh/apache/pulsar/pull/22024/flags?src=pr=flags_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | Coverage Δ | |
   |---|---|---|
   | 
[inttests](https://app.codecov.io/gh/apache/pulsar/pull/22024/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | `24.63% <100.00%> (-0.15%)` | :arrow_down: |
   | 
[systests](https://app.codecov.io/gh/apache/pulsar/pull/22024/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | `24.26% <0.00%> (-0.09%)` | :arrow_down: |
   | 
[unittests](https://app.codecov.io/gh/apache/pulsar/pull/22024/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | `72.81% <100.00%> (-0.03%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click 
here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#carryforward-flags-in-the-pull-request-comment)
 to find out more.
   
   | 
[Files](https://app.codecov.io/gh/apache/pulsar/pull/22024?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | Coverage Δ | |
   |---|---|---|
   | 
[...ransaction/buffer/impl/TopicTransactionBuffer.java](https://app.codecov.io/gh/apache/pulsar/pull/22024?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci90cmFuc2FjdGlvbi9idWZmZXIvaW1wbC9Ub3BpY1RyYW5zYWN0aW9uQnVmZmVyLmphdmE=)
 | `87.74% <100.00%> (ø)` | |
   
   ... and [71 files with indirect coverage 
changes](https://app.codecov.io/gh/apache/pulsar/pull/22024/indirect-changes?src=pr=tree-more_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] Flaky-test: ExtensibleLoadManagerImplTest.testGetMetrics [pulsar]

2024-02-20 Thread via GitHub


lhotari commented on issue #21556:
URL: https://github.com/apache/pulsar/issues/21556#issuecomment-1956028348

   > After some troubleshooting ([failure*.txt 
reproduce](https://github.com/MMirelli/pulsar-flake-troubleshooter/tree/9e7b9e2d33d9283dded0f63653d08a92902dbe09/fix-21556/data)
 the issue), the test seems to have some concurrency issues. From the runs I 
did I could gather the following stats:
   > 
   > ```
   > | Metric  | Runs| Counter |
   > |:---:|:---:|:---:|
   > | brk_sunit_state_chn_subscribe_ops_total | 1,10| handlerCounters |
   > | brk_sunit_state_chn_owner_lookup_total  | 2,3,6,9 | ownerLookUpCounters |
   > | brk_lb_assign_broker_breakdown_total| 4,5,7,8 | breakdownCounters   |
   > ```
   > 
   > Where, for example, the first row reads: runs 1 and 10 showed an issue 
with metric named `brk_sunit_state_chn_subscribe_ops_total`, which is stored in 
`handlerCounters`. It seems that most of the issues might be caused by the fact 
that we are not using concurrent collections and / or lack of synchronization. 
As they are metrics we may anyway want to skip synchronization, so I am unsure 
whether it makes sense to put the time to fix (all) these issues.
   
   Impressive analysis @MMirelli ! Thanks.
   @heesung-sn @dragosvictor Please check this.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][client] fixed getProxyConnection when the topic is migrated [pulsar]

2024-02-20 Thread via GitHub


dragosvictor commented on code in PR #22085:
URL: https://github.com/apache/pulsar/pull/22085#discussion_r1497008116


##
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java:
##
@@ -990,8 +991,13 @@ public CompletableFuture 
getConnectionToServiceUrl() {
 return getConnection(address, address, 
cnxPool.genRandomKeyToSelectCon());
 }
 
-public CompletableFuture getProxyConnection(final 
InetSocketAddress logicalAddress,
+public CompletableFuture getProxyConnection(final URI 
redirectedClusterURI,
+   final 
InetSocketAddress logicalAddress,
final int 
randomKeyForSelectConnection) {
+
+LookupService lookup =
+redirectedClusterURI == null ? this.lookup : 
getLookup(redirectedClusterURI.toString());

Review Comment:
   Since these connections are cached and pooled, will this still work if the 
`logicalAddress` stays the same in the source and destination clusters? Or is 
that not possible? Otherwise, we might have to use an actually random key for 
connection selection.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] [txn] Get previous position by managed ledger. [pulsar]

2024-02-20 Thread via GitHub


thetumbled commented on PR #22024:
URL: https://github.com/apache/pulsar/pull/22024#issuecomment-1956009864

   /pulsarbot rerun-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[I] [improve] Reuse the internal writer/reader under the same system topic [pulsar]

2024-02-20 Thread via GitHub


poorbarcode opened a new issue, #22087:
URL: https://github.com/apache/pulsar/issues/22087

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Motivation
   
   So far, Pulsar creates an internal producer to send the Pulsar Event and 
close it after it is sent immediately.
   
   The following Profile was captured when doing much topic creation and 
deletion.
   
   
![image](https://github.com/apache/pulsar/assets/25195800/aed99a04-51d5-46e9-b471-71285d9191e3)
   
   https://github.com/apache/pulsar/assets/25195800/1f466756-eefd-4334-b8f3-2023f4593806;>
   
   
   ### Solution
   
   We can reuse the same writer/reader to improve the performance.
   
   ### Alternatives
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] [txn] Get previous position by managed ledger. [pulsar]

2024-02-20 Thread via GitHub


dao-jun commented on code in PR #22024:
URL: https://github.com/apache/pulsar/pull/22024#discussion_r1496938782


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##
@@ -287,8 +287,8 @@ private void handleTransactionMessage(TxnID txnId, Position 
position) {
 .checkAbortedTransaction(txnId)) {
 ongoingTxns.put(txnId, (PositionImpl) position);
 PositionImpl firstPosition = 
ongoingTxns.get(ongoingTxns.firstKey());
-//max read position is less than first ongoing transaction message 
position, so entryId -1
-maxReadPosition = PositionImpl.get(firstPosition.getLedgerId(), 
firstPosition.getEntryId() - 1);
+// max read position is less than first ongoing transaction 
message position
+maxReadPosition = getPreviousPosition(firstPosition);

Review Comment:
   it makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve] [broker] Do not try to open ML when the topic meta does not exist and do not expect to create a new one. #21995 [pulsar]

2024-02-20 Thread via GitHub


dao-jun commented on PR #22004:
URL: https://github.com/apache/pulsar/pull/22004#issuecomment-1955928386

   @poorbarcode there is a test keeps failing, could you please take a check


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][pip] PIP-336: WASM Function API [pulsar]

2024-02-20 Thread via GitHub


RobertIndie commented on code in PR #21992:
URL: https://github.com/apache/pulsar/pull/21992#discussion_r1496859025


##
pip/pip-331.md:
##
@@ -0,0 +1,129 @@
+# PIP-331: WASM Function API
+
+# Background knowledge
+
+WASM(WebAssembly) bytecode is designed to be encoded in a size- and 
load-time-efficient binary format. WASM aims to leverage the common hardware 
features available on various platforms to execute in browsers at machine code 
speed.
+
+WASI(WebAssembly System Interface) provide a portable interface for 
applications that run within a constrained sandbox environment, which allows 
WASM to run in non browser environments such as Linux. It's portable and secure.
+
+# Motivation
+
+The server and client sides of the Pulsar function use protobuf for 
decoupling. In principle, the language supported by protobuf can be supported 
by the pulsar function, now Pulsar provided the java, python and golang 
function client, but there are still many languages that are not supported.
+
+Before all language adaptations are completed (and it's almost entirely 
certain to be impossible), users cannot write pulsar function in their familiar 
languages.
+
+# Goals
+
+## In Scope
+
+Other languages, as long as their code can be compiled into WASM bytecode 
(such as Rust/golang/C++), users can use these languages to write pulsar 
function.
+
+## Out of Scope
+
+All existing abilities of the Java pulsar function client are not 
reimplemented, the WASM Pulsar functions is under the Java Pulsar functions.
+
+Due to the strict requirements of WASM on parameter types and for simplicity 
reasons, types other than `java.lang.Long` are not used as parameters or return 
value.
+
+# High Level Design
+
+```mermaid 
+flowchart LR;
+
+subgraph develop
+direction TB
+SourceCode ==> |"CompileToWASM"| WasmFile ==> |"RenameFile"| 
MoveToTheResourceDirectory ==> UnitTest
+end
+
+subgraph runtime
+direction TB
+PulsarFunctionJava ==> |"LoadFromResource"| TheWasmFile ==> |"Invoke"| 
TheSourceCode
+end
+
+develop --> runtime
+```
+
+# Detailed Design
+
+## Design & Implementation Details
+
+1. add `WasmLoader` to load WASM file and provide the WASM function to java, 
also provide the java function to WASM if we need.
+
+2. add `AbstractWasmFunction` and `AbstractWasmWindowFunction` as the core 
interface of the WASM function api.
+
+```java
+public abstract class AbstractWasmFunction extends WasmLoader implements 
Function {
+
+private static final String PROCESS_METHOD_NAME = "process";
+
+protected static final String INITIALIZE_METHOD_NAME = "initialize";
+
+protected static final String CLOSE_METHOD_NAME = "close";
+
+protected static final Map> ARGUMENTS = new 
ConcurrentHashMap<>();
+
+@Override
+public T process(X input, Context context) {
+return super.getWasmExtern(PROCESS_METHOD_NAME)
+.map(process -> {
+Long argumentId = callWASI(input, context, process);
+return doProcess(input, context, argumentId);
+})
+.orElseThrow(() -> new PulsarWasmException(
+PROCESS_METHOD_NAME + " function not found in " + 
super.getWasmName()));
+}
+
+private Long callWASI(X input,
+  Context context,
+  Extern process) {
+// call WASI function
+final Long argumentId = getArgumentId(input, context);
+ARGUMENTS.put(argumentId, new Argument<>(input, context));
+// WASI cannot easily pass Java objects like JNI, here we pass Long
+// then we can get the argument by Long
+WasmFunctions.consumer(super.getStore(), process.func(), 
WasmValType.I64)
+.accept(argumentId);
+ARGUMENTS.remove(argumentId);
+return argumentId;
+}
+
+protected abstract T doProcess(X input, Context context, Long argumentId);
+
+protected abstract Long getArgumentId(X input, Context context);
+
+@Override
+public void initialize(Context context) {
+super.getWasmExtern(INITIALIZE_METHOD_NAME)
+.ifPresent(initialize -> callWASI(null, context, initialize));
+}
+
+@Override
+public void close() {
+super.getWasmExtern(CLOSE_METHOD_NAME)
+.ifPresent(close -> callWASI(null, null, close));
+super.close();
+}
+
+protected static class Argument {
+protected X input;
+protected Context context;
+
+private Argument(X input, Context context) {
+this.input = input;
+this.context = context;
+}
+}
+}
+```
+
+More detailed code implementation and test can be found in 
[here](https://github.com/apache/pulsar/pull/21975)

Review Comment:
   It's better to extract more design details from the code implementation to 
this proposal.



-- 
This is an automated message from the Apache Git Service.
To 

(pulsar-client-cpp) branch main updated: [feat] PIP-188 Support blue-green migration (#402)

2024-02-20 Thread xyz
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
 new 543e51c  [feat] PIP-188 Support blue-green migration (#402)
543e51c is described below

commit 543e51c7ecd842056f93859defd23b851bfe842e
Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com>
AuthorDate: Tue Feb 20 19:02:31 2024 -0800

[feat] PIP-188 Support blue-green migration (#402)

### Motivation

Support blue-green migration pip-188 for cpp client

### Modifications

- added blue-green client logic
- register the producer instance in the producers map before sending 
produce creation command. This is required since broker could send topic 
migration command in the middle of creating the producer.
---
 lib/BinaryProtoLookupService.cc|   1 -
 lib/BinaryProtoLookupService.h |   8 +-
 lib/ClientConnection.cc|  57 ++-
 lib/ClientConnection.h |   9 +-
 lib/ClientImpl.cc  |  56 +--
 lib/ClientImpl.h   |  15 +-
 lib/HTTPLookupService.cc   |   4 +-
 lib/HTTPLookupService.h|   6 +-
 lib/HandlerBase.cc |  16 +-
 lib/HandlerBase.h  |   4 +
 lib/LookupService.h|   3 +
 lib/ProducerImpl.cc|   5 +-
 lib/RetryableLookupService.h   |   4 +
 lib/ServiceNameResolver.h  |  15 +-
 proto/PulsarApi.proto  |  16 ++
 run-unit-tests.sh  |   3 +
 tests/ClientTest.cc|  17 ++-
 tests/LookupServiceTest.cc |  32 ++--
 tests/MockClientImpl.h |   6 +-
 tests/PulsarFriend.h   |  12 +-
 .../docker-compose.yml |  40 ++---
 tests/extensibleLM/ExtensibleLoadManagerTest.cc| 169 ++---
 tests/extensibleLM/docker-compose.yml  |   4 +
 23 files changed, 356 insertions(+), 146 deletions(-)

diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc
index 2d9ffc4..489d8a2 100644
--- a/lib/BinaryProtoLookupService.cc
+++ b/lib/BinaryProtoLookupService.cc
@@ -22,7 +22,6 @@
 #include "ConnectionPool.h"
 #include "LogUtils.h"
 #include "NamespaceName.h"
-#include "ServiceNameResolver.h"
 #include "TopicName.h"
 
 DECLARE_LOG_OBJECT()
diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h
index a3c059e..6132825 100644
--- a/lib/BinaryProtoLookupService.h
+++ b/lib/BinaryProtoLookupService.h
@@ -38,9 +38,9 @@ using GetSchemaPromisePtr = std::shared_ptr>;
 
 class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
public:
-BinaryProtoLookupService(ServiceNameResolver& serviceNameResolver, 
ConnectionPool& pool,
+BinaryProtoLookupService(const std::string& serviceUrl, ConnectionPool& 
pool,
  const ClientConfiguration& clientConfiguration)
-: serviceNameResolver_(serviceNameResolver),
+: serviceNameResolver_(serviceUrl),
   cnxPool_(pool),
   listenerName_(clientConfiguration.getListenerName()),
   maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()) {}
@@ -54,6 +54,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public 
LookupService {
 
 Future getSchema(const TopicNamePtr& topicName, const 
std::string& version) override;
 
+ServiceNameResolver& getServiceNameResolver() override { return 
serviceNameResolver_; }
+
protected:
 // Mark findBroker as protected to make it accessible from test.
 LookupResultFuture findBroker(const std::string& address, bool 
authoritative, const std::string& topic,
@@ -63,7 +65,7 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public 
LookupService {
 std::mutex mutex_;
 uint64_t requestIdGenerator_ = 0;
 
-ServiceNameResolver& serviceNameResolver_;
+ServiceNameResolver serviceNameResolver_;
 ConnectionPool& cnxPool_;
 std::string listenerName_;
 const int32_t maxLookupRedirects_;
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 00041b2..0beb739 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -403,7 +403,8 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& 
err, tcp::resolver::
 LOG_INFO(cnxString_ << "Connected to broker");
 } else {
 LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical 
broker: " << logicalAddress_
-<< ", proxy: " << proxyServiceUrl_);
+ 

Re: [PR] [feat] PIP-188 Support blue-green migration [pulsar-client-cpp]

2024-02-20 Thread via GitHub


BewareMyPower merged PR #402:
URL: https://github.com/apache/pulsar-client-cpp/pull/402


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] [Bug] Cannot determine whether the message is a duplicate at this time [pulsar]

2024-02-20 Thread via GitHub


graysonzeng commented on issue #21892:
URL: https://github.com/apache/pulsar/issues/21892#issuecomment-1955780645

   @lhotari I haven't tested it, but I think it's not reproducible without 
brokerEntryMetadataInterceptors.  Currently it seems that CompositeByteBuf will 
be generated in addBrokerEntryMetadata only when 
brokerEntryMetadataInterceptors are enabled.
   
   
https://github.com/apache/pulsar/blob/0b6bd70b8d1e7b7cd4d82aa2e0cbfd5e0323d440/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java#L1723


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch dependabot/maven/org.apache.commons-commons-compress-1.26.0 deleted (was bebe136c8e5)

2024-02-20 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch 
dependabot/maven/org.apache.commons-commons-compress-1.26.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


 was bebe136c8e5 Bump org.apache.commons:commons-compress from 1.21 to 
1.26.0

The revisions that were on this branch are still contained in
other references; therefore, this change does not discard any commits
from the repository.



Re: [PR] Bump org.apache.commons:commons-compress from 1.21 to 1.26.0 [pulsar]

2024-02-20 Thread via GitHub


massakam closed pull request #22084: Bump org.apache.commons:commons-compress 
from 1.21 to 1.26.0
URL: https://github.com/apache/pulsar/pull/22084


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Bump org.apache.commons:commons-compress from 1.21 to 1.26.0 [pulsar]

2024-02-20 Thread via GitHub


dependabot[bot] commented on PR #22084:
URL: https://github.com/apache/pulsar/pull/22084#issuecomment-1955734863

   OK, I won't notify you again about this release, but will get in touch when 
a new version is available. If you'd rather skip all updates until the next 
major or minor version, let me know by commenting `@dependabot ignore this 
major version` or `@dependabot ignore this minor version`.
   
   If you change your mind, just re-open this PR and I'll resolve any conflicts 
on it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Bump org.apache.commons:commons-compress from 1.21 to 1.26.0 [pulsar]

2024-02-20 Thread via GitHub


massakam commented on PR #22084:
URL: https://github.com/apache/pulsar/pull/22084#issuecomment-1955734766

   I created another PR because the license files needed to be updated.
   https://github.com/apache/pulsar/pull/22086


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [fix][sec] Upgrade commons-compress to 1.26.0 [pulsar]

2024-02-20 Thread via GitHub


massakam opened a new pull request, #22086:
URL: https://github.com/apache/pulsar/pull/22086

   ### Motivation
   
   commons-compress 1.21 has the following vulnerability and should be upgraded 
to 1.26.0.
   https://nvd.nist.gov/vuln/detail/CVE-2024-25710
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [ ] `doc-not-needed` 
   - [ ] `doc-complete` 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [fix][client] fixed getProxyConnection when the topic is migrated [pulsar]

2024-02-20 Thread via GitHub


heesung-sn opened a new pull request, #22085:
URL: https://github.com/apache/pulsar/pull/22085

   
   
   PIP: https://github.com/apache/pulsar/pull/20748
   
   
   
   ### Motivation
   
   
   
   When a topic is migrated to a cluster with a proxy, it needs to use the 
proxy in the new cluster.
   
   ### Modifications
   
   - updated getProxyConnection func to select lookup service based on 
redirectedClusterURI.
   
   
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [feat][misc] PIP-264: Implement topic lookup metrics using OpenTelemetry [pulsar]

2024-02-20 Thread via GitHub


dragosvictor commented on code in PR #22058:
URL: https://github.com/apache/pulsar/pull/22058#discussion_r1496703560


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##
@@ -241,8 +243,15 @@ public class BrokerService implements Closeable {
 protected final AtomicReference lookupRequestSemaphore;
 protected final AtomicReference topicLoadRequestSemaphore;
 
+@PulsarDeprecatedMetric(newMetricName = 
"pulsar.broker.lookup.pending.request.usage")
 private final ObserverGauge pendingLookupRequests;
+private final ObservableLongGauge pendingLookupRequestsCounter;
+private final ObservableLongGauge pendingLookupRequestsLimit;
+
+@PulsarDeprecatedMetric(newMetricName = 
"pulsar.broker.topic.load.pending.request.usage")
 private final ObserverGauge pendingTopicLoadRequests;
+private final ObservableLongGauge pendingTopicLoadRequestsCounter;

Review Comment:
   Kept to using a counter for this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Bump org.apache.commons:commons-compress from 1.21 to 1.26.0 [pulsar]

2024-02-20 Thread via GitHub


github-actions[bot] commented on PR #22084:
URL: https://github.com/apache/pulsar/pull/22084#issuecomment-1955423832

   @dependabot[bot] Please add the following content to your PR description and 
select a checkbox:
   ```
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [ ] `doc-not-needed` 
   - [ ] `doc-complete` 
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch dependabot/maven/org.apache.commons-commons-compress-1.26.0 created (now bebe136c8e5)

2024-02-20 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch 
dependabot/maven/org.apache.commons-commons-compress-1.26.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


  at bebe136c8e5 Bump org.apache.commons:commons-compress from 1.21 to 
1.26.0

No new revisions were added by this update.



[PR] Bump org.apache.commons:commons-compress from 1.21 to 1.26.0 [pulsar]

2024-02-20 Thread via GitHub


dependabot[bot] opened a new pull request, #22084:
URL: https://github.com/apache/pulsar/pull/22084

   Bumps org.apache.commons:commons-compress from 1.21 to 1.26.0.
   
   
   [![Dependabot compatibility 
score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=org.apache.commons:commons-compress=maven=1.21=1.26.0)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)
   
   Dependabot will resolve any conflicts with this PR as long as you don't 
alter it yourself. You can also trigger a rebase manually by commenting 
`@dependabot rebase`.
   
   [//]: # (dependabot-automerge-start)
   [//]: # (dependabot-automerge-end)
   
   ---
   
   
   Dependabot commands and options
   
   
   You can trigger Dependabot actions by commenting on this PR:
   - `@dependabot rebase` will rebase this PR
   - `@dependabot recreate` will recreate this PR, overwriting any edits that 
have been made to it
   - `@dependabot merge` will merge this PR after your CI passes on it
   - `@dependabot squash and merge` will squash and merge this PR after your CI 
passes on it
   - `@dependabot cancel merge` will cancel a previously requested merge and 
block automerging
   - `@dependabot reopen` will reopen this PR if it is closed
   - `@dependabot close` will close this PR and stop Dependabot recreating it. 
You can achieve the same result by closing it manually
   - `@dependabot show  ignore conditions` will show all of 
the ignore conditions of the specified dependency
   - `@dependabot ignore this major version` will close this PR and stop 
Dependabot creating any more for this major version (unless you reopen the PR 
or upgrade to it yourself)
   - `@dependabot ignore this minor version` will close this PR and stop 
Dependabot creating any more for this minor version (unless you reopen the PR 
or upgrade to it yourself)
   - `@dependabot ignore this dependency` will close this PR and stop 
Dependabot creating any more for this dependency (unless you reopen the PR or 
upgrade to it yourself)
   You can disable automated security fix PRs for this repo from the [Security 
Alerts page](https://github.com/apache/pulsar/network/alerts).
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Bump ip from 2.0.0 to 2.0.1 [pulsar-site]

2024-02-20 Thread via GitHub


github-actions[bot] commented on PR #793:
URL: https://github.com/apache/pulsar-site/pull/793#issuecomment-1955226974

   @dependabot[bot] Please add the following content to your PR description and 
select a checkbox:
   ```
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [ ] `doc-not-needed` 
   - [ ] `doc-complete` 
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar-site) branch dependabot/npm_and_yarn/ip-2.0.1 created (now 1e96b6d397f4)

2024-02-20 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch dependabot/npm_and_yarn/ip-2.0.1
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git


  at 1e96b6d397f4 Bump ip from 2.0.0 to 2.0.1

No new revisions were added by this update.



[PR] Bump ip from 2.0.0 to 2.0.1 [pulsar-site]

2024-02-20 Thread via GitHub


dependabot[bot] opened a new pull request, #793:
URL: https://github.com/apache/pulsar-site/pull/793

   Bumps [ip](https://github.com/indutny/node-ip) from 2.0.0 to 2.0.1.
   
   Commits
   
   https://github.com/indutny/node-ip/commit/3b0994a74eca51df01f08c40d6a65ba0e1845d04;>3b0994a
 2.0.1
   https://github.com/indutny/node-ip/commit/32f468f1245574785ec080705737a579be1223aa;>32f468f
 lib: fixed CVE-2023-42282 and added unit test
   See full diff in https://github.com/indutny/node-ip/compare/v2.0.0...v2.0.1;>compare 
view
   
   
   
   
   
   [![Dependabot compatibility 
score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=ip=npm_and_yarn=2.0.0=2.0.1)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)
   
   Dependabot will resolve any conflicts with this PR as long as you don't 
alter it yourself. You can also trigger a rebase manually by commenting 
`@dependabot rebase`.
   
   [//]: # (dependabot-automerge-start)
   [//]: # (dependabot-automerge-end)
   
   ---
   
   
   Dependabot commands and options
   
   
   You can trigger Dependabot actions by commenting on this PR:
   - `@dependabot rebase` will rebase this PR
   - `@dependabot recreate` will recreate this PR, overwriting any edits that 
have been made to it
   - `@dependabot merge` will merge this PR after your CI passes on it
   - `@dependabot squash and merge` will squash and merge this PR after your CI 
passes on it
   - `@dependabot cancel merge` will cancel a previously requested merge and 
block automerging
   - `@dependabot reopen` will reopen this PR if it is closed
   - `@dependabot close` will close this PR and stop Dependabot recreating it. 
You can achieve the same result by closing it manually
   - `@dependabot show  ignore conditions` will show all of 
the ignore conditions of the specified dependency
   - `@dependabot ignore this major version` will close this PR and stop 
Dependabot creating any more for this major version (unless you reopen the PR 
or upgrade to it yourself)
   - `@dependabot ignore this minor version` will close this PR and stop 
Dependabot creating any more for this minor version (unless you reopen the PR 
or upgrade to it yourself)
   - `@dependabot ignore this dependency` will close this PR and stop 
Dependabot creating any more for this dependency (unless you reopen the PR or 
upgrade to it yourself)
   You can disable automated security fix PRs for this repo from the [Security 
Alerts page](https://github.com/apache/pulsar-site/network/alerts).
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] Flaky-test: ExtensibleLoadManagerImplTest.testGetMetrics [pulsar]

2024-02-20 Thread via GitHub


MMirelli commented on issue #21556:
URL: https://github.com/apache/pulsar/issues/21556#issuecomment-1955127796

   After some troubleshooting ([failure*.txt 
reproduce](https://github.com/MMirelli/pulsar-flake-troubleshooter/tree/9e7b9e2d33d9283dded0f63653d08a92902dbe09/fix-21556/data)
 the issue), the test seems to have some concurrency issues. From the runs I 
did I could gather the following stats:
   ```
   | Metric  | Runs| Counter |
   |:---:|:---:|:---:|
   | brk_sunit_state_chn_subscribe_ops_total | 1,10| handlerCounters |
   | brk_sunit_state_chn_owner_lookup_total  | 2,3,6,9 | ownerLookUpCounters |
   | brk_lb_assign_broker_breakdown_total| 4,5,7,8 | breakdownCounters   |
   ```
   Where, for example, the first row reads: runs 1 and 10 showed an issue with 
metric named `brk_sunit_state_chn_subscribe_ops_total`, which is stored in 
`handlerCounters`. It seems that most of the issues might be caused by the fact 
that we are not using concurrent collections and / or lack of synchronization. 
As they are metrics we may anyway want to skip synchronization, so I am unsure 
whether it makes sense to put the time to fix (all) these issues.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [feat][misc] PIP-264: Implement topic lookup metrics using OpenTelemetry [pulsar]

2024-02-20 Thread via GitHub


dragosvictor commented on code in PR #22058:
URL: https://github.com/apache/pulsar/pull/22058#discussion_r1496530720


##
pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java:
##
@@ -178,6 +188,49 @@ public void testMultipleBrokerLookup() throws Exception {
 
doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
 loadManagerField.set(pulsar.getNamespaceService(), new 
AtomicReference<>(loadManager1));
 
+var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();

Review Comment:
   > I like the `CountDownLatch` recommendation, I agree it will improve 
readability here.
   
   Just realized that this wouldn't work either, since we want to validate that 
the metric is only updated once the semaphore releases a permit. It cannot be 
intercepted on the main testing thread either, since it's happening during the 
direct call to `Consumer.subscribe` within the test. We'd end up with less 
readable code if we were to pursue that idea.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [feat][misc] PIP-264: Implement topic lookup metrics using OpenTelemetry [pulsar]

2024-02-20 Thread via GitHub


dragosvictor commented on code in PR #22058:
URL: https://github.com/apache/pulsar/pull/22058#discussion_r1496530720


##
pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java:
##
@@ -178,6 +188,49 @@ public void testMultipleBrokerLookup() throws Exception {
 
doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
 loadManagerField.set(pulsar.getNamespaceService(), new 
AtomicReference<>(loadManager1));
 
+var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();

Review Comment:
   > I like the `CountDownLatch` recommendation, I agree it will improve 
readability here.
   
   Just realized that this wouldn't work either, since we want to validate that 
the metric is only updated once we semaphore releases a permit. It cannot be 
intercepted on the main testing thread either, since it's happening during the 
direct call to `Consumer.subscribe` within the test. We'd end up with less 
readable code if we were to pursue that idea.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [feat][misc] PIP-264: Implement topic lookup metrics using OpenTelemetry [pulsar]

2024-02-20 Thread via GitHub


dragosvictor commented on code in PR #22058:
URL: https://github.com/apache/pulsar/pull/22058#discussion_r1496322969


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##
@@ -205,16 +236,21 @@ public CompletableFuture> 
getBrokerServiceUrlAsync(TopicN
 });
 
 future.thenAccept(optResult -> {
-lookupLatency.observe(System.nanoTime() - startTime, 
TimeUnit.NANOSECONDS);
+var latencyNs = System.nanoTime() - startTime;
+lookupLatency.observe(latencyNs, TimeUnit.NANOSECONDS);
+
lookupLatencyHistogram.record(MetricsUtil.convertToSeconds(latencyNs, 
TimeUnit.NANOSECONDS));

Review Comment:
   The problem is that they'd end up being 0 most of the time. If we're to 
standardize on seconds being the unit of time (as OpenTelemetry recommends, and 
I think we should), these numbers must be doubles, but `TimeUnit.toSeconds` 
'speaks' `long` only, and we'd lose virtually all precision when the numbers 
are small enough. 
   
   I'll add a comment to the utility class to explain this decision.



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##
@@ -312,6 +313,8 @@ public PulsarService(ServiceConfiguration config,
 TransactionBatchedWriteValidator.validate(config);
 this.config = config;
 
+this.openTelemetry = new PulsarBrokerOpenTelemetry(config);

Review Comment:
   I would, but it's being replaced in the tests: 
https://github.com/apache/pulsar/pull/22058/files#diff-f6308e8022fc0713d66fa883d166e7b9189865b3c8ff9b79aaccaf97d69f2f25R56-R57.



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##
@@ -403,15 +411,41 @@ public BrokerService(PulsarService pulsar, EventLoopGroup 
eventLoopGroup) throws
 this.defaultServerBootstrap = defaultServerBootstrap();
 
 this.pendingLookupRequests = 
ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-")
-.supplier(() -> 
pulsar.getConfig().getMaxConcurrentLookupRequest()
-- lookupRequestSemaphore.get().availablePermits())
+.supplier(this::getPendingLookupRequest)
 .register();
+this.pendingLookupRequestsCounter = 
pulsar.getOpenTelemetry().getMeter()
+.gaugeBuilder("pulsar.broker.lookup.pending.request.usage")
+.ofLongs()
+.setDescription("The number of pending lookup requests in the 
broker. "
++ "When it reaches threshold 
\"maxConcurrentLookupRequest\" defined in broker.conf, "
++ "new requests are rejected.")
+.buildWithCallback(measurement -> 
measurement.record(getPendingLookupRequest()));
+this.pendingLookupRequestsLimit = pulsar.getOpenTelemetry().getMeter()
+.gaugeBuilder("pulsar.broker.lookup.pending.request.limit")
+.ofLongs()
+.setDescription("The maximum number of pending lookup requests 
in the broker. "
++ "Equal to \"maxConcurrentLookupRequest\" defined in 
broker.conf.")
+.buildWithCallback(
+measurement -> 
measurement.record(pulsar.getConfig().getMaxConcurrentLookupRequest()));
 
 this.pendingTopicLoadRequests = ObserverGauge.build(
-"pulsar_broker_topic_load_pending_requests", "-")
-.supplier(() -> 
pulsar.getConfig().getMaxConcurrentTopicLoadRequest()
-- topicLoadRequestSemaphore.get().availablePermits())
+"pulsar_broker_topic_load_pending_requests", "-")
+.supplier(this::getPendingTopicLoadRequests)
 .register();
+this.pendingTopicLoadRequestsCounter = 
pulsar.getOpenTelemetry().getMeter()
+.gaugeBuilder("pulsar.broker.topic.load.pending.request.usage")

Review Comment:
   Name changes sound good!
   
   Note that the same can be said about the lookup request, as it is being used 
in requests other than topic lookup: 
[handlePartitionMetadataRequest](https://github.com/apache/pulsar/blob/0b6bd70b8d1e7b7cd4d82aa2e0cbfd5e0323d440/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L579C20-L579C50)
 , 
[handleGetTopicsOfNamespace](https://github.com/apache/pulsar/blob/0b6bd70b8d1e7b7cd4d82aa2e0cbfd5e0323d440/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L2391C20-L2391C46)
 and 
[handleCommandWatchTopicList](https://github.com/apache/pulsar/blob/0b6bd70b8d1e7b7cd4d82aa2e0cbfd5e0323d440/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L3036C20-L3036C47).
 Perhaps we can rename it to 
`pulsar.broker.topic.lookup.operation.pending.[usage,limit]`?



##
pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java:
##
@@ 

[I] [Bug] Issue in using relational db mysql and postgres as source for pulsar io topics [pulsar]

2024-02-20 Thread via GitHub


abhinavisin opened a new issue, #22083:
URL: https://github.com/apache/pulsar/issues/22083

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Version
   
   Ubuntu 18.04
   Pulsar 3.0.1
   Mysql - Server version: 5.7.25-log MySQL Community Server (GPL)
   Postgres - 14.10
   
   ### Minimal reproduce step
   
   Step 1. Follow the steps documented for bringing CDC connectors outlined in 
example 
   https://pulsar.apache.org/docs/3.0.x/io-cdc-debezium/
   Docker run mysql instance 
   Step 2. Start a standalone pulsar instance 
  bin/pulsar standalone
   Step 3. Start the Pulsar Debezium connector in local run mode 
  Attached yaml file (debezium connector config) and logs of 
execution 
   Step 4. No topics created for any table from "inventory" schema 
Only offset and history topics are created in
   
[pulsar-debezium-mysql.log](https://github.com/apache/pulsar/files/14351115/pulsar-debezium-mysql.log)
pulsar
   
[debezium-mysql-source-config.txt](https://github.com/apache/pulsar/files/14351137/debezium-mysql-source-config.txt)
   
   Above was repeated for postgres and same results. 
   **However if using pulsar 2.6.0 all topics and tables are created 
successfully.** 
   
   ### What did you expect to see?
   
   No topic created for any table of the mysql and postgres in pulsar 3.0.1.
   Only following topics created for mysql table- 
   persistent://public/default/offset-topic
   persistent://public/default/history-topic
   
   ### What did you see instead?
   
   Subscribe to the topic sub-products for the inventory.products table and get 
change data from relational db.
   
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][Offload] fix indexEntries NullPointerException error [pulsar]

2024-02-20 Thread via GitHub


lhotari commented on PR #22035:
URL: https://github.com/apache/pulsar/pull/22035#issuecomment-1955057667

   > > > > > The `close` method maybe be called more than once, so cause the 
issue. I think we can just let `indexEntries=null`, no need to `clear` it
   > > > > 
   > > > > 
   > > > > Wouldn't that be a problem if the object instance gets recycled 
multiple times?
   > > > 
   > > > 
   > > > maybe
   > > 
   > > There have been bugs in the past with recycled objects that are caused 
by releasing the object multiple times.
   > 
   > yes, but for this patch, it's ok to fix it like this, right?
   
   I doubt that it's correct. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Release 0.5.3] Update latest version in README.adoc to 0.5.3 [pulsar-client-reactive]

2024-02-20 Thread via GitHub


lhotari merged PR #162:
URL: https://github.com/apache/pulsar-client-reactive/pull/162


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar-client-reactive) branch main updated: [Release 0.5.3] Update latest version in README.adoc to 0.5.3 (#162)

2024-02-20 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
 new 6221cec  [Release 0.5.3] Update latest version in README.adoc to 0.5.3 
(#162)
6221cec is described below

commit 6221cecc4a18619f869d65c3888f41184940a6f4
Author: Chris Bono 
AuthorDate: Tue Feb 20 14:22:09 2024 -0600

[Release 0.5.3] Update latest version in README.adoc to 0.5.3 (#162)

Co-authored-by: onobc 
---
 README.adoc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.adoc b/README.adoc
index 8cdc631..aa7f879 100644
--- a/README.adoc
+++ b/README.adoc
@@ -19,7 +19,7 @@
 = Reactive client for Apache Pulsar
 
 :github: https://github.com/apache/pulsar-client-reactive
-:latest_version: 0.5.2
+:latest_version: 0.5.3
 
 Reactive client for Apache Pulsar which is compatible with the Reactive 
Streams specification.
 This uses Project Reactor as the Reactive Streams implementation.



(pulsar-client-reactive) branch main updated: [Release 0.5.3] Update next snapshot version to 0.5.4-SNAPSHOT (#163)

2024-02-20 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


The following commit(s) were added to refs/heads/main by this push:
 new c39a5f8  [Release 0.5.3] Update next snapshot version to 
0.5.4-SNAPSHOT (#163)
c39a5f8 is described below

commit c39a5f8dd368daf0c99e06707a09c9777f67e47f
Author: Chris Bono 
AuthorDate: Tue Feb 20 13:34:27 2024 -0600

[Release 0.5.3] Update next snapshot version to 0.5.4-SNAPSHOT (#163)

Co-authored-by: onobc 
---
 gradle.properties | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/gradle.properties b/gradle.properties
index f08e395..d0e9728 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -17,4 +17,4 @@
 # under the License.
 #
 
-version=0.5.3
+version=0.5.4-SNAPSHOT



Re: [PR] [Release 0.5.3] Update next snapshot version to 0.5.4-SNAPSHOT [pulsar-client-reactive]

2024-02-20 Thread via GitHub


lhotari merged PR #163:
URL: https://github.com/apache/pulsar-client-reactive/pull/163


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][broker] Set ServiceUnitStateChannel topic compaction threshold explicitly, improve getOwnerAsync, and fix other bugs [pulsar]

2024-02-20 Thread via GitHub


heesung-sn commented on code in PR #22064:
URL: https://github.com/apache/pulsar/pull/22064#discussion_r1496263801


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##
@@ -894,14 +920,20 @@ private List getIgnoredCommandMetrics(String 
advertisedBrokerAddress) {
 return List.of(metric);
 }
 
-private void monitor() {
+@VisibleForTesting
+protected void monitor() {
 try {
 initWaiter.await();
 
 // Monitor role
 // Periodically check the role in case ZK watcher fails.
 var isChannelOwner = serviceUnitStateChannel.isChannelOwner();
 if (isChannelOwner) {
+// System topic config might fail due to the race condition
+// with topic policy init(Topic policies cache have not init).
+if (!configuredSystemTopics) {

Review Comment:
   > Why this change can fix this issue?
   
   I see that topic policy init failure `TopicPoliciesCacheNotInitException` in 
the below code when this LB is trying to set the compaction threshold. So, I 
think it will be fool-proof to retry setting compaction threshold policy upon 
such errors(until the policy cache is fully initialized).
   
   
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L234
   
   > Can you enable the topic-level policies on the integration test(
   
   Sure. I can do this.
   
   
   
   



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java:
##
@@ -150,9 +156,34 @@ public synchronized void init() throws IOException {
 start();
 }
 
-private synchronized void validateTableViewStart() {
-if (tableView == null) {
-throw new IllegalStateException("table view has not been started");
+private void validateProducer() {
+if (producer == null || !producer.isConnected()) {
+try {
+if (producer != null) {
+producer.close();
+}
+producer = null;
+startProducer();
+log.info("Restarted producer on {}", topic);
+} catch (Exception e) {
+log.error("Failed to restart producer on {}", topic, e);
+throw new RuntimeException(e);
+}
+}
+}
+
+private void validateTableView() {
+if (tableView == null || System.currentTimeMillis() - 
tableViewLastUpdateTimestamp
+> ((long) 
conf.getLoadBalancerReportUpdateMaxIntervalMinutes()) * 60 * 1000 * 2) {
+tableViewLastUpdateTimestamp = 0;
+try {
+closeTableView();
+startTableView();
+log.info("Restarted tableview on {}", topic);

Review Comment:
   updated.



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##
@@ -575,7 +601,7 @@ private CompletableFuture> 
dedupeLookupRequest(
 if (ex != null) {
 assignCounter.incrementFailure();
 }
-lookupRequests.remove(key, newFutureCreated.getValue());
+lookupRequests.remove(key);

Review Comment:
   I think this change is fool-proof in case the lookupRequests values are 
replaced just in case.



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java:
##
@@ -150,9 +156,34 @@ public synchronized void init() throws IOException {
 start();
 }
 
-private synchronized void validateTableViewStart() {
-if (tableView == null) {
-throw new IllegalStateException("table view has not been started");
+private void validateProducer() {
+if (producer == null || !producer.isConnected()) {
+try {
+if (producer != null) {
+producer.close();
+}
+producer = null;
+startProducer();
+log.info("Restarted producer on {}", topic);
+} catch (Exception e) {
+log.error("Failed to restart producer on {}", topic, e);
+throw new RuntimeException(e);
+}
+}
+}
+
+private void validateTableView() {
+if (tableView == null || System.currentTimeMillis() - 
tableViewLastUpdateTimestamp
+> ((long) 
conf.getLoadBalancerReportUpdateMaxIntervalMinutes()) * 60 * 1000 * 2) {

Review Comment:
   done.



##
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##
@@ -304,8 +304,14 @@ private void readTailMessages(Reader reader) {
 log.error("Reader {} was 

Re: [PR] [fix][broker] Support running docker container with gid != 0 [pulsar]

2024-02-20 Thread via GitHub


codecov-commenter commented on PR #22081:
URL: https://github.com/apache/pulsar/pull/22081#issuecomment-1954783138

   ## 
[Codecov](https://app.codecov.io/gh/apache/pulsar/pull/22081?src=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 Report
   All modified and coverable lines are covered by tests :white_check_mark:
   > Comparison is base 
[(`0b6bd70`)](https://app.codecov.io/gh/apache/pulsar/commit/0b6bd70b8d1e7b7cd4d82aa2e0cbfd5e0323d440?el=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 73.57% compared to head 
[(`912d41e`)](https://app.codecov.io/gh/apache/pulsar/pull/22081?src=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 73.65%.
   
   Additional details and impacted files
   
   
   [![Impacted file tree 
graph](https://app.codecov.io/gh/apache/pulsar/pull/22081/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/22081?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
   
   ```diff
   @@ Coverage Diff  @@
   ## master   #22081  +/-   ##
   
   + Coverage 73.57%   73.65%   +0.07% 
   - Complexity3255332583  +30 
   
 Files  1874 1874  
 Lines139252   139252  
 Branches  1526015260  
   
   + Hits 102451   102560 +109 
   + Misses2887728782  -95 
   + Partials   7924 7910  -14 
   ```
   
   | 
[Flag](https://app.codecov.io/gh/apache/pulsar/pull/22081/flags?src=pr=flags_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | Coverage Δ | |
   |---|---|---|
   | 
[inttests](https://app.codecov.io/gh/apache/pulsar/pull/22081/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | `24.67% <ø> (-0.16%)` | :arrow_down: |
   | 
[systests](https://app.codecov.io/gh/apache/pulsar/pull/22081/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | `24.42% <ø> (-0.04%)` | :arrow_down: |
   | 
[unittests](https://app.codecov.io/gh/apache/pulsar/pull/22081/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | `72.90% <ø> (+0.07%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click 
here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#carryforward-flags-in-the-pull-request-comment)
 to find out more.
   
   [see 80 files with indirect coverage 
changes](https://app.codecov.io/gh/apache/pulsar/pull/22081/indirect-changes?src=pr=tree-more_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [Release 0.5.3] Update next snapshot version to 0.5.4-SNAPSHOT [pulsar-client-reactive]

2024-02-20 Thread via GitHub


onobc opened a new pull request, #163:
URL: https://github.com/apache/pulsar-client-reactive/pull/163

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] [broker] Expire messages according to ledger close time to avoid client clock skew [pulsar]

2024-02-20 Thread via GitHub


dao-jun commented on PR #21940:
URL: https://github.com/apache/pulsar/pull/21940#issuecomment-1954763943

   seems there is a test keeps failing, please check


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [Release 0.5.3] Update latest version in README.adoc to 0.5.3 [pulsar-client-reactive]

2024-02-20 Thread via GitHub


onobc opened a new pull request, #162:
URL: https://github.com/apache/pulsar-client-reactive/pull/162

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][Offload] fix indexEntries NullPointerException error [pulsar]

2024-02-20 Thread via GitHub


Technoboy- commented on PR #22035:
URL: https://github.com/apache/pulsar/pull/22035#issuecomment-1954678062

   > > > > The `close` method maybe be called more than once, so cause the 
issue. I think we can just let `indexEntries=null`, no need to `clear` it
   > > > 
   > > > 
   > > > Wouldn't that be a problem if the object instance gets recycled 
multiple times?
   > > 
   > > 
   > > maybe
   > 
   > There have been bugs in the past with recycled objects that are caused by 
releasing the object multiple times.
   
   yes, but for this patch, it's ok to fix it like this, right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [feat][misc] PIP-264: Implement topic lookup metrics using OpenTelemetry [pulsar]

2024-02-20 Thread via GitHub


Technoboy- commented on code in PR #22058:
URL: https://github.com/apache/pulsar/pull/22058#discussion_r1496197624


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##
@@ -205,16 +236,21 @@ public CompletableFuture> 
getBrokerServiceUrlAsync(TopicN
 });
 
 future.thenAccept(optResult -> {
-lookupLatency.observe(System.nanoTime() - startTime, 
TimeUnit.NANOSECONDS);
+var latencyNs = System.nanoTime() - startTime;
+lookupLatency.observe(latencyNs, TimeUnit.NANOSECONDS);
+
lookupLatencyHistogram.record(MetricsUtil.convertToSeconds(latencyNs, 
TimeUnit.NANOSECONDS));

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Sync data/team.js with https://whimsy.apache.org/roster/committee/pulsar.json [pulsar-site]

2024-02-20 Thread via GitHub


lhotari merged PR #792:
URL: https://github.com/apache/pulsar-site/pull/792


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar-site) branch main updated: Sync data/team.js with https://whimsy.apache.org/roster/committee/pulsar.json (#792)

2024-02-20 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git


The following commit(s) were added to refs/heads/main by this push:
 new aaaffa8d6741 Sync data/team.js with 
https://whimsy.apache.org/roster/committee/pulsar.json (#792)
aaaffa8d6741 is described below

commit aaaffa8d674107021fd7f8bea7c880ea69f074fd
Author: Lari Hotari 
AuthorDate: Tue Feb 20 18:40:05 2024 +0200

Sync data/team.js with 
https://whimsy.apache.org/roster/committee/pulsar.json (#792)
---
 data/team.js | 12 
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/data/team.js b/data/team.js
index ed9ffe074b9b..acd41b2f8c50 100644
--- a/data/team.js
+++ b/data/team.js
@@ -20,6 +20,10 @@ module.exports = {
   "name": "Hang Chen",
   "apacheId": "chenhang"
 },
+{
+  "name": "David Jensen",
+  "apacheId": "djensen"
+},
 {
   "name": "Enrico Olivelli",
   "apacheId": "eolivelli"
@@ -174,6 +178,10 @@ module.exports = {
   "name": "Aloys Zhang",
   "apacheId": "aloyszhang"
 },
+{
+  "name": "Asaf Mesika",
+  "apacheId": "amesika"
+},
 {
   "name": "Andrey Yegorov",
   "apacheId": "ayegorov"
@@ -202,10 +210,6 @@ module.exports = {
   "name": "Dezhi Liu",
   "apacheId": "dezhiliu"
 },
-{
-  "name": "David Jensen",
-  "apacheId": "djensen"
-},
 {
   "name": "Yuri Mizushima",
   "apacheId": "equanz"



[PR] Sync data/team.js with https://whimsy.apache.org/roster/committee/pulsar.json [pulsar-site]

2024-02-20 Thread via GitHub


lhotari opened a new pull request, #792:
URL: https://github.com/apache/pulsar-site/pull/792

   Process used to update `data/team.js` file:
   1. Logged in to https://whimsy.apache.org/roster/committee/pulsar with 
browser
   2. Appended ".json" to URL so that browser goes to 
https://whimsy.apache.org/roster/committee/pulsar.json
   3. Clicked "Save as..." and stored the JSON as ~/Downloads/pulsar.json
   4. Ran this command in a bash shell: `{ echo -n "module.exports = " && cat 
~/Downloads/pulsar.json | jq '{"pmc": [.roster| to_entries | sort_by(.key) | 
.[] | select(.value.role|startswith("PMC")) | {"name":.value.name, "apacheId": 
.key}], "committers": [.roster| to_entries | sort_by(.key) | .[] | 
select(.value.role=="Committer") | {"name":.value.name, "apacheId": .key}]}' } 
| perl -pe 's/$/;\n/ if eof' > data/team.js`
   
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [fix][broker] Support running docker container with gid != 0 [pulsar]

2024-02-20 Thread via GitHub


lhotari opened a new pull request, #22081:
URL: https://github.com/apache/pulsar/pull/22081

   ### Motivation
   
   Currently Pulsar's docker image must be run with gid=0. There are 
environments where the group id 0 is prohibited by default. One example is 
Tanzu Kubernetes Grid <=1.24 where a default Pod Security Policy called 
`vmware-system-restricted` is used. That PSP contains this type of rule:
   
   ```yaml
   supplementalGroups:
 rule: MustRunAs
 ranges:
   - min: 1
 max: 65535
   runAsUser:
 rule: MustRunAsNonRoot
   fsGroup:
 rule: MustRunAs
 ranges:
   - min: 1
 max: 65535
   ```
   
   In this case, it's not possible to use Pulsar's docker image since Pulsar 
needs write access to a few directories.
   
   ### Modifications
   
   change the owner of the writable directories to user id 1.
   
   This will allow Tanzu to work with this type of securityContext for each 
Pulsar component (Broker, Zookeeper, Bookkeeper)
   ```
 securityContext:
   runAsNonRoot: true
   runAsGroup: 10001
   fsGroup: 10001
   runAsUser: 1
   ```
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



svn commit: r67458 - /dev/pulsar/pulsar-client-reactive-0.5.3-candidate-1/ /release/pulsar/pulsar-client-reactive-0.5.3/

2024-02-20 Thread lhotari
Author: lhotari
Date: Tue Feb 20 15:35:32 2024
New Revision: 67458

Log:
Release Reactive client for Apache Pulsar 0.5.3

Added:
release/pulsar/pulsar-client-reactive-0.5.3/
  - copied from r67457, dev/pulsar/pulsar-client-reactive-0.5.3-candidate-1/
Removed:
dev/pulsar/pulsar-client-reactive-0.5.3-candidate-1/



(pulsar-client-reactive) annotated tag v0.5.3 updated (8c8d087 -> 1d8c34e)

2024-02-20 Thread onobc
This is an automated email from the ASF dual-hosted git repository.

onobc pushed a change to annotated tag v0.5.3
in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git


*** WARNING: tag v0.5.3 was modified! ***

from 8c8d087  (commit)
  to 1d8c34e  (tag)
 tagging 8c8d087abaa04c413150d6931451996e9fad8c2c (commit)
 replaces v0.5.2
  by onobc
  on Tue Feb 20 09:15:29 2024 -0600

- Log -
Release v0.5.3
-BEGIN PGP SIGNATURE-

iQJFBAABCgAvFiEEuMw+Nf/n/RvndvFepQBgfx6cBFcFAmXUwhERHG9ub2JjQGFw
YWNoZS5vcmcACgkQpQBgfx6cBFdNnRAAiAc3jDGamVvZ4Z4BNDMph3Q298wzzVZ+
vIhHXb1PUy6aOg6dKOVWJIA6xvVYMf/YumhRZzwDESFtodSJDvjXvGwG7cjUQaN3
Gw3vNn6dKckxRYLVatl+q1bElDT6kn7SxyY5JnkqzllnBPBboyfpLEdjU4GRsRqh
a0JbXLOhWoEwPu+djxbnZiMV9rmnPH4wDHVaJuDP/T3qt9/GXuJNPXjVr5PqT4Yz
zmtCKlahsLZ97VjqyQov6a7TDbtxgRzy79FxF4B1a3ICUlzmElR9UcY/7XCorPEv
hDMFbQu+xBXQ12pMhW+r2K8hJph4ODZpcCkg4nQ7jEw/eh1QWxCsy3bXqfwnMcJB
85mBAEkfwymF4kr1U+8SlXFZqNAU0VFJvFdg6hW8ZmyhAZsFmWp+lBE8QFrpGQSz
CXb98ei3mvK/+KheOWxYVxcZgSMHGWxf0fZME5CLt6t8dTbAkbR8mkXBaiL3pKU9
smP5SsKPbhu35z1iya9F5eON0M6w9IYVs4PYG0eiL4jv6cWSVPR+ADMO83mdcTWX
H/LQUZoQJib99isx0jjNiFyEznb/Mp1k2ycEv0Ocp/H3B5hx2rIHsbJqjvFV+ivh
p8uNfDpF2egST3aw0erfdj5wX9UD3mqwKhjNNeV1Yjxfxh47mW8cXwc3XDaJ2U+w
3kOYJWXDaYs=
=3qFw
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:



[I] [Question] AWS Lambda [pulsar-client-node]

2024-02-20 Thread via GitHub


jonasgroendahl opened a new issue, #367:
URL: https://github.com/apache/pulsar-client-node/issues/367

   Has anyone managed to get this working in an AWS lambda environment? Perhaps 
as a AWS lambda layer?
   
   I'm very curious since I'm struggling at the moment how to approach this


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][broker] fix broker unackmessages become a negative number [pulsar]

2024-02-20 Thread via GitHub


poorbarcode commented on PR #17003:
URL: https://github.com/apache/pulsar/pull/17003#issuecomment-1954332047

   Just explain the issue that the current PR fixed
   - the issue may lead `unackedMessages` of the consumer being larger than the 
correct value.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] [broker] Expire messages according to ledger close time to avoid client clock skew [pulsar]

2024-02-20 Thread via GitHub


codelipenghui commented on code in PR #21940:
URL: https://github.com/apache/pulsar/pull/21940#discussion_r1495854131


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##
@@ -81,9 +81,9 @@ public boolean expireMessages(int messageTTLInSeconds) {
 if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) 
{
 log.info("[{}][{}] Starting message expiry check, ttl= {} 
seconds", topicName, subName,
 messageTTLInSeconds);
-if (checkExpiryByLedgerClosureTime(cursor, messageTTLInSeconds)) {
-return true;
-}
+// First filter the entire Ledger reached TTL based on the Ledger 
closing time to avoid client clock skew
+checkExpiryByLedgerClosureTime(cursor, messageTTLInSeconds);

Review Comment:
   It looks like the returned value is not used by any places. Do we need to 
change it to `void`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] Update release docker image in release process [pulsar-site]

2024-02-20 Thread via GitHub


RobertIndie merged PR #745:
URL: https://github.com/apache/pulsar-site/pull/745


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar-site) branch main updated: [fix] Update release docker image in release process (#745)

2024-02-20 Thread zike
This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git


The following commit(s) were added to refs/heads/main by this push:
 new af2fdb6a147c [fix] Update release docker image in release process 
(#745)
af2fdb6a147c is described below

commit af2fdb6a147c2afa53f921c7a158a282cd8e1fea
Author: Zike Yang 
AuthorDate: Tue Feb 20 21:25:14 2024 +0800

[fix] Update release docker image in release process (#745)

PIP: https://github.com/apache/pulsar/pull/21872

The current release docker images process doesn't work for Pulsar above 
versions 3.0.0. Pulsar has added arm and amd arch supports for the docker 
image. If we use the original command to push the image, it will push only one 
arch.

We recommend use tools like regctl to push images.

For the latest tag of the pulsar image, this PR proposes to use the last 
feature release version or the patch release of the last feature release as the 
`latest` tag.

-

Co-authored-by: Penghui Li 
---
 contribute/release-process.md | 18 +-
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/contribute/release-process.md b/contribute/release-process.md
index 12a25fb74700..1583bf56d270 100644
--- a/contribute/release-process.md
+++ b/contribute/release-process.md
@@ -319,20 +319,20 @@ Promote the Maven staging repository for release. Login 
to `https://repository.a
 
 ### Release Docker images
 
-Copy the approved candidate docker images from your personal account to 
apachepulsar org.
+Please ensure that the regctl tools have been properly installed. They can be 
obtained from the following link: 
https://github.com/regclient/regclient/blob/main/docs/install.md
+
+Copy the approved candidate Docker images from your personal account to the 
apachepulsar organization:
 
 ```bash
-PULSAR_VERSION=2.x.x
+PULSAR_VERSION=3.x.x
 OTHER_DOCKER_USER=otheruser
-for image in pulsar pulsar-all pulsar-grafana pulsar-standalone; do
-docker pull "${OTHER_DOCKER_USER}/$image:${PULSAR_VERSION}" && {
-  docker tag "${OTHER_DOCKER_USER}/$image:${PULSAR_VERSION}" 
"apachepulsar/$image:${PULSAR_VERSION}"
-  echo "Pushing apachepulsar/$image:${PULSAR_VERSION}"
-  docker push "apachepulsar/$image:${PULSAR_VERSION}"
-}
-done
+CANDIDATE_TAG=3.x.x-80fb390
+regctl image copy ${OTHER_DOCKER_USER}/pulsar:${CANDIDATE_TAG} 
apachepulsar/pulsar:${PULSAR_VERSION}
+regctl image copy ${OTHER_DOCKER_USER}/pulsar-all:${CANDIDATE_TAG} 
apachepulsar/pulsar-all:${PULSAR_VERSION}
 ```
 
+If this release is a feature release or a patch release of the last feature 
release, you should also push these images to the `latest` tag.
+
 If you don't have the permission, you can ask someone with access to 
apachepulsar org to do that.
 
 ### Update project version



Re: [PR] [fix][broker] Set ServiceUnitStateChannel topic compaction threshold explicitly, improve getOwnerAsync, and fix other bugs [pulsar]

2024-02-20 Thread via GitHub


Technoboy- commented on code in PR #22064:
URL: https://github.com/apache/pulsar/pull/22064#discussion_r1495725504


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java:
##
@@ -150,9 +156,34 @@ public synchronized void init() throws IOException {
 start();
 }
 
-private synchronized void validateTableViewStart() {
-if (tableView == null) {
-throw new IllegalStateException("table view has not been started");
+private void validateProducer() {
+if (producer == null || !producer.isConnected()) {
+try {
+if (producer != null) {
+producer.close();
+}
+producer = null;
+startProducer();
+log.info("Restarted producer on {}", topic);
+} catch (Exception e) {
+log.error("Failed to restart producer on {}", topic, e);
+throw new RuntimeException(e);
+}
+}
+}
+
+private void validateTableView() {
+if (tableView == null || System.currentTimeMillis() - 
tableViewLastUpdateTimestamp
+> ((long) 
conf.getLoadBalancerReportUpdateMaxIntervalMinutes()) * 60 * 1000 * 2) {

Review Comment:
   we can define a static field



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][broker] Set ServiceUnitStateChannel topic compaction threshold explicitly, improve getOwnerAsync, and fix other bugs [pulsar]

2024-02-20 Thread via GitHub


Technoboy- commented on code in PR #22064:
URL: https://github.com/apache/pulsar/pull/22064#discussion_r1495704203


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##
@@ -575,7 +601,7 @@ private CompletableFuture> 
dedupeLookupRequest(
 if (ex != null) {
 assignCounter.incrementFailure();
 }
-lookupRequests.remove(key, newFutureCreated.getValue());
+lookupRequests.remove(key);

Review Comment:
   why modify here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] Support ack timeout [pulsar-client-go]

2024-02-20 Thread via GitHub


imqishi commented on issue #217:
URL: 
https://github.com/apache/pulsar-client-go/issues/217#issuecomment-1954033612

   > The ack timeout referred here is not the timeout on the ack operation, 
rather the timeout on the application processing a message.
   > 
   > Regardless, it was a mistake to add in Java API, and that’s why we didn’t 
add it in the new Go API.
   > 
   > Negative acks are better because they convey the application intention on 
the outcome of processing a message.
   > 
   > There’s no context or timeout on the ack/nack operations because these 
happen in background.
   > 
   > If these fail, the message will be resent and the application will have 
the chance to ack/nack again.
   
   hi, I want to know if there's no ack-timeout and consume exit without nack, 
when will the message be resent? If there's situation a long process message 
cause multiple retry?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar-site) branch add-push-image created (now 8fbe1bce6d07)

2024-02-20 Thread zike
This is an automated email from the ASF dual-hosted git repository.

zike pushed a change to branch add-push-image
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git


  at 8fbe1bce6d07 Remove statement for the `lts` tag which has been 
proposed thorugh the PIP

This branch includes the following new commits:

 new 8fbe1bce6d07 Remove statement for the `lts` tag which has been 
proposed thorugh the PIP

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




(pulsar-site) 01/01: Remove statement for the `lts` tag which has been proposed thorugh the PIP

2024-02-20 Thread zike
This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch add-push-image
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git

commit 8fbe1bce6d078e5d09441d635e99741fc5ec8d65
Author: Zike Yang 
AuthorDate: Tue Feb 20 18:41:44 2024 +0800

Remove statement for the `lts` tag which has been proposed thorugh the PIP
---
 contribute/release-process.md | 2 --
 1 file changed, 2 deletions(-)

diff --git a/contribute/release-process.md b/contribute/release-process.md
index d1b45ac86ae3..cc8ad9fb8b21 100644
--- a/contribute/release-process.md
+++ b/contribute/release-process.md
@@ -334,8 +334,6 @@ regctl image copy 
${OTHER_DOCKER_USER}/pulsar-all:${CANDIDATE_TAG} apachepulsar/
 
 If this release is a feature release or a patch release of the last feature 
release, you should also push these images to the `latest` tag.
 
-If this release is a LTS release or a patch release of the last LTS release, 
you should also push these images to the `lts` tag.
-
 If you don't have the permission, you can ask someone with access to 
apachepulsar org to do that.
 
 ### Update project version



[PR] [WIP] Support partitioned topic reader [pulsar-client-go]

2024-02-20 Thread via GitHub


RobertIndie opened a new pull request, #1178:
URL: https://github.com/apache/pulsar-client-go/pull/1178

   
   
   Master Issue: #1177
   
   ### Motivation
   
   Currently, there is an issue with the reader implementation. If the reader 
is creating, it won't get the topic metadata from the topic. The reader can 
only read messages from a single topic. If the topic is a partitioned topic, 
the reader won't know that and will try to create a non-partition topic with 
the same name. And it will lead to this issue: 
https://github.com/apache/pulsar/issues/22032
   
   ### Modifications
   
   - Support partitioned topic reader
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API: (yes / no)
 - The schema: (yes / no / don't know)
 - The default values of configurations: (yes / no)
 - The wire protocol: (yes / no)
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / GoDocs / 
not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a followup 
issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][sec] Validate the user input avoiding unexpected truncation of user-controlled arithmetic data [pulsar]

2024-02-20 Thread via GitHub


liangyepianzhou closed pull request #22073: [fix][sec] Validate the user input 
avoiding unexpected truncation of user-controlled arithmetic data
URL: https://github.com/apache/pulsar/pull/22073


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][sec] Missing JWT signature check [pulsar]

2024-02-20 Thread via GitHub


liangyepianzhou closed pull request #22075: [fix][sec] Missing JWT signature 
check
URL: https://github.com/apache/pulsar/pull/22075


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][sec] implicit narrowing conversion in compound assignment [pulsar]

2024-02-20 Thread via GitHub


liangyepianzhou closed pull request #22074: [fix][sec] implicit narrowing 
conversion in compound assignment
URL: https://github.com/apache/pulsar/pull/22074


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][sec] Arbitrary file access during archive extraction ("Zip Slip") [pulsar]

2024-02-20 Thread via GitHub


liangyepianzhou closed pull request #22077: [fix][sec] Arbitrary file access 
during archive extraction ("Zip Slip")
URL: https://github.com/apache/pulsar/pull/22077


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][sec] Information exposure through a stack trace [pulsar]

2024-02-20 Thread via GitHub


liangyepianzhou closed pull request #22080: [fix][sec] Information exposure 
through a stack trace
URL: https://github.com/apache/pulsar/pull/22080


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [fix][sec] Information exposure through a stack trace [pulsar]

2024-02-20 Thread via GitHub


liangyepianzhou opened a new pull request, #22080:
URL: https://github.com/apache/pulsar/pull/22080

   Fixes https://github.com/apache/pulsar/security/code-scanning/33
   ### Motivation
   Software developers often add stack traces to error messages, as a debugging 
aid. Whenever that error message occurs for an end user, the developer can use 
the stack trace to help identify how to fix the problem. In particular, stack 
traces can tell the developer more about the sequence of events that led to a 
failure, as opposed to merely the final state of the software when the error 
occurred.
   
   Unfortunately, the same information can be useful to an attacker. The 
sequence of class names in a stack trace can reveal the structure of the 
application as well as any internal components it relies on. Furthermore, the 
error message at the top of a stack trace can include information such as 
server-side file names and SQL code that the application relies on, allowing an 
attacker to fine-tune a subsequent injection attack.
   
   ### Modifications
   
   Send the user a more generic error message that reveals less information. 
Either suppress the stack trace entirely, or log it only on the server.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][sec] Regular expression injection [pulsar]

2024-02-20 Thread via GitHub


lhotari closed pull request #22079: [fix][sec] Regular expression injection
URL: https://github.com/apache/pulsar/pull/22079


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][sec] Regular expression injection [pulsar]

2024-02-20 Thread via GitHub


lhotari commented on PR #22079:
URL: https://github.com/apache/pulsar/pull/22079#issuecomment-1953837658

   this duplicates #22057. Please check the discussion on that PR. I'll close 
this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [fix][sec] Regular expression injection [pulsar]

2024-02-20 Thread via GitHub


liangyepianzhou opened a new pull request, #22079:
URL: https://github.com/apache/pulsar/pull/22079

   Fixes https://github.com/apache/pulsar/security/code-scanning/26,  
https://github.com/apache/pulsar/security/code-scanning/25, 
https://github.com/apache/pulsar/security/code-scanning/24, 
https://github.com/apache/pulsar/security/code-scanning/23
   
   ### Motivation
   Constructing a regular expression with unsanitized user input is dangerous 
as a malicious user may be able to modify the meaning of the expression. In 
particular, such a user may be able to provide a regular expression fragment 
that takes exponential time in the worst case, and use that to perform a Denial 
of Service attack.
   
   ### Modifications
   
   use a sanitization function such as `Pattern.quote` to escape 
meta-characters that have special meaning.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[I] Multi-topics reader support [pulsar-client-go]

2024-02-20 Thread via GitHub


RobertIndie opened a new issue, #1177:
URL: https://github.com/apache/pulsar-client-go/issues/1177

   **Is your feature request related to a problem? Please describe.**
   Currently, there is an issue with the reader implementation. If the reader 
is creating, it won't get the topic metadata from the topic. The reader can 
only read messages from a single topic. If the topic is a partitioned topic, 
the reader won't know that and will try to create a non-partition topic with 
the same name. And it will lead to this issue: 
https://github.com/apache/pulsar/issues/22032
   
   **Describe the solution you'd like**
   - Support multi-topics reader
   
   **Describe alternatives you've considered**
   No
   
   **Additional context**
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch branch-3.2 updated: [fix][broker] Fix hash collision when using a consumer name that ends with a number (#22053)

2024-02-20 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 4ffdc2d60c1 [fix][broker] Fix hash collision when using a consumer 
name that ends with a number (#22053)
4ffdc2d60c1 is described below

commit 4ffdc2d60c1c72d38b4f286ed54e68e580a90e0c
Author: Lari Hotari 
AuthorDate: Thu Feb 15 11:07:10 2024 +0200

[fix][broker] Fix hash collision when using a consumer name that ends with 
a number (#22053)
---
 ...ConsistentHashingStickyKeyConsumerSelector.java | 14 ++--
 ...istentHashingStickyKeyConsumerSelectorTest.java | 74 +-
 ...ntStickyKeyDispatcherMultipleConsumersTest.java |  4 +-
 3 files changed, 70 insertions(+), 22 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
index ea491bd40d3..b2b2b512c8c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
@@ -39,7 +39,8 @@ import org.apache.pulsar.common.util.Murmur3_32Hash;
  * number of keys assigned to each consumer.
  */
 public class ConsistentHashingStickyKeyConsumerSelector implements 
StickyKeyConsumerSelector {
-
+// use NUL character as field separator for hash key calculation
+private static final String KEY_SEPARATOR = "\0";
 private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
 
 // Consistent-Hash ring
@@ -59,8 +60,7 @@ public class ConsistentHashingStickyKeyConsumerSelector 
implements StickyKeyCons
 // Insert multiple points on the hash ring for every consumer
 // The points are deterministically added based on the hash of the 
consumer name
 for (int i = 0; i < numberOfPoints; i++) {
-String key = consumer.consumerName() + i;
-int hash = 
Murmur3_32Hash.getInstance().makeHash(key.getBytes());
+int hash = calculateHashForConsumerAndIndex(consumer, i);
 hashRing.compute(hash, (k, v) -> {
 if (v == null) {
 return Lists.newArrayList(consumer);
@@ -79,14 +79,18 @@ public class ConsistentHashingStickyKeyConsumerSelector 
implements StickyKeyCons
 }
 }
 
+private static int calculateHashForConsumerAndIndex(Consumer consumer, int 
index) {
+String key = consumer.consumerName() + KEY_SEPARATOR + index;
+return Murmur3_32Hash.getInstance().makeHash(key.getBytes());
+}
+
 @Override
 public void removeConsumer(Consumer consumer) {
 rwLock.writeLock().lock();
 try {
 // Remove all the points that were added for this consumer
 for (int i = 0; i < numberOfPoints; i++) {
-String key = consumer.consumerName() + i;
-int hash = 
Murmur3_32Hash.getInstance().makeHash(key.getBytes());
+int hash = calculateHashForConsumerAndIndex(consumer, i);
 hashRing.compute(hash, (k, v) -> {
 if (v == null) {
 return null;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
index dbca31416bb..48311c57338 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
@@ -21,18 +21,18 @@ package org.apache.pulsar.broker.service;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-
-import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
-import org.apache.pulsar.client.api.Range;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
+import org.apache.pulsar.client.api.Range;
+import org.testng.Assert;
+import org.testng.annotations.Test;
 
 @Test(groups = "broker")
 public class ConsistentHashingStickyKeyConsumerSelectorTest {
@@ -154,17 +154,17 @@ public class 
ConsistentHashingStickyKeyConsumerSelectorTest {
 }
 Map> expectedResult = new 

Re: [PR] [improve][pip] PIP-336: WASM Function API [pulsar]

2024-02-20 Thread via GitHub


asafm commented on code in PR #21992:
URL: https://github.com/apache/pulsar/pull/21992#discussion_r1495401560


##
pip/pip-331.md:
##
@@ -0,0 +1,129 @@
+# PIP-331: WASM Function API
+
+# Background knowledge
+
+WASM(WebAssembly) bytecode is designed to be encoded in a size- and 
load-time-efficient binary format. WASM aims to leverage the common hardware 
features available on various platforms to execute in browsers at machine code 
speed.
+
+WASI(WebAssembly System Interface) provide a portable interface for 
applications that run within a constrained sandbox environment, which allows 
WASM to run in non browser environments such as Linux. It's portable and secure.
+
+# Motivation
+
+The server and client sides of the Pulsar function use protobuf for 
decoupling. In principle, the language supported by protobuf can be supported 
by the pulsar function, now Pulsar provided the java, python and golang 
function client, but there are still many languages that are not supported.
+
+Before all language adaptations are completed (and it's almost entirely 
certain to be impossible), users cannot write pulsar function in their familiar 
languages.
+
+# Goals
+
+## In Scope
+
+Other languages, as long as their code can be compiled into WASM bytecode 
(such as Rust/golang/C++), users can use these languages to write pulsar 
function.
+
+## Out of Scope
+
+All existing abilities of the Java pulsar function client are not 
reimplemented, the WASM Pulsar functions is under the Java Pulsar functions.
+
+Due to the strict requirements of WASM on parameter types and for simplicity 
reasons, types other than `java.lang.Long` are not used as parameters or return 
value.
+
+# High Level Design
+
+```mermaid 
+flowchart LR;
+
+subgraph develop
+direction TB
+SourceCode ==> |"CompileToWASM"| WasmFile ==> |"RenameFile"| 
MoveToTheResourceDirectory ==> UnitTest
+end
+
+subgraph runtime
+direction TB
+PulsarFunctionJava ==> |"LoadFromResource"| TheWasmFile ==> |"Invoke"| 
TheSourceCode
+end
+
+develop --> runtime
+```
+
+# Detailed Design
+
+## Design & Implementation Details
+
+1. add `WasmLoader` to load WASM file and provide the WASM function to java, 
also provide the java function to WASM if we need.
+
+2. add `AbstractWasmFunction` and `AbstractWasmWindowFunction` as the core 
interface of the WASM function api.
+
+```java
+public abstract class AbstractWasmFunction extends WasmLoader implements 
Function {

Review Comment:
   Well Go have no inheritence - and I'm pretty sure they have a similar need.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][pip] PIP-336: WASM Function API [pulsar]

2024-02-20 Thread via GitHub


asafm commented on code in PR #21992:
URL: https://github.com/apache/pulsar/pull/21992#discussion_r1495399544


##
pip/pip-331.md:
##
@@ -0,0 +1,129 @@
+# PIP-331: WASM Function API
+
+# Background knowledge
+
+WASM(WebAssembly) bytecode is designed to be encoded in a size- and 
load-time-efficient binary format. WASM aims to leverage the common hardware 
features available on various platforms to execute in browsers at machine code 
speed.
+
+WASI(WebAssembly System Interface) provide a portable interface for 
applications that run within a constrained sandbox environment, which allows 
WASM to run in non browser environments such as Linux. It's portable and secure.
+
+# Motivation
+
+The server and client sides of the Pulsar function use protobuf for 
decoupling. In principle, the language supported by protobuf can be supported 
by the pulsar function, now Pulsar provided the java, python and golang 
function client, but there are still many languages that are not supported.
+
+Before all language adaptations are completed (and it's almost entirely 
certain to be impossible), users cannot write pulsar function in their familiar 
languages.
+
+# Goals
+
+## In Scope
+
+Other languages, as long as their code can be compiled into WASM bytecode 
(such as Rust/golang/C++), users can use these languages to write pulsar 
function.
+
+## Out of Scope
+
+All existing abilities of the Java pulsar function client are not 
reimplemented, the WASM Pulsar functions is under the Java Pulsar functions.
+
+Due to the strict requirements of WASM on parameter types and for simplicity 
reasons, types other than `java.lang.Long` are not used as parameters or return 
value.
+
+# High Level Design
+
+```mermaid 
+flowchart LR;
+
+subgraph develop
+direction TB
+SourceCode ==> |"CompileToWASM"| WasmFile ==> |"RenameFile"| 
MoveToTheResourceDirectory ==> UnitTest
+end
+
+subgraph runtime
+direction TB
+PulsarFunctionJava ==> |"LoadFromResource"| TheWasmFile ==> |"Invoke"| 
TheSourceCode
+end
+
+develop --> runtime
+```
+
+# Detailed Design
+
+## Design & Implementation Details
+
+1. add `WasmLoader` to load WASM file and provide the WASM function to java, 
also provide the java function to WASM if we need.
+
+2. add `AbstractWasmFunction` and `AbstractWasmWindowFunction` as the core 
interface of the WASM function api.
+
+```java
+public abstract class AbstractWasmFunction extends WasmLoader implements 
Function {
+
+private static final String PROCESS_METHOD_NAME = "process";
+
+protected static final String INITIALIZE_METHOD_NAME = "initialize";
+
+protected static final String CLOSE_METHOD_NAME = "close";
+
+protected static final Map> ARGUMENTS = new 
ConcurrentHashMap<>();
+
+@Override
+public T process(X input, Context context) {
+return super.getWasmExtern(PROCESS_METHOD_NAME)
+.map(process -> {
+Long argumentId = callWASI(input, context, process);
+return doProcess(input, context, argumentId);
+})
+.orElseThrow(() -> new PulsarWasmException(
+PROCESS_METHOD_NAME + " function not found in " + 
super.getWasmName()));
+}
+
+private Long callWASI(X input,
+  Context context,
+  Extern process) {
+// call WASI function
+final Long argumentId = getArgumentId(input, context);
+ARGUMENTS.put(argumentId, new Argument<>(input, context));

Review Comment:
   Serialization sounds slow, as it should be done per message, no?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch branch-3.2 updated: [fix] [broker] Fix can not subscribe partitioned topic with a suffix-matched regexp (#22025)

2024-02-20 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 569386640ab [fix] [broker] Fix can not subscribe partitioned topic 
with a suffix-matched regexp (#22025)
569386640ab is described below

commit 569386640ab6205781e8afa89a57fb539292fcec
Author: fengyubiao 
AuthorDate: Mon Feb 19 16:43:39 2024 +0800

[fix] [broker] Fix can not subscribe partitioned topic with a 
suffix-matched regexp (#22025)
---
 .../pulsar/broker/resources/TopicResources.java|  3 ++
 .../pulsar/broker/namespace/NamespaceService.java  |  3 ++
 .../broker/service/PulsarCommandSenderImpl.java|  6 
 .../pulsar/broker/service/TopicListService.java| 22 --
 .../client/impl/PatternTopicsConsumerImplTest.java | 34 ++
 .../apache/pulsar/client/api/ConsumerBuilder.java  |  8 ++---
 .../client/impl/MultiTopicsConsumerImpl.java   | 10 +--
 .../impl/PatternMultiTopicsConsumerImpl.java   | 24 +--
 .../pulsar/client/impl/TopicListWatcher.java   |  4 ++-
 .../impl/conf/ConsumerConfigurationData.java   |  2 +-
 .../apache/pulsar/common/protocol/Commands.java|  7 +
 11 files changed, 104 insertions(+), 19 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
index 840ced0a1c1..0963f25c3d3 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java
@@ -50,6 +50,9 @@ public class TopicResources {
 store.registerListener(this::handleNotification);
 }
 
+/***
+ * List persistent topics names under a namespace, the topic name contains 
the partition suffix.
+ */
 public CompletableFuture> 
listPersistentTopicsAsync(NamespaceName ns) {
 String path = MANAGED_LEDGER_PATH + "/" + ns + "/persistent";
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index d8c3fd169a2..b55eda150af 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1432,6 +1432,9 @@ public class NamespaceService implements AutoCloseable {
 });
 }
 
+/***
+ * List persistent topics names under a namespace, the topic name contains 
the partition suffix.
+ */
 public CompletableFuture> 
getListOfPersistentTopics(NamespaceName namespaceName) {
 return 
pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(namespaceName);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index dd74fc4e71e..105650caaaf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -356,12 +356,18 @@ public class PulsarCommandSenderImpl implements 
PulsarCommandSender {
 writeAndFlush(outBuf);
 }
 
+/***
+ * @param topics topic names which are matching, the topic name contains 
the partition suffix.
+ */
 @Override
 public void sendWatchTopicListSuccess(long requestId, long watcherId, 
String topicsHash, List topics) {
 BaseCommand command = Commands.newWatchTopicListSuccess(requestId, 
watcherId, topicsHash, topics);
 interceptAndWriteCommand(command);
 }
 
+/***
+ * {@inheritDoc}
+ */
 @Override
 public void sendWatchTopicListUpdate(long watcherId,
  List newTopics, List 
deletedTopics, String topicsHash) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
index 7aa50057d73..aea5b9fc65b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.broker.resources.TopicResources;
 import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.topics.TopicList;
 import 

Re: [PR] [improve][pip] PIP-336: WASM Function API [pulsar]

2024-02-20 Thread via GitHub


asafm commented on code in PR #21992:
URL: https://github.com/apache/pulsar/pull/21992#discussion_r1495396614


##
pip/pip-331.md:
##
@@ -0,0 +1,129 @@
+# PIP-331: WASM Function API
+
+# Background knowledge
+
+WASM(WebAssembly) bytecode is designed to be encoded in a size- and 
load-time-efficient binary format. WASM aims to leverage the common hardware 
features available on various platforms to execute in browsers at machine code 
speed.
+
+WASI(WebAssembly System Interface) provide a portable interface for 
applications that run within a constrained sandbox environment, which allows 
WASM to run in non browser environments such as Linux. It's portable and secure.
+
+# Motivation
+
+The server and client sides of the Pulsar function use protobuf for 
decoupling. In principle, the language supported by protobuf can be supported 
by the pulsar function, now Pulsar provided the java, python and golang 
function client, but there are still many languages that are not supported.
+
+Before all language adaptations are completed (and it's almost entirely 
certain to be impossible), users cannot write pulsar function in their familiar 
languages.
+
+# Goals
+
+## In Scope
+
+Other languages, as long as their code can be compiled into WASM bytecode 
(such as Rust/golang/C++), users can use these languages to write pulsar 
function.
+
+## Out of Scope
+
+All existing abilities of the Java pulsar function client are not 
reimplemented, the WASM Pulsar functions is under the Java Pulsar functions.
+
+Due to the strict requirements of WASM on parameter types and for simplicity 
reasons, types other than `java.lang.Long` are not used as parameters or return 
value.
+
+# High Level Design
+
+```mermaid 
+flowchart LR;
+
+subgraph develop
+direction TB
+SourceCode ==> |"CompileToWASM"| WasmFile ==> |"RenameFile"| 
MoveToTheResourceDirectory ==> UnitTest
+end
+
+subgraph runtime
+direction TB
+PulsarFunctionJava ==> |"LoadFromResource"| TheWasmFile ==> |"Invoke"| 
TheSourceCode
+end
+
+develop --> runtime
+```
+
+# Detailed Design
+
+## Design & Implementation Details
+
+1. add `WasmLoader` to load WASM file and provide the WASM function to java, 
also provide the java function to WASM if we need.

Review Comment:
   Then you need to mention that and expand on it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch branch-3.2 updated: [improve][broker] Do not retain the data in the system topic (#22022)

2024-02-20 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new c4482dd0a18 [improve][broker] Do not retain the data in the system 
topic (#22022)
c4482dd0a18 is described below

commit c4482dd0a18ab8333fc0b8228b378f6ef0f264bc
Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com>
AuthorDate: Tue Feb 6 15:28:43 2024 +0800

[improve][broker] Do not retain the data in the system topic (#22022)

### Motivation

For some use case, the users need to store all the messages even though 
these message are acked by all subscription.
So they set the retention policy of the namespace to infinite retention 
(setting both time and size limits to `-1`).  But the data in the system topic 
does not need for infinite retention.

### Modifications

For system topics, do not retain messages that have already been 
acknowledged.
---
 .../pulsar/broker/service/BrokerService.java   | 15 +--
 .../pulsar/compaction/CompactionRetentionTest.java | 48 ++
 2 files changed, 59 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 13bce3f67df..0a9d100bf7b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1771,10 +1771,17 @@ public class BrokerService implements Closeable {
 }
 
 if (retentionPolicies == null) {
-retentionPolicies = policies.map(p -> 
p.retention_policies).orElseGet(
-() -> new 
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
-serviceConfig.getDefaultRetentionSizeInMB())
-);
+if (SystemTopicNames.isSystemTopic(topicName)) {
+if (log.isDebugEnabled()) {
+log.debug("{} Disable data retention policy for system 
topic.", topicName);
+}
+retentionPolicies = new RetentionPolicies(0, 0);
+} else {
+retentionPolicies = policies.map(p -> 
p.retention_policies).orElseGet(
+() -> new 
RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
+
serviceConfig.getDefaultRetentionSizeInMB())
+);
+}
 }
 
 ManagedLedgerConfig managedLedgerConfig = new 
ManagedLedgerConfig();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
index 055c595fbfe..98bf2b819c2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
@@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -45,9 +46,13 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -212,6 +217,49 @@ public class CompactionRetentionTest extends 
MockedPulsarServiceBaseTest {
 );
 }
 
+@Test
+public void testRetentionPolicesForSystemTopic() throws Exception {
+String namespace = "my-tenant/my-ns";
+String topicPrefix = "persistent://" + namespace + "/";
+admin.namespaces().setRetention(namespace, new RetentionPolicies(-1, 
-1));
+// Check event topics and transaction internal topics.
+for (String eventTopic : SystemTopicNames.EVENTS_TOPIC_NAMES) {
+checkSystemTopicRetentionPolicy(topicPrefix + eventTopic);
+}
+

Re: [PR] [improve][pip] PIP-336: WASM Function API [pulsar]

2024-02-20 Thread via GitHub


asafm commented on code in PR #21992:
URL: https://github.com/apache/pulsar/pull/21992#discussion_r1495396150


##
pip/pip-331.md:
##
@@ -0,0 +1,129 @@
+# PIP-331: WASM Function API
+
+# Background knowledge
+
+WASM(WebAssembly) bytecode is designed to be encoded in a size- and 
load-time-efficient binary format. WASM aims to leverage the common hardware 
features available on various platforms to execute in browsers at machine code 
speed.
+
+WASI(WebAssembly System Interface) provide a portable interface for 
applications that run within a constrained sandbox environment, which allows 
WASM to run in non browser environments such as Linux. It's portable and secure.
+
+# Motivation
+
+The server and client sides of the Pulsar function use protobuf for 
decoupling. In principle, the language supported by protobuf can be supported 
by the pulsar function, now Pulsar provided the java, python and golang 
function client, but there are still many languages that are not supported.
+
+Before all language adaptations are completed (and it's almost entirely 
certain to be impossible), users cannot write pulsar function in their familiar 
languages.
+
+# Goals
+
+## In Scope
+
+Other languages, as long as their code can be compiled into WASM bytecode 
(such as Rust/golang/C++), users can use these languages to write pulsar 
function.
+
+## Out of Scope
+
+All existing abilities of the Java pulsar function client are not 
reimplemented, the WASM Pulsar functions is under the Java Pulsar functions.
+
+Due to the strict requirements of WASM on parameter types and for simplicity 
reasons, types other than `java.lang.Long` are not used as parameters or return 
value.

Review Comment:
   I think there is a problem here. I can't learn about this feature you 
mentioned from the documentation of wasmtime-java since it's non-existent, and 
the scarce documentation in this PIP doesn't help. How can this be maintained 
by other people going forward?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch branch-3.2 updated: [fix] [broker] Subscription stuck due to called Admin API analyzeSubscriptionBacklog (#22019)

2024-02-20 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 9e4ebdbacd1 [fix] [broker] Subscription stuck due to called Admin API 
analyzeSubscriptionBacklog (#22019)
9e4ebdbacd1 is described below

commit 9e4ebdbacd17216f65006e8bacdb0265101cb3b5
Author: fengyubiao 
AuthorDate: Mon Feb 19 00:04:10 2024 +0800

[fix] [broker] Subscription stuck due to called Admin API 
analyzeSubscriptionBacklog (#22019)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 29 +++--
 .../service/persistent/PersistentSubscription.java | 30 +++---
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 29 +
 .../admin/AnalyzeBacklogSubscriptionTest.java  | 18 ++---
 .../common/util/collections/BitSetRecyclable.java  |  8 ++
 5 files changed, 99 insertions(+), 15 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 7e930e711ec..da013c07313 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -196,11 +196,11 @@ public class ManagedCursorImpl implements ManagedCursor {
 position.ackSet = null;
 return position;
 };
-private final RangeSetWrapper individualDeletedMessages;
+protected final RangeSetWrapper individualDeletedMessages;
 
 // Maintain the deletion status for batch messages
 // (ledgerId, entryId) -> deletion indexes
-private final ConcurrentSkipListMap 
batchDeletedIndexes;
+protected final ConcurrentSkipListMap 
batchDeletedIndexes;
 private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
 private RateLimiter markDeleteLimiter;
@@ -3617,4 +3617,29 @@ public class ManagedCursorImpl implements ManagedCursor {
 public ManagedLedgerConfig getConfig() {
 return config;
 }
+
+/***
+ * Create a non-durable cursor and copy the ack stats.
+ */
+public ManagedCursor duplicateNonDurableCursor(String 
nonDurableCursorName) throws ManagedLedgerException {
+NonDurableCursorImpl newNonDurableCursor =
+(NonDurableCursorImpl) 
ledger.newNonDurableCursor(getMarkDeletedPosition(), nonDurableCursorName);
+if (individualDeletedMessages != null) {
+this.individualDeletedMessages.forEach(range -> {
+newNonDurableCursor.individualDeletedMessages.addOpenClosed(
+range.lowerEndpoint().getLedgerId(),
+range.lowerEndpoint().getEntryId(),
+range.upperEndpoint().getLedgerId(),
+range.upperEndpoint().getEntryId());
+return true;
+});
+}
+if (batchDeletedIndexes != null) {
+for (Map.Entry entry : 
this.batchDeletedIndexes.entrySet()) {
+BitSetRecyclable copiedBitSet = 
BitSetRecyclable.valueOf(entry.getValue());
+newNonDurableCursor.batchDeletedIndexes.put(entry.getKey(), 
copiedBitSet);
+}
+}
+return newNonDurableCursor;
+}
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index dc79146110f..e5d90bf0ef4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -514,9 +515,15 @@ public class PersistentSubscription extends 
AbstractSubscription implements Subs
 return "Null";
 }
 
-@Override
 public CompletableFuture 
analyzeBacklog(Optional position) {
-
+final ManagedLedger managedLedger = topic.getManagedLedger();
+final String newNonDurableCursorName = "analyze-backlog-" + 
UUID.randomUUID();
+ManagedCursor newNonDurableCursor;
+try {
+newNonDurableCursor = ((ManagedCursorImpl) 
cursor).duplicateNonDurableCursor(newNonDurableCursorName);
+} catch (ManagedLedgerException e) {
+return CompletableFuture.failedFuture(e);
+}
 long start = System.currentTimeMillis();
 if 

(pulsar) branch branch-3.2 updated: [improve] [broker] Do not print an Error log when responding to `HTTP-404` when calling `Admin API` and the topic does not exist. (#21995)

2024-02-20 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 0fbb7fd1b2a [improve] [broker] Do not print an Error log when 
responding to `HTTP-404` when calling `Admin API` and the topic does not exist. 
(#21995)
0fbb7fd1b2a is described below

commit 0fbb7fd1b2ab15d00ca248c80a90edce4365cb8e
Author: fengyubiao 
AuthorDate: Sun Feb 18 15:46:52 2024 +0800

[improve] [broker] Do not print an Error log when responding to `HTTP-404` 
when calling `Admin API` and the topic does not exist. (#21995)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  4 +
 .../broker/admin/impl/PersistentTopicsBase.java| 88 +++---
 .../broker/admin/impl/SchemasResourceBase.java |  2 +-
 .../broker/admin/v2/NonPersistentTopics.java   |  6 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 36 -
 .../pulsar/broker/admin/v3/Transactions.java   | 12 +--
 6 files changed, 75 insertions(+), 73 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1526ae18a90..2ceec189975 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -834,6 +834,10 @@ public abstract class AdminResource extends 
PulsarWebResource {
 == Status.NOT_FOUND.getStatusCode();
 }
 
+protected static boolean isNot307And404Exception(Throwable ex) {
+return !isRedirectException(ex) && !isNotFoundException(ex);
+}
+
 protected static String getTopicNotFoundErrorMessage(String topic) {
 return String.format("Topic %s not found", topic);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 379d6675b57..0cdb140c7c3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -874,7 +874,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we 
need to log it.
-   if (!isRedirectException(ex)) {
+   if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned 
metadata while unloading topic {}",
clientAppId(), topicName, ex);
}
@@ -884,7 +884,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
-   if (!isRedirectException(ex)) {
+   if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to validate the global namespace 
ownership while unloading topic {}",
clientAppId(), topicName, ex);
}
@@ -1052,7 +1052,7 @@ public class PersistentTopicsBase extends AdminResource {
 }))
 .exceptionally(ex -> {
 // If the exception is not redirect exception we need to 
log it.
-if (!isRedirectException(ex)) {
+if (!isNot307And404Exception(ex)) {
 log.error("[{}] Failed to unload topic {}, {}", 
clientAppId(), topicName, ex);
 }
 resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1074,7 +1074,7 @@ public class PersistentTopicsBase extends AdminResource {
 }))
 .exceptionally(ex -> {
 // If the exception is not redirect exception we need to 
log it.
-if (!isRedirectException(ex)) {
+if (!isNot307And404Exception(ex)) {
 log.error("[{}] Failed to unload tc {},{}", 
clientAppId(),
 topicName.getPartitionIndex(), ex);
 }
@@ -1176,7 +1176,7 @@ public class PersistentTopicsBase extends AdminResource {
 }
 }).exceptionally(ex -> {
 // If the exception is not redirect exception we 
need to log it.
-if (!isRedirectException(ex)) {
+if (!isNot307And404Exception(ex)) {
 log.error("[{}] Failed to get partitioned 
topic 

(pulsar) branch branch-3.2 updated: [fix] [broker] add timeout for health check read. (#21990)

2024-02-20 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 1be9692a6f4 [fix] [broker] add timeout for health check read. (#21990)
1be9692a6f4 is described below

commit 1be9692a6f46a23bf9bdecdefe7f7a25d47a16c6
Author: thetumbled <52550727+thetumb...@users.noreply.github.com>
AuthorDate: Tue Jan 30 23:40:09 2024 +0800

[fix] [broker] add timeout for health check read. (#21990)
---
 .../pulsar/broker/admin/impl/BrokersBase.java  | 13 -
 .../broker/admin/AdminApiHealthCheckTest.java  | 63 ++
 2 files changed, 75 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index f056b18f3f1..61b354610ac 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -26,6 +26,7 @@ import io.swagger.annotations.ApiResponses;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -34,6 +35,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
@@ -80,6 +82,12 @@ public class BrokersBase extends AdminResource {
 // log a full thread dump when a deadlock is detected in healthcheck once 
every 10 minutes
 // to prevent excessive logging
 private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 
60L;
+// there is a timeout of 60 seconds default in the client(readTimeoutMs), 
so we need to set the timeout
+// a bit shorter than 60 seconds to avoid the client timeout exception 
thrown before the server timeout exception.
+// or we can't propagate the server timeout exception to the client.
+private static final Duration HEALTH_CHECK_READ_TIMEOUT = 
Duration.ofSeconds(58);
+private static final TimeoutException HEALTH_CHECK_TIMEOUT_EXCEPTION =
+FutureUtil.createTimeoutException("Timeout", BrokersBase.class, 
"healthCheckRecursiveReadNext(...)");
 private volatile long threadDumpLoggedTimestamp;
 
 @GET
@@ -434,7 +442,10 @@ public class BrokersBase extends AdminResource {
 });
 throw 
FutureUtil.wrapToCompletionException(createException);
 }).thenCompose(reader -> 
producer.sendAsync(messageStr)
-.thenCompose(__ -> 
healthCheckRecursiveReadNext(reader, messageStr))
+.thenCompose(__ -> 
FutureUtil.addTimeoutHandling(
+
healthCheckRecursiveReadNext(reader, messageStr),
+HEALTH_CHECK_READ_TIMEOUT, 
pulsar().getBrokerService().executor(),
+() -> 
HEALTH_CHECK_TIMEOUT_EXCEPTION))
 .whenComplete((__, ex) -> {
 closeAndReCheck(producer, reader, 
topicOptional.get(), subscriptionName)
 .whenComplete((unused, 
innerEx) -> {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
index a780f889de8..357422b11f6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
@@ -23,6 +23,7 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadMXBean;
+import java.lang.reflect.Field;
 import java.time.Duration;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -31,13 +32,21 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+import 

(pulsar) branch branch-3.2 updated: [fix][client] Fix ConsumerBuilderImpl#subscribe silent stuck when using pulsar-client:3.0.x with jackson-annotations prior to 2.12.0 (#21985)

2024-02-20 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 07f2586a23f [fix][client] Fix ConsumerBuilderImpl#subscribe silent 
stuck when using pulsar-client:3.0.x with jackson-annotations prior to 2.12.0 
(#21985)
07f2586a23f is described below

commit 07f2586a23ffba765128d2dee874faf7a6dc8713
Author: 萧易客 
AuthorDate: Tue Jan 30 19:20:29 2024 +0800

[fix][client] Fix ConsumerBuilderImpl#subscribe silent stuck when using 
pulsar-client:3.0.x with jackson-annotations prior to 2.12.0 (#21985)

### Motivation
In summary, `jackson-annotations:2.12.0` or later is now required for 
`pulsar-client 3.0.x`, and this also applies to versions `3.1.x` and `3.2.x`.

Otherwise, `ConsumerBuilderImpl#subscribe` may become stuck without 
displaying any error message.

### Modifications

Modify the `whenComplete` to a combination of `thenAccept` and 
`exceptionally`. The modification is harmless.
---
 .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 14 +++---
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index baabaf67070..84504b632ad 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1025,13 +1025,13 @@ public class MultiTopicsConsumerImpl extends 
ConsumerBase {
 
 private void subscribeTopicPartitions(CompletableFuture 
subscribeResult, String topicName, int numPartitions,
 boolean createIfDoesNotExist) {
-client.preProcessSchemaBeforeSubscribe(client, schema, 
topicName).whenComplete((schema, cause) -> {
-if (null == cause) {
-doSubscribeTopicPartitions(schema, subscribeResult, topicName, 
numPartitions, createIfDoesNotExist);
-} else {
-subscribeResult.completeExceptionally(cause);
-}
-});
+client.preProcessSchemaBeforeSubscribe(client, schema, topicName)
+.thenAccept(schema -> {
+doSubscribeTopicPartitions(schema, subscribeResult, 
topicName, numPartitions, createIfDoesNotExist);
+}).exceptionally(cause -> {
+subscribeResult.completeExceptionally(cause);
+return null;
+});
 }
 
 private void doSubscribeTopicPartitions(Schema schema,



Re: [PR] [improve][pip] PIP-336: WASM Function API [pulsar]

2024-02-20 Thread via GitHub


asafm commented on code in PR #21992:
URL: https://github.com/apache/pulsar/pull/21992#discussion_r1495389552


##
pip/pip-331.md:
##
@@ -0,0 +1,129 @@
+# PIP-331: WASM Function API
+
+# Background knowledge
+
+WASM(WebAssembly) bytecode is designed to be encoded in a size- and 
load-time-efficient binary format. WASM aims to leverage the common hardware 
features available on various platforms to execute in browsers at machine code 
speed.
+
+WASI(WebAssembly System Interface) provide a portable interface for 
applications that run within a constrained sandbox environment, which allows 
WASM to run in non browser environments such as Linux. It's portable and secure.
+
+# Motivation
+
+The server and client sides of the Pulsar function use protobuf for 
decoupling. In principle, the language supported by protobuf can be supported 
by the pulsar function, now Pulsar provided the java, python and golang 
function client, but there are still many languages that are not supported.
+
+Before all language adaptations are completed (and it's almost entirely 
certain to be impossible), users cannot write pulsar function in their familiar 
languages.
+
+# Goals
+
+## In Scope
+
+Other languages, as long as their code can be compiled into WASM bytecode 
(such as Rust/golang/C++), users can use these languages to write pulsar 
function.
+
+## Out of Scope
+
+All existing abilities of the Java pulsar function client are not 
reimplemented, the WASM Pulsar functions is under the Java Pulsar functions.

Review Comment:
   Both repos you mentioned have ~100 stars.
   The wasmtime repo you mentioned doesn't mention Java as a supported 
language. Doesn't sound like a solid ground to stand upon no?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch branch-3.2 updated: [improve] [proxy] Add a check for brokerServiceURL that does not support multi uri yet (#21972)

2024-02-20 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 6aae7f63285 [improve] [proxy] Add a check for brokerServiceURL that 
does not support multi uri yet (#21972)
6aae7f63285 is described below

commit 6aae7f63285863d1077530f007d1f25f9f4c3e50
Author: fengyubiao 
AuthorDate: Tue Jan 30 19:34:01 2024 +0800

[improve] [proxy] Add a check for brokerServiceURL that does not support 
multi uri yet (#21972)

### Motivation

At the beginning of the design, these two configurations(`brokerServiceURL 
& brokerServiceURLTLS`) do not support setting multiple broker addresses, which 
should instead be set to a “discovery service provider.” see: 
https://github.com/apache/pulsar/pull/1002 and 
https://github.com/apache/pulsar/pull/14682

Users will get the below error if they set A to a multi-broker URLs

```
"2024-01-09 00:20:10,261 -0800 [pulsar-proxy-io-4-7] WARN  
io.netty.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, 
and it reached at the tail of the pipeline. It usually means the last handler 
in the pipeline did not handle the exception.
java.lang.IllegalArgumentException: port out of range:-1
at java.net.InetSocketAddress.checkPort(InetSocketAddress.java:143) 
~[?:?]
at 
java.net.InetSocketAddress.createUnresolved(InetSocketAddress.java:254) ~[?:?]
at 
org.apache.pulsar.proxy.server.LookupProxyHandler.getAddr(LookupProxyHandler.java:432)
 ~[org.apache.pulsar-pulsar-proxy-2.9.0.jar:2.9.0]
at 
org.apache.pulsar.proxy.server.LookupProxyHandler.handleGetSchema(LookupProxyHandler.java:357)
 ~[org.apache.pulsar-pulsar-proxy-2.9.0.jar:2.9.0]
at 
org.apache.pulsar.proxy.server.ProxyConnection.handleGetSchema(ProxyConnection.java:463)
 ~[org.apache.pulsar-pulsar-proxy-2.9.0.jar:2.9.0]
at 
org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:326)
 ~[io.streamnative-pulsar-common-2.9.2.12.jar:2.9.2.12]
at 
org.apache.pulsar.proxy.server.ProxyConnection.channelRead(ProxyConnection.java:221)
 ~[org.apache.pulsar-pulsar-proxy-2.9.0.jar:2.9.0]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
 ~[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final]
at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
 ~[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372) 
~[io.netty-netty-handler-4.1.74.Final.jar:4.1.74.Final]
at 
io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1246) 
~[io.netty-netty-handler-4.1.74.Final.jar:4.1.74.Final]
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1286) 
~[io.netty-netty-handler-4.1.74.Final.jar:4.1.74.Final]
at 
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
 ~[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final]
at 
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
 ~[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final]
at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
 ~[io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 ~[io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 

(pulsar) branch branch-3.2 updated: [fix][broker] Fix schema deletion error when deleting a partitioned topic with many partitions and schema (#21977)

2024-02-20 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 7462f669e9e [fix][broker] Fix schema deletion error when deleting a 
partitioned topic with many partitions and schema (#21977)
7462f669e9e is described below

commit 7462f669e9e9f64db5fbc5e39f535bf2d33c2223
Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com>
AuthorDate: Mon Jan 29 20:16:05 2024 -0800

[fix][broker] Fix schema deletion error when deleting a partitioned topic 
with many partitions and schema (#21977)
---
 .../pulsar/broker/service/BrokerService.java   | 29 ++
 .../service/schema/BookkeeperSchemaStorage.java|  6 +++--
 .../tests/integration/schema/SchemaTest.java   | 11 
 3 files changed, 28 insertions(+), 18 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 62197900076..13bce3f67df 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -123,8 +123,6 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleC
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.persistent.SystemTopic;
 import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
-import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
-import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
@@ -3447,22 +3445,21 @@ public class BrokerService implements Closeable {
 }
 
 public CompletableFuture deleteSchema(TopicName topicName) {
+// delete schema at the upper level when deleting the partitioned 
topic.
+if (topicName.isPartitioned()) {
+return CompletableFuture.completedFuture(null);
+}
 String base = topicName.getPartitionedTopicName();
 String id = TopicName.get(base).getSchemaName();
-SchemaRegistryService schemaRegistryService = 
getPulsar().getSchemaRegistryService();
-return 
BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id))
-.thenCompose(schema -> {
-if (schema != null) {
-// It's different from `SchemasResource.deleteSchema`
-// because when we delete a topic, the schema
-// history is meaningless. But when we delete a schema 
of a topic, a new schema could be
-// registered in the future.
-log.info("Delete schema storage of id: {}", id);
-return 
getPulsar().getSchemaRegistryService().deleteSchemaStorage(id);
-} else {
-return CompletableFuture.completedFuture(null);
-}
-});
+return 
getPulsar().getSchemaRegistryService().deleteSchemaStorage(id).whenComplete((vid,
 ex) -> {
+if (vid != null && ex == null) {
+// It's different from `SchemasResource.deleteSchema`
+// because when we delete a topic, the schema
+// history is meaningless. But when we delete a schema of a 
topic, a new schema could be
+// registered in the future.
+log.info("Deleted schema storage of id: {}", id);
+}
+});
 }
 
 private CompletableFuture checkMaxTopicsPerNamespace(TopicName 
topicName, int numPartitions) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 78e30f6fff8..c509764bf67 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -707,7 +707,8 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
 message += " - entry=" + entryId;
 }
 boolean recoverable = rc != 
BKException.Code.NoSuchLedgerExistsException
-&& rc != BKException.Code.NoSuchEntryException;
+&& rc != BKException.Code.NoSuchEntryException
+&& rc != 
BKException.Code.NoSuchLedgerExistsOnMetadataServerException;
 return new 

(pulsar) branch branch-3.2 updated: [fix] [broker] Fix reader stuck when read from compacted topic with read compact mode disable (#21969)

2024-02-20 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new ea681a9d579 [fix] [broker] Fix reader stuck when read from compacted 
topic with read compact mode disable (#21969)
ea681a9d579 is described below

commit ea681a9d579868ff0efd1fd863ad97208c751706
Author: thetumbled <52550727+thetumb...@users.noreply.github.com>
AuthorDate: Wed Jan 31 00:11:07 2024 +0800

[fix] [broker] Fix reader stuck when read from compacted topic with read 
compact mode disable (#21969)
---
 .../apache/pulsar/broker/service/ServerCnx.java| 32 +-
 .../compaction/GetLastMessageIdCompactedTest.java  | 27 ++
 2 files changed, 52 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 9f2b98aeb40..0d9b5ea73e0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2170,7 +2170,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 (PositionImpl) markDeletePosition,
 partitionIndex,
 requestId,
-consumer.getSubscription().getName());
+consumer.getSubscription().getName(),
+consumer.readCompacted());
 }).exceptionally(e -> {
 
writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
 ServerError.UnknownError, "Failed to recover 
Transaction Buffer."));
@@ -2188,15 +2189,17 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 PositionImpl markDeletePosition,
 int partitionIndex,
 long requestId,
-String subscriptionName) {
+String subscriptionName,
+boolean readCompacted) {
 
 PersistentTopic persistentTopic = (PersistentTopic) topic;
 ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
 
 // If it's not pointing to a valid entry, respond messageId of the 
current position.
 // If the compaction cursor reach the end of the topic, respond 
messageId from compacted ledger
-CompletableFuture compactionHorizonFuture =
-
persistentTopic.getTopicCompactionService().getLastCompactedPosition();
+CompletableFuture compactionHorizonFuture = readCompacted
+? 
persistentTopic.getTopicCompactionService().getLastCompactedPosition() :
+CompletableFuture.completedFuture(null);
 
 compactionHorizonFuture.whenComplete((compactionHorizon, ex) -> {
 if (ex != null) {
@@ -2205,8 +2208,22 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 return;
 }
 
-if (lastPosition.getEntryId() == -1 || (compactionHorizon != null
-&& lastPosition.compareTo((PositionImpl) 
compactionHorizon) <= 0)) {
+if (lastPosition.getEntryId() == -1 || 
!ml.ledgerExists(lastPosition.getLedgerId())) {
+// there is no entry in the original topic
+if (compactionHorizon != null) {
+// if readCompacted is true, we need to read the last 
entry from compacted topic
+handleLastMessageIdFromCompactionService(persistentTopic, 
requestId, partitionIndex,
+markDeletePosition);
+} else {
+// if readCompacted is false, we need to return 
MessageId.earliest
+
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, -1, -1, 
partitionIndex, -1,
+markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1,
+markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1));
+}
+return;
+}
+
+if (compactionHorizon != null && 
lastPosition.compareTo((PositionImpl) compactionHorizon) <= 0) {
 handleLastMessageIdFromCompactionService(persistentTopic, 
requestId, partitionIndex,
 markDeletePosition);
 return;
@@ -2241,7 +2258,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
 batchSizeFuture.whenComplete((batchSize, e) -> {
 if (e != null) {
-if (e.getCause() instanceof 
ManagedLedgerException.NonRecoverableLedgerException) {
+if (e.getCause() instanceof 
ManagedLedgerException.NonRecoverableLedgerException
+  

(pulsar) branch branch-3.2 updated: [fix][fn] Use unified PackageManagement service to download packages (#21955)

2024-02-20 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new c980ee24b72 [fix][fn] Use unified PackageManagement service to 
download packages (#21955)
c980ee24b72 is described below

commit c980ee24b726bcf6f6f980a60d94ce37d8e73016
Author: jiangpengcheng 
AuthorDate: Wed Jan 31 22:18:47 2024 +0800

[fix][fn] Use unified PackageManagement service to download packages 
(#21955)
---
 .../java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index db31847f91c..fc2873d8271 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -1766,6 +1766,8 @@ public abstract class ComponentImpl implements 
Component {
 + "when getting %s package from %s", 
e.getMessage(),
 ComponentTypeUtils.toString(componentType), 
functionPkgUrl));
 }
+} else if (Utils.hasPackageTypePrefix(existingPackagePath)) {
+componentPackageFile = getPackageFile(existingPackagePath);
 } else if (uploadedInputStream != null) {
 componentPackageFile = 
WorkerUtils.dumpToTmpFile(uploadedInputStream);
 } else if (!existingPackagePath.startsWith(Utils.BUILTIN)) {



(pulsar) branch branch-3.2 updated: [fix][client] Fix multi-topics consumer could receive old messages after seek (#21945)

2024-02-20 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 003efdd734e [fix][client] Fix multi-topics consumer could receive old 
messages after seek (#21945)
003efdd734e is described below

commit 003efdd734ee3a373bf86bdfcd740f2c9ab83771
Author: Yunze Xu 
AuthorDate: Wed Jan 31 00:31:15 2024 +0800

[fix][client] Fix multi-topics consumer could receive old messages after 
seek (#21945)
---
 .../pulsar/client/impl/TopicsConsumerImplTest.java | 80 +-
 .../client/impl/MultiTopicsConsumerImpl.java   | 66 --
 2 files changed, 125 insertions(+), 21 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 51b32c2b44e..c343ab0d6e2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageIdAdv;
+import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.MessageRouter;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
@@ -57,22 +58,27 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
-
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.Set;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -1394,4 +1400,76 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
 }
 }
 
+@DataProvider
+public static Object[][] seekByFunction() {
+return new Object[][] {
+{ true }, { false }
+};
+}
+
+@Test(timeOut = 3, dataProvider = "seekByFunction")
+public void testSeekToNewerPosition(boolean seekByFunction) throws 
Exception {
+final var topic1 = TopicName.get(newTopicName()).toString()
+.replace("my-property", "public").replace("my-ns", "default");
+final var topic2 = TopicName.get(newTopicName()).toString()
+.replace("my-property", "public").replace("my-ns", "default");
+@Cleanup final var producer1 = 
pulsarClient.newProducer(Schema.STRING).topic(topic1).create();
+@Cleanup final var producer2 = 
pulsarClient.newProducer(Schema.STRING).topic(topic2).create();
+producer1.send("1-0");
+producer2.send("2-0");
+producer1.send("1-1");
+producer2.send("2-1");
+final var consumer1 = pulsarClient.newConsumer(Schema.STRING)
+.topics(Arrays.asList(topic1, topic2)).subscriptionName("sub")
+.ackTimeout(1, TimeUnit.SECONDS)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+final var timestamps = new ArrayList();
+for (int i = 0; i < 4; i++) {
+timestamps.add(consumer1.receive().getPublishTime());
+}
+timestamps.sort(Comparator.naturalOrder());
+final var timestamp = timestamps.get(2);
+consumer1.close();
+
+final Function, CompletableFuture> seekAsync = 
consumer -> {
+final var future = seekByFunction ? consumer.seekAsync(__ -> 
timestamp) : consumer.seekAsync(timestamp);
+assertEquals(((ConsumerBase) 
consumer).getIncomingMessageSize(), 0L);
+assertEquals(((ConsumerBase) 
consumer).getTotalIncomingMessages(), 0);
+assertTrue(((ConsumerBase) 
consumer).getUnAckedMessageTracker().isEmpty());
+return future;
+};
+
+@Cleanup final var consumer2 = pulsarClient.newConsumer(Schema.STRING)
+.topics(Arrays.asList(topic1, 
topic2)).subscriptionName("sub-2")
+.ackTimeout(1, TimeUnit.SECONDS)
+   

Re: [PR] [feat] PIP-188 Support blue-green migration [pulsar-client-cpp]

2024-02-20 Thread via GitHub


BewareMyPower commented on code in PR #402:
URL: https://github.com/apache/pulsar-client-cpp/pull/402#discussion_r1495363450


##
lib/HandlerBase.h:
##
@@ -52,6 +52,7 @@ class HandlerBase : public 
std::enable_shared_from_this {
 ClientConnectionWeakPtr getCnx() const;
 void setCnx(const ClientConnectionPtr& cnx);
 void resetCnx() { setCnx(nullptr); }
+void setRedirectedClusterURI(const std::string serviceUrl) { 
redirectedClusterURI_ = serviceUrl; }

Review Comment:
   Use `const std::string&` as the parameter type.



##
lib/HandlerBase.h:
##
@@ -145,6 +146,8 @@ class HandlerBase : public 
std::enable_shared_from_this {
 mutable std::mutex connectionMutex_;
 std::atomic reconnectionPending_;
 ClientConnectionWeakPtr connection_;
+std::string redirectedClusterURI_;

Review Comment:
   This field could be accessed in different threads. You should use a mutex to 
protect it.



##
lib/ProducerImpl.cc:
##
@@ -144,7 +144,11 @@ Future ProducerImpl::connectionOpened(const 
ClientConnectionPtr& c
 return promise.getFuture();
 }
 
+LOG_DEBUG("Creating producer for topic:" << topic() << ", producerName:" 
<< producerName_ << " on "

Review Comment:
   Maybe we can use `LOG_INFO` like Java client?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org