(pulsar) branch master updated: [improve][broker]Ensure namespace deletion doesn't fail (#22627)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 936afecede8 [improve][broker]Ensure namespace deletion doesn't fail (#22627) 936afecede8 is described below commit 936afecede8374b14d13e9d48e9372fec1c27447 Author: Enrico Olivelli AuthorDate: Mon May 13 11:50:39 2024 +0200 [improve][broker]Ensure namespace deletion doesn't fail (#22627) --- .../pulsar/broker/resources/BaseResources.java | 27 - .../broker/resources/LocalPoliciesResources.java | 2 +- .../broker/resources/NamespaceResources.java | 17 +-- .../pulsar/broker/resources/TopicResources.java| 35 -- .../pulsar/broker/admin/impl/NamespacesBase.java | 16 -- .../SystemTopicBasedTopicPoliciesService.java | 3 +- .../apache/pulsar/metadata/api/MetadataStore.java | 22 ++ .../metadata/impl/AbstractMetadataStore.java | 13 8 files changed, 78 insertions(+), 57 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java index 4011a482075..00e381e0729 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java @@ -197,22 +197,21 @@ public class BaseResources { } protected CompletableFuture deleteIfExistsAsync(String path) { -return cache.exists(path).thenCompose(exists -> { -if (!exists) { -return CompletableFuture.completedFuture(null); +log.info("Deleting path: {}", path); +CompletableFuture future = new CompletableFuture<>(); +cache.delete(path).whenComplete((ignore, ex) -> { +if (ex != null && ex.getCause() instanceof MetadataStoreException.NotFoundException) { +log.info("Path {} did not exist in metadata store", path); +future.complete(null); +} 
else if (ex != null) { +log.info("Failed to delete path from metadata store: {}", path, ex); +future.completeExceptionally(ex); +} else { +log.info("Deleted path from metadata store: {}", path); +future.complete(null); } -CompletableFuture future = new CompletableFuture<>(); -cache.delete(path).whenComplete((ignore, ex) -> { -if (ex != null && ex.getCause() instanceof MetadataStoreException.NotFoundException) { -future.complete(null); -} else if (ex != null) { -future.completeExceptionally(ex); -} else { -future.complete(null); -} -}); -return future; }); +return future; } protected boolean exists(String path) throws MetadataStoreException { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java index c6b658c3bd0..ae3479fde59 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java @@ -79,7 +79,7 @@ public class LocalPoliciesResources extends BaseResources { } public CompletableFuture deleteLocalPoliciesAsync(NamespaceName ns) { -return deleteAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString())); +return deleteIfExistsAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString())); } public CompletableFuture deleteLocalPoliciesTenantAsync(String tenant) { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 975b23192f9..9d7c60cd344 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -115,7 +115,7 @@ public class NamespaceResources extends BaseResources { } public 
CompletableFuture deletePoliciesAsync(NamespaceName ns){ -return deleteAsync(joinPath(BASE_POLICIES_PATH, ns.toString())); +return deleteIfExistsAsync(joinPath(BASE_POLICIES_PATH, ns.toString())); } public Optional getPolicies(NamespaceName ns) throws Metadata
(pulsar) branch master updated: [improve][storage] Periodically rollover Cursor ledgers (#22622)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 084daf01629 [improve][storage] Periodically rollover Cursor ledgers (#22622) 084daf01629 is described below commit 084daf016294ee56496ae36e298d4e8758dc8906 Author: Enrico Olivelli AuthorDate: Wed May 1 10:29:11 2024 +0200 [improve][storage] Periodically rollover Cursor ledgers (#22622) --- .../apache/bookkeeper/mledger/ManagedCursor.java | 8 + .../apache/bookkeeper/mledger/ManagedLedger.java | 5 +++ .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 36 + .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 9 ++ .../mledger/impl/ManagedLedgerBkTest.java | 37 ++ .../pulsar/broker/service/BrokerService.java | 1 + 6 files changed, 90 insertions(+), 6 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index 8372592c851..227b5429abf 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -870,4 +870,12 @@ public interface ManagedCursor { default boolean isCursorDataFullyPersistable() { return true; } + +/** + * Called by the system to trigger periodic rollover in absence of activity. 
+ */ +default boolean periodicRollover() { +return false; +} + } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index f91d9ec3f5a..955a0d78502 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -631,6 +631,11 @@ public interface ManagedLedger { */ void trimConsumedLedgersInBackground(CompletableFuture promise); +/** + * Rollover cursors in background if needed. + */ +default void rolloverCursorsInBackground() {} + /** * If a ledger is lost, this ledger will be skipped after enabled "autoSkipNonRecoverableData", and the method is * used to delete information about this ledger in the ManagedCursor. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 69b130a98c8..c2f33639c3d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -3113,12 +3113,7 @@ public class ManagedCursorImpl implements ManagedCursor { lh1.getId()); } -if (shouldCloseLedger(lh1)) { -if (log.isDebugEnabled()) { -log.debug("[{}] Need to create new metadata ledger for cursor {}", ledger.getName(), name); -} -startCreatingNewMetadataLedger(); -} +rolloverLedgerIfNeeded(lh1); mbean.persistToLedger(true); mbean.addWriteCursorLedgerSize(data.length); @@ -3136,6 +3131,35 @@ public class ManagedCursorImpl implements ManagedCursor { }, null); } +public boolean periodicRollover() { +LedgerHandle lh = cursorLedger; +if (State.Open.equals(STATE_UPDATER.get(this)) +&& lh != null && lh.getLength() > 0) { +boolean triggered = rolloverLedgerIfNeeded(lh); +if (triggered) { +log.info("[{}] 
Periodic rollover triggered for cursor {} (length={} bytes)", +ledger.getName(), name, lh.getLength()); +} else { +log.debug("[{}] Periodic rollover skipped for cursor {} (length={} bytes)", +ledger.getName(), name, lh.getLength()); + +} +return triggered; +} +return false; +} + +boolean rolloverLedgerIfNeeded(LedgerHandle lh1) { +if (shouldCloseLedger(lh1)) { +if (log.isDebugEnabled()) { +log.debug("[{}] Need to create new metadata ledger for cursor {}", ledger.getName(), name); +} +startCreatingNewMetadataLedger(); +return true; +} +return false; +} + void persistPositionToMetaStore(MarkDeleteEntry mdEntry, final VoidCallback callback) { final PositionImpl ne
(pulsar) branch branch-3.1 updated: [fix][broker] Fix OpReadEntry.skipCondition NPE issue (#22367)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new 53662d34d9f [fix][broker] Fix OpReadEntry.skipCondition NPE issue (#22367) 53662d34d9f is described below commit 53662d34d9f86aae8bdbff5e51dbfd5de2772598 Author: Jiwei Guo AuthorDate: Wed Mar 27 20:12:08 2024 +0800 [fix][broker] Fix OpReadEntry.skipCondition NPE issue (#22367) (cherry picked from commit 404c0572a461908ffe09c483092d8df78356eae9) --- .../src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index 7b59c3903d5..a79ba3fb5e2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -209,8 +209,8 @@ class OpReadEntry implements ReadEntriesCallback { entries = null; nextReadPosition = null; maxPosition = null; -recyclerHandle.recycle(this); skipCondition = null; +recyclerHandle.recycle(this); } private static final Logger log = LoggerFactory.getLogger(OpReadEntry.class);
(pulsar-helm-chart) branch master updated: Disable functions by default in values.yaml (#483)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-helm-chart.git The following commit(s) were added to refs/heads/master by this push: new cc0a1ac Disable functions by default in values.yaml (#483) cc0a1ac is described below commit cc0a1acf227e489a06ce0d7648722ed728e1fbcc Author: Lari Hotari AuthorDate: Tue Mar 26 15:17:40 2024 -0700 Disable functions by default in values.yaml (#483) --- .ci/values-common.yaml| 2 ++ charts/pulsar/values.yaml | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/.ci/values-common.yaml b/.ci/values-common.yaml index c7e3b70..65f2324 100644 --- a/.ci/values-common.yaml +++ b/.ci/values-common.yaml @@ -36,6 +36,8 @@ affinity: components: autorecovery: false pulsar_manager: false + # enable functions by default in CI + functions: true zookeeper: replicaCount: 1 diff --git a/charts/pulsar/values.yaml b/charts/pulsar/values.yaml index 0d63ba8..1f73983 100755 --- a/charts/pulsar/values.yaml +++ b/charts/pulsar/values.yaml @@ -120,7 +120,9 @@ components: # broker broker: true # functions - functions: true + # WARNING! Before enabling functions, make sure that all of your users are trusted since functions run user code + # and the current security sandbox is not sufficient to protect against malicious code. + functions: false # proxy proxy: true # toolset
(pulsar-manager) branch master updated: Remove JWT validation from production code (#547)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-manager.git The following commit(s) were added to refs/heads/master by this push: new 4476f5e Remove JWT validation from production code (#547) 4476f5e is described below commit 4476f5e5537c701744da80b6e9cdf9634f341723 Author: Jonathan Leitschuh AuthorDate: Wed Feb 7 06:51:47 2024 -0500 Remove JWT validation from production code (#547) Signed-off-by: Jonathan Leitschuh --- .../apache/pulsar/manager/service/JwtService.java | 6 - .../manager/service/impl/JwtServiceImpl.java | 31 -- .../{ => impl}/BrokerTokensServiceImplTest.java| 19 ++--- 3 files changed, 27 insertions(+), 29 deletions(-) diff --git a/src/main/java/org/apache/pulsar/manager/service/JwtService.java b/src/main/java/org/apache/pulsar/manager/service/JwtService.java index 64d5162..3c126d7 100644 --- a/src/main/java/org/apache/pulsar/manager/service/JwtService.java +++ b/src/main/java/org/apache/pulsar/manager/service/JwtService.java @@ -13,10 +13,6 @@ */ package org.apache.pulsar.manager.service; -import io.jsonwebtoken.Claims; -import org.springframework.stereotype.Service; - -import java.security.Key; import java.util.Optional; public interface JwtService { @@ -27,8 +23,6 @@ public interface JwtService { String createBrokerToken(String role, String expiryTime); -Claims validateBrokerToken(String token); - void setToken(String key, String value); String getToken(String key); diff --git a/src/main/java/org/apache/pulsar/manager/service/impl/JwtServiceImpl.java b/src/main/java/org/apache/pulsar/manager/service/impl/JwtServiceImpl.java index fa460e5..fe9d816 100644 --- a/src/main/java/org/apache/pulsar/manager/service/impl/JwtServiceImpl.java +++ b/src/main/java/org/apache/pulsar/manager/service/impl/JwtServiceImpl.java @@ -13,6 +13,7 @@ */ package org.apache.pulsar.manager.service.impl; +import 
com.google.common.annotations.VisibleForTesting; import io.jsonwebtoken.*; import io.jsonwebtoken.security.Keys; import org.apache.pulsar.manager.service.JwtService; @@ -24,6 +25,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import javax.annotation.Nullable; import java.io.IOException; import java.security.Key; import java.util.Date; @@ -111,16 +113,21 @@ public class JwtServiceImpl implements JwtService { } } -public String createBrokerToken(String role, String expiryTime) { -Key signingKey; +@VisibleForTesting +@Nullable +Key getSigningKey() { if (jwtBrokerTokenMode.equals("SECRET")) { -signingKey = decodeBySecretKey(); +return decodeBySecretKey(); } else if (jwtBrokerTokenMode.equals("PRIVATE")){ -signingKey = decodeByPrivateKey(); +return decodeByPrivateKey(); } else { log.info("Default disable JWT auth, please set jwt.broker.token.mode."); return null; } +} + +public String createBrokerToken(String role, String expiryTime) { +Key signingKey = getSigningKey(); if (signingKey == null) { log.error("JWT Auth failed, signingKey is not empty"); return null; @@ -144,20 +151,4 @@ public class JwtServiceImpl implements JwtService { return null; } } - -public Claims validateBrokerToken(String token) { -Key validationKey; -if (jwtBrokerTokenMode.equals("SECRET")) { -validationKey = decodeBySecretKey(); -} else if (jwtBrokerTokenMode.equals("PRIVATE")){ -validationKey = decodeByPrivateKey(); -} else { -log.info("Default disable JWT auth, please set jwt.broker.token.mode."); -return null; -} -Jwt jwt = Jwts.parser() -.setSigningKey(validationKey) -.parse(token); -return jwt.getBody(); -} } diff --git a/src/test/java/org/apache/pulsar/manager/service/BrokerTokensServiceImplTest.java b/src/test/java/org/apache/pulsar/manager/service/impl/BrokerTokensServiceImplTest.java similarity index 78% rename from 
src/test/java/org/apache/pulsar/manager/service/BrokerTokensServiceImplTest.java rename to src/test/java/org/apache/pulsar/manager/service/impl/BrokerTokensServiceImplTest.java index 618dee7..3b936fc 100644 --- a/src/test/java/org/apache/pulsar/manager/service/BrokerTokensServiceImplTest.java +++ b/src/test/java/org/apache/pulsar/manager/service/impl/BrokerTokensServiceImplTest.java @@ -11,11 +11,14 @@ * See the License for the specific language governing permissions and * limitations und
svn commit: r64431 - /dev/pulsar/pulsar-client-reactive-0.4.0-candidate-1/ /release/pulsar/pulsar-client-reactive-0.4.0/
Author: eolivelli Date: Mon Oct 9 17:42:20 2023 New Revision: 64431 Log: Release Reactive client for Apache Pulsar 0.4.0 Added: release/pulsar/pulsar-client-reactive-0.4.0/ - copied from r64430, dev/pulsar/pulsar-client-reactive-0.4.0-candidate-1/ Removed: dev/pulsar/pulsar-client-reactive-0.4.0-candidate-1/
[pulsar-client-go] branch master updated: connectionTimeout respects net.Dialer default timeout (#1095)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 7cf643be connectionTimeout respects net.Dialer default timeout (#1095) 7cf643be is described below commit 7cf643be20b0eed9b37e168d04884e89f534efdd Author: ming AuthorDate: Fri Sep 15 11:32:28 2023 -0400 connectionTimeout respects net.Dialer default timeout (#1095) --- pulsar/client_impl.go | 7 +++ pulsar/internal/connection.go | 8 +++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go index 801eab3f..7daf6f62 100644 --- a/pulsar/client_impl.go +++ b/pulsar/client_impl.go @@ -31,7 +31,6 @@ import ( ) const ( - defaultConnectionTimeout = 10 * time.Second defaultOperationTimeout= 30 * time.Second defaultKeepAliveInterval = 30 * time.Second defaultMemoryLimitBytes= 64 * 1024 * 1024 @@ -117,10 +116,10 @@ func newClient(options ClientOptions) (Client, error) { return nil, err } + // the default timeout respects Go's default timeout which is no timeout + // Missing user specified timeout renders 0 values that matches + // net.Dailer's default if time.Duration value is not initialized connectionTimeout := options.ConnectionTimeout - if connectionTimeout.Nanoseconds() == 0 { - connectionTimeout = defaultConnectionTimeout - } operationTimeout := options.OperationTimeout if operationTimeout.Nanoseconds() == 0 { diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index e2ae7ac8..840ecc4f 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -256,7 +256,11 @@ func (c *connection) connect() bool { if c.tlsOptions == nil { // Clear text connection - cnx, err = net.DialTimeout("tcp", c.physicalAddr.Host, c.connectionTimeout) + if c.connectionTimeout.Nanoseconds() > 0 { + cnx, err = net.DialTimeout("tcp", 
c.physicalAddr.Host, c.connectionTimeout) + } else { + cnx, err = net.Dial("tcp", c.physicalAddr.Host) + } } else { // TLS connection tlsConfig, err = c.getTLSConfig() @@ -265,6 +269,8 @@ func (c *connection) connect() bool { return false } + // time.Duration is initialized to 0 by default, net.Dialer's default timeout is no timeout + // therefore if c.connectionTimeout is 0, it means no timeout d := {Timeout: c.connectionTimeout} cnx, err = tls.DialWithDialer(d, "tcp", c.physicalAddr.Host, tlsConfig) }
[pulsar] branch master updated (ecd16d68e29 -> 57fbee446ae)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from ecd16d68e29 [fix][client] fix negative message re-delivery twice issue (#20750) add 57fbee446ae [improve][schema] Prevent useless encode/decode of Avro schema in NATIVE_AVRO (#20675) No new revisions were added by this update. Summary of changes: .../client/impl/schema/NativeAvroBytesSchema.java | 46 ++ .../src/main/resources/findbugsExclude.xml | 3 +- 2 files changed, 23 insertions(+), 26 deletions(-)
[pulsar] branch master updated (f4386c868b3 -> 0a39b819a9e)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from f4386c868b3 [fix][broker] Support getStats/update partitioned topic with `-partition-` (#19235) add 0a39b819a9e [fix][fn] JavaInstanceStarter --tls_allow_insecure default to false (#20267) No new revisions were added by this update. Summary of changes: .../java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
svn commit: r61482 - /dev/pulsar/pulsar-client-reactive-0.3.0-candidate-1/ /release/pulsar/pulsar-client-reactive-0.3.0/
Author: eolivelli Date: Wed Apr 26 13:30:31 2023 New Revision: 61482 Log: Release Reactive client for Apache Pulsar 0.3.0 Added: release/pulsar/pulsar-client-reactive-0.3.0/ - copied from r61481, dev/pulsar/pulsar-client-reactive-0.3.0-candidate-1/ Removed: dev/pulsar/pulsar-client-reactive-0.3.0-candidate-1/
[pulsar] branch master updated (67eb0fb4e83 -> 5ef3a21a068)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 67eb0fb4e83 [cleanup][proxy] ProxyConnection should not call super.exceptionCaught (#19990) add 5ef3a21a068 [fix][build] Client modules should be built with Java 8 (#19991) No new revisions were added by this update. Summary of changes: pom.xml | 4 ++-- pulsar-client-auth-athenz/pom.xml | 41 ++- pulsar-client-auth-sasl/pom.xml | 39 + 3 files changed, 81 insertions(+), 3 deletions(-)
[pulsar] branch master updated (ee5ac8607eb -> 38485e09ce1)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from ee5ac8607eb [improve][broker] PIP-192 Improved Auto Unload Logic (#19813) add 38485e09ce1 [improve][build] Create source jar for pulsar-client-all shaded jar (#19956) No new revisions were added by this update. Summary of changes: pulsar-client-all/pom.xml | 2 ++ 1 file changed, 2 insertions(+)
[pulsar] branch master updated (bbf52736f75 -> 7cb48fd9d41)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from bbf52736f75 [improve][broker][PIP-195] Merge multiple buckets at once (#19927) add 7cb48fd9d41 [improve][io] KCA: option to collapse partitioned topics (#19923) No new revisions were added by this update. Summary of changes: pulsar-io/kafka-connect-adaptor/pom.xml| 7 ++ .../pulsar/io/kafka/connect/KafkaConnectSink.java | 20 - .../connect/PulsarKafkaConnectSinkConfig.java | 5 ++ .../io/kafka/connect/KafkaConnectSinkTest.java | 88 ++ 4 files changed, 118 insertions(+), 2 deletions(-)
[pulsar] branch master updated (c172a775fa0 -> f1f8dab972b)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from c172a775fa0 [fix][broker] Fix can't send ErrorCommand when message is null value (#19899) add f1f8dab972b [improve][io][broker] Updated org.reflections-reflections library version (#19898) No new revisions were added by this update. Summary of changes: pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[pulsar] branch branch-2.10 updated: [cherry-pick][branch-2.10] KCA: picking fixes from master (#19788)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.10 by this push: new 8f47bd12e25 [cherry-pick][branch-2.10] KCA: picking fixes from master (#19788) 8f47bd12e25 is described below commit 8f47bd12e254ed943650720a362dab429b8255bd Author: Andrey Yegorov <8622884+dl...@users.noreply.github.com> AuthorDate: Tue Mar 14 00:20:33 2023 -0700 [cherry-pick][branch-2.10] KCA: picking fixes from master (#19788) --- .../kafka/connect/AbstractKafkaConnectSource.java | 48 --- .../pulsar/io/kafka/connect/KafkaConnectSink.java | 22 - .../kafka/connect/PulsarKafkaSinkTaskContext.java | 14 -- .../io/kafka/connect/KafkaConnectSinkTest.java | 44 ++ .../io/kafka/connect/KafkaConnectSourceTest.java | 54 ++ 5 files changed, 152 insertions(+), 30 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java index 8c0b2e3cc93..2364e1d62a6 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.io.kafka.connect; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -30,7 +32,9 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.runtime.TaskConfig; +import 
org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.source.SourceTaskContext; @@ -56,6 +60,7 @@ public abstract class AbstractKafkaConnectSource implements Source { // kafka connect related variables private SourceTaskContext sourceTaskContext; +private SourceConnector connector; @Getter private SourceTask sourceTask; public Converter keyConverter; @@ -72,6 +77,8 @@ public abstract class AbstractKafkaConnectSource implements Source { // number of outstandingRecords that have been polled but not been acked private final AtomicInteger outstandingRecords = new AtomicInteger(0); +public static final String CONNECTOR_CLASS = "kafkaConnectorSourceClass"; + @Override public void open(Map config, SourceContext sourceContext) throws Exception { Map stringConfig = new HashMap<>(); @@ -81,12 +88,6 @@ public abstract class AbstractKafkaConnectSource implements Source { } }); -// get the source class name from config and create source task from reflection -sourceTask = Class.forName(stringConfig.get(TaskConfig.TASK_CLASS_CONFIG)) -.asSubclass(SourceTask.class) -.getDeclaredConstructor() -.newInstance(); - topicNamespace = stringConfig.get(PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG); // initialize the key and value converter @@ -130,8 +131,36 @@ public abstract class AbstractKafkaConnectSource implements Source { sourceTaskContext = new PulsarIOSourceTaskContext(offsetReader, pulsarKafkaWorkerConfig); +final Map taskConfig; +if (config.get(CONNECTOR_CLASS) != null) { +String kafkaConnectorFQClassName = config.get(CONNECTOR_CLASS).toString(); +Class clazz = Class.forName(kafkaConnectorFQClassName); +connector = (SourceConnector) clazz.getConstructor().newInstance(); + +Class taskClass = connector.taskClass(); +sourceTask = (SourceTask) taskClass.getConstructor().newInstance(); + +connector.initialize(new PulsarKafkaSinkContext()); 
+connector.start(stringConfig); + +List> configs = connector.taskConfigs(1); +checkNotNull(configs); +checkArgument(configs.size() == 1); +taskConfig = configs.get(0); +} else { +// for backward compatibility with old configuration +// that use the task directly + +// get the source class name from config and create source task from reflection +sourceTask = Class.forName(stringConfig.get(TaskConfig.TASK_CLASS_CONFIG)) +.as
[pulsar] branch master updated (9feb85b19ca -> 90b0f0a1757)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 9feb85b19ca [fix][client] Fix topic list watcher fail log (#19733) add 90b0f0a1757 [fix][io] KCA: Option to use kafka connector's SourceConnector class to create task and task config (#19772) No new revisions were added by this update. Summary of changes: .../kafka/connect/AbstractKafkaConnectSource.java | 48 --- .../io/kafka/connect/KafkaConnectSourceTest.java | 54 ++ 2 files changed, 77 insertions(+), 25 deletions(-)
svn commit: r60502 - /dev/pulsar/pulsar-adapters-2.11.0-candidate-3/ /release/pulsar/pulsar-adapters-2.11.0/
Author: eolivelli Date: Thu Mar 9 10:31:43 2023 New Revision: 60502 Log: release 2.11.0 Added: release/pulsar/pulsar-adapters-2.11.0/ - copied from r60501, dev/pulsar/pulsar-adapters-2.11.0-candidate-3/ Removed: dev/pulsar/pulsar-adapters-2.11.0-candidate-3/
[pulsar] branch master updated: [fix][io] KCA: 'desanitize' topic name for the pulsar's ctx calls (#19756)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new d4930a31c05 [fix][io] KCA: 'desanitize' topic name for the pulsar's ctx calls (#19756) d4930a31c05 is described below commit d4930a31c052dd8fcd5982b649898967a24f8961 Author: Andrey Yegorov <8622884+dl...@users.noreply.github.com> AuthorDate: Thu Mar 9 01:09:34 2023 -0800 [fix][io] KCA: 'desanitize' topic name for the pulsar's ctx calls (#19756) --- .../pulsar/io/kafka/connect/KafkaConnectSink.java | 22 ++- .../kafka/connect/PulsarKafkaSinkTaskContext.java | 14 +-- .../io/kafka/connect/KafkaConnectSinkTest.java | 44 ++ 3 files changed, 75 insertions(+), 5 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java index 37d0987e610..efbad2ef47a 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java @@ -95,6 +95,12 @@ public class KafkaConnectSink implements Sink { CacheBuilder.newBuilder().maximumSize(1000) .expireAfterAccess(30, TimeUnit.MINUTES).build(); +// Can't really safely expire these entries. If we do, we could end up with +// a sanitized topic name that used in e.g. resume() after a long pause but can't be +// // re-resolved into a form usable for Pulsar. 
+private final Cache desanitizedTopicCache = +CacheBuilder.newBuilder().build(); + private int maxBatchBitsForOffset = 12; private boolean useIndexAsOffset = true; @@ -184,7 +190,18 @@ public class KafkaConnectSink implements Sink { }); task = (SinkTask) taskClass.getConstructor().newInstance(); taskContext = -new PulsarKafkaSinkTaskContext(configs.get(0), ctx, task::open); +new PulsarKafkaSinkTaskContext(configs.get(0), ctx, task::open, kafkaName -> { +if (sanitizeTopicName) { +String pulsarTopicName = desanitizedTopicCache.getIfPresent(kafkaName); +if (log.isDebugEnabled()) { +log.debug("desanitizedTopicCache got: kafkaName: {}, pulsarTopicName: {}", +kafkaName, pulsarTopicName); +} +return pulsarTopicName != null ? pulsarTopicName : kafkaName; +} else { +return kafkaName; +} +}); task.initialize(taskContext); task.start(configs.get(0)); @@ -486,6 +503,9 @@ public class KafkaConnectSink implements Sink { if (sanitizedName.matches("^[^a-zA-Z_].*")) { sanitizedName = "_" + sanitizedName; } +// do this once, sanitize() can be called on already sanitized name +// so avoid replacing with (sanitizedName -> sanitizedName). 
+desanitizedTopicCache.get(sanitizedName, () -> name); return sanitizedName; }); } catch (ExecutionException e) { diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java index 99a8bf29082..7a908b553a8 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import java.util.function.Function; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; @@ -49,6 +50,7 @@ public class PulsarKafkaSinkTaskContext implements SinkTaskContext { private final SinkContext ctx; private final OffsetBackingStore offsetStore; +private Function desanitizeTopicName; private final String topicNamespace; private final Consumer> onPartitionChange; private final AtomicBoolean runRepartition = new AtomicBoolean(false); @@ -57,11 +59,13 @@ public class PulsarKafkaSinkTaskContext implements SinkTaskContext {
[pulsar-adapters] branch master updated: Update to Pulsar 2.11.0 (#40)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git The following commit(s) were added to refs/heads/master by this push: new 3350cb5 Update to Pulsar 2.11.0 (#40) 3350cb5 is described below commit 3350cb5cf002886f55fceb2a17b4d7912145e5e4 Author: Christophe Bornet AuthorDate: Wed Mar 1 04:37:59 2023 -0400 Update to Pulsar 2.11.0 (#40) --- .github/workflows/ci-maven-cache-update.yaml | 4 +- .github/workflows/integration-test.yaml| 16 +++ .github/workflows/unit-test.yaml | 4 +- README.md | 4 +- examples/kafka-streams/pom.xml | 4 +- examples/pom.xml | 2 +- examples/spark/pom.xml | 2 +- pom.xml| 52 -- pulsar-client-kafka-compat/pom.xml | 5 ++- .../pulsar-client-kafka-shaded/pom.xml | 2 +- .../pulsar-client-kafka-shaded_0_8/pom.xml | 2 +- .../pulsar-client-kafka-shaded_0_9/pom.xml | 2 +- .../pulsar-client-kafka-tests/pom.xml | 2 +- .../pulsar-client-kafka-tests_0_8/pom.xml | 8 +++- .../pulsar-client-kafka-tests_0_9/pom.xml | 2 +- .../pulsar-client-kafka/pom.xml| 2 +- .../producer/PulsarCliebtKafkaConfigTest.java | 3 +- .../pulsar-client-kafka_0_8/pom.xml| 8 +++- .../clients/producer/PulsarKafkaProducerTest.java | 3 +- .../pulsar-client-kafka_0_9/pom.xml| 2 +- pulsar-log4j2-appender/pom.xml | 2 +- .../appender/PulsarAppenderClientConfTest.java | 3 +- pulsar-spark/pom.xml | 11 - pulsar-storm/pom.xml | 2 +- tests/pom.xml | 2 +- tests/pulsar-kafka-compat-client-test/pom.xml | 2 +- tests/pulsar-spark-test/pom.xml| 19 +++- tests/pulsar-storm-test/pom.xml| 4 +- 28 files changed, 120 insertions(+), 54 deletions(-) diff --git a/.github/workflows/ci-maven-cache-update.yaml b/.github/workflows/ci-maven-cache-update.yaml index 517893b..ddc1647 100644 --- a/.github/workflows/ci-maven-cache-update.yaml +++ b/.github/workflows/ci-maven-cache-update.yaml @@ -89,12 +89,12 @@ jobs: # on growing from old entries which wouldn't never expire if the old # cache 
would be used as the starting point for a new cache entry - - name: Set up JDK 11 + - name: Set up JDK 17 uses: actions/setup-java@v2 if: ${{ (github.event_name == 'schedule' || steps.changes.outputs.poms == 'true') && steps.cache.outputs.cache-hit != 'true' }} with: distribution: 'adopt' - java-version: 11 + java-version: 17 - name: Download dependencies if: ${{ (github.event_name == 'schedule' || steps.changes.outputs.poms == 'true') && steps.cache.outputs.cache-hit != 'true' }} diff --git a/.github/workflows/integration-test.yaml b/.github/workflows/integration-test.yaml index f1a9a51..b6616c0 100644 --- a/.github/workflows/integration-test.yaml +++ b/.github/workflows/integration-test.yaml @@ -64,28 +64,28 @@ jobs: restore-keys: | ${{ runner.os }}-m2-dependencies- - - name: Set up JDK 11 + - name: Set up JDK 17 uses: actions/setup-java@v2 if: ${{ steps.check_changes.outputs.docs_only != 'true' }} with: distribution: 'adopt' - java-version: 11 + java-version: 17 - - name: install org.apache.pulsar.tests:integration:jar:tests:2.8.0 + - name: install org.apache.pulsar.tests:integration:jar:tests:2.11.0 if: ${{ steps.check_changes.outputs.docs_only != 'true' }} run: | cd ~ - git clone --depth 50 --single-branch --branch v2.8.0 https://github.com/apache/pulsar + git clone --depth 50 --single-branch --branch v2.11.0 https://github.com/apache/pulsar cd pulsar mvn -B -ntp -f tests/pom.xml -pl org.apache.pulsar.tests:tests-parent,org.apache.pulsar.tests:integration install - name: build apachepulsar/pulsar-test-latest-version:latest if: ${{ steps.check_changes.outputs.docs_only != 'true' }} run: | - docker pull apachepulsar/pulsar-all:2.8.0 - docker pull apachepulsar/pulsar:2.8.0 - docker tag apachepulsar/pulsar-all:2.8.0 apachepulsar/pulsar-all:latest - docker tag apachepulsar/pulsar:2.8.0 apachepulsar/pulsar:latest + docker pull apachepulsar/pulsar-all:2.11.0 + docker pull apachepulsar/pulsar:2.11.0 + docker tag apachepulsar/pulsar-all:2.11.0 apachepulsar/p
[pulsar] branch master updated: [fix][client] Fix race condition that leads to caching failed CompletableFutures in ConnectionPool (#19661)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 69fb3c2ca3f [fix][client] Fix race condition that leads to caching failed CompletableFutures in ConnectionPool (#19661) 69fb3c2ca3f is described below commit 69fb3c2ca3faa32ff12fd1270730b3517ea69220 Author: Enrico Olivelli AuthorDate: Tue Feb 28 12:17:33 2023 +0100 [fix][client] Fix race condition that leads to caching failed CompletableFutures in ConnectionPool (#19661) --- .../java/org/apache/pulsar/client/impl/ConnectionPool.java | 13 + 1 file changed, 13 insertions(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 3a9a2b9b7ab..1420d81c688 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -216,6 +216,15 @@ public class ConnectionPool implements AutoCloseable { pool.computeIfAbsent(logicalAddress, a -> new ConcurrentHashMap<>()); CompletableFuture completableFuture = innerPool .computeIfAbsent(randomKey, k -> createConnection(logicalAddress, physicalAddress, randomKey)); +if (completableFuture.isCompletedExceptionally()) { +// we cannot cache a failed connection, so we remove it from the pool +// there is a race condition in which +// cleanupConnection is called before caching this result +// and so the clean up fails +cleanupConnection(logicalAddress, randomKey, completableFuture); +return completableFuture; +} + return completableFuture.thenCompose(clientCnx -> { // If connection already release, create a new one. 
if (clientCnx.getIdleState().isReleased()) { @@ -274,6 +283,10 @@ public class ConnectionPool implements AutoCloseable { }).exceptionally(exception -> { log.warn("[{}] Connection handshake failed: {}", cnx.channel(), exception.getMessage()); cnxFuture.completeExceptionally(exception); +// this cleanupConnection may happen before that the +// CompletableFuture is cached into the "pool" map, +// it is not enough to clean it here, we need to clean it +// in the "pool" map when the CompletableFuture is cached cleanupConnection(logicalAddress, connectionKey, cnxFuture); cnx.ctx().close(); return null;
[pulsar] branch master updated: [fix][sec] Upgrade kafka client to 3.4.0 to fix CVE-2023-25194 (#19527)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new ca0b25ecba5 [fix][sec] Upgrade kafka client to 3.4.0 to fix CVE-2023-25194 (#19527) ca0b25ecba5 is described below commit ca0b25ecba50dd86de28ad221d1c29ec6419a973 Author: Masahiro Sakamoto AuthorDate: Wed Feb 15 22:40:19 2023 +0900 [fix][sec] Upgrade kafka client to 3.4.0 to fix CVE-2023-25194 (#19527) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index bf1ad305e3b..07e7ad634df 100644 --- a/pom.xml +++ b/pom.xml @@ -167,7 +167,7 @@ flexible messaging model and an intuitive client API. 2.2.0 3.11.2 4.4.20 -2.8.2 +3.4.0 5.5.3 1.12.262 1.10.2
[pulsar] branch master updated: [fix][broker] catch exception for brokerInterceptor (#19147)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 5d1fc6d5f3b [fix][broker] catch exception for brokerInterceptor (#19147) 5d1fc6d5f3b is described below commit 5d1fc6d5f3b5b68e9760a572172cf163a1e9785a Author: AloysZhang AuthorDate: Tue Feb 14 16:15:30 2023 +0800 [fix][broker] catch exception for brokerInterceptor (#19147) --- .../BrokerInterceptorWithClassLoader.java | 6 ++ .../broker/intercept/BrokerInterceptors.java | 6 ++ .../apache/pulsar/broker/service/ServerCnx.java| 22 ++-- .../intercept/ExceptionsBrokerInterceptor.java | 102 ++ .../intercept/ExceptionsBrokerInterceptorTest.java | 117 + .../org/apache/pulsar/client/impl/ClientCnx.java | 20 6 files changed, 267 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java index a74730d23e1..faee5799289 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.intercept; +import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; import java.io.IOException; import java.util.Map; @@ -208,4 +209,9 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor { log.warn("Failed to close the broker interceptor class loader", e); } } + +@VisibleForTesting +public BrokerInterceptor getInterceptor() { +return interceptor; +} } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java index e7f82742a97..cef3f0eb609 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.intercept; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import io.netty.buffer.ByteBuf; import java.io.IOException; @@ -277,4 +278,9 @@ public class BrokerInterceptors implements BrokerInterceptor { private boolean interceptorsEnabled() { return interceptors != null && !interceptors.isEmpty(); } + +@VisibleForTesting +public Map getInterceptors() { +return interceptors; +} } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 1351c6fe715..4c81b46601e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1185,7 +1185,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { remoteAddress, topicName, subscriptionName); commandSender.sendSuccessResponse(requestId); if (brokerInterceptor != null) { -brokerInterceptor.consumerCreated(this, consumer, metadata); +try { + brokerInterceptor.consumerCreated(this, consumer, metadata); +} catch (Throwable t) { +log.error("Exception occur when intercept consumer created.", t); +} } } else { // The consumer future was completed before by a close command @@ -1223,8 +1227,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { } // If client timed out, the future would have been completed by subsequent close. -// Send error -// back to client, only if not completed already. +// Send error back to client, only if not completed already. 
if (consumerFuture.completeExceptionally(exception)) { commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErro
[pulsar] branch master updated: [improve][broker] PIP-192: Write the child ownership to `ServiceUnitStateChannel` instead of ZK when handling bundle split (#18858)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 950ff441da2 [improve][broker] PIP-192: Write the child ownership to `ServiceUnitStateChannel` instead of ZK when handling bundle split (#18858) 950ff441da2 is described below commit 950ff441da28e144bdfb71c317a9bc339d4f05b7 Author: Kai Wang AuthorDate: Mon Feb 13 19:30:36 2023 +0800 [improve][broker] PIP-192: Write the child ownership to `ServiceUnitStateChannel` instead of ZK when handling bundle split (#18858) --- .../pulsar/broker/admin/impl/NamespacesBase.java | 16 +-- .../channel/ServiceUnitStateChannelImpl.java | 133 + .../pulsar/broker/namespace/NamespaceService.java | 74 +--- .../channel/ServiceUnitStateChannelTest.java | 53 +++- 4 files changed, 221 insertions(+), 55 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index d5cf6a3e74d..5446060ac65 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1028,7 +1028,9 @@ public abstract class NamespacesBase extends AdminResource { validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, authoritative, false)) .thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, - getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), splitBoundaries)); +pulsar().getNamespaceService() + .getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), +splitBoundaries)); }); } @@ -1109,18 +,6 @@ public abstract class NamespacesBase extends AdminResource { .getBundleWithHighestThroughputAsync(namespaceName); } -private 
NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) { -NamespaceBundleSplitAlgorithm algorithm = NamespaceBundleSplitAlgorithm.of(algorithmName); -if (algorithm == null) { -algorithm = NamespaceBundleSplitAlgorithm.of( - pulsar().getConfig().getDefaultNamespaceBundleSplitAlgorithm()); -} -if (algorithm == null) { -algorithm = NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO; -} -return algorithm; -} - protected void internalSetPublishRate(PublishRate maxPublishMessageRate) { validateSuperUserAccess(); log.info("[{}] Set namespace publish-rate {}/{}", clientAppId(), namespaceName, maxPublishMessageRate); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index d5bcd3e1436..d10138bda68 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.channel; +import static java.lang.String.format; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned; @@ -35,7 +37,9 @@ import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable; import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost; import static 
org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished; +import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -48,6 +52,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.
[pulsar-client-reactive] branch main updated: Update README.adoc (#121)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-client-reactive.git The following commit(s) were added to refs/heads/main by this push: new eddc06b Update README.adoc (#121) eddc06b is described below commit eddc06b550285b65a5cba62c464d4e4b8ff1f4a1 Author: Christophe Bornet AuthorDate: Tue Feb 7 12:01:10 2023 +0100 Update README.adoc (#121) --- README.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.adoc b/README.adoc index 7c5ab30..feecce1 100644 --- a/README.adoc +++ b/README.adoc @@ -19,7 +19,7 @@ = Reactive client for Apache Pulsar :github: https://github.com/apache/pulsar-client-reactive -:latest_version: 0.1.0 +:latest_version: 0.2.0 Reactive client for Apache Pulsar which is compatible with the Reactive Streams specification. This uses Project Reactor as the Reactive Streams implementation.
[pulsar] branch master updated: [improve] PIP-241: add TopicEventListener / topic events for the BrokerService (#19153)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 3e44d1e6e2b [improve] PIP-241: add TopicEventListener / topic events for the BrokerService (#19153) 3e44d1e6e2b is described below commit 3e44d1e6e2ba4599d547c83cf7cb25350f0cc560 Author: Andrey Yegorov <8622884+dl...@users.noreply.github.com> AuthorDate: Thu Feb 2 10:04:52 2023 -0800 [improve] PIP-241: add TopicEventListener / topic events for the BrokerService (#19153) --- .../pulsar/broker/service/BrokerService.java | 73 - .../broker/service/TopicEventsDispatcher.java | 137 + .../pulsar/broker/service/TopicEventsListener.java | 62 .../pulsar/broker/TopicEventsListenerTest.java | 311 + .../pulsar/broker/service/BrokerTestBase.java | 2 +- 5 files changed, 579 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index f7020963fb7..27a1518cb81 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -110,6 +110,8 @@ import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; +import org.apache.pulsar.broker.service.TopicEventsListener.EventStage; +import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; @@ -281,6 +283,8 @@ public class BrokerService implements Closeable { private Set brokerEntryMetadataInterceptors; private Set brokerEntryPayloadProcessors; +private final TopicEventsDispatcher topicEventsDispatcher = new TopicEventsDispatcher(); + public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception { this.pulsar = pulsar; this.preciseTopicPublishRateLimitingEnable = @@ -398,6 +402,16 @@ public class BrokerService implements Closeable { this.bundlesQuotas = new BundlesQuotas(pulsar.getLocalMetadataStore()); } +public void addTopicEventListener(TopicEventsListener... listeners) { +topicEventsDispatcher.addTopicEventListener(listeners); +getTopics().keys().forEach(topic -> +TopicEventsDispatcher.notify(listeners, topic, TopicEvent.LOAD, EventStage.SUCCESS, null)); +} + +public void removeTopicEventListener(TopicEventsListener... listeners) { +topicEventsDispatcher.removeTopicEventListener(listeners); +} + // This call is used for starting additional protocol handlers public void startProtocolHandlers( Map>> protocolHandlers) { @@ -1024,21 +1038,41 @@ public class BrokerService implements Closeable { return loadOrCreatePersistentTopic(tpName, createIfMissing, properties); }); } else { -return topics.computeIfAbsent(topicName.toString(), (name) -> { +return topics.computeIfAbsent(topicName.toString(), (name) -> { +topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.BEFORE); if (topicName.isPartitioned()) { final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> { if (topicName.getPartitionIndex() < metadata.partitions) { -return createNonPersistentTopic(name); +topicEventsDispatcher +.notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE); + +CompletableFuture> res = 
createNonPersistentTopic(name); + +CompletableFuture> eventFuture = topicEventsDispatcher +.notifyOnCompletion(res, topicName.toString(), TopicEvent.CREATE); +topicEventsDispatcher +.notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD); +
[pulsar-site] branch main updated: Update Pulsar PMC members and committers (#387)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-site.git The following commit(s) were added to refs/heads/main by this push: new c3c261b6222 Update Pulsar PMC members and committers (#387) c3c261b6222 is described below commit c3c261b6222c1e27ac6598f831703a12c186f164 Author: Lari Hotari AuthorDate: Fri Jan 20 14:51:19 2023 +0200 Update Pulsar PMC members and committers (#387) Process to update data/team.js file: 1. Logged in to https://whimsy.apache.org/roster/committee/pulsar with browser 2. Appended ".json" to URL so that browser goes to https://whimsy.apache.org/roster/committee/pulsar.json 3. Clicked "Save as..." and stored the JSON as ~/Downloads/pulsar.json 4. Ran this command in a bash shell: { echo -n "module.exports = " && cat ~/Downloads/pulsar.json | jq '{"pmc": [.roster| to_entries | sort_by(.key) | .[] | select(.value.role|startswith("PMC")) | {"name":.value.name, "apacheId": .key}], "committers": [.roster| to_entries | sort_by(.key) | .[] | select(.value.role=="Committer") | {"name":.value.name, "apacheId": .key}]}' } | perl -pe 's/$/;\n/ if eof' > data/team.js --- data/team.js | 28 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/data/team.js b/data/team.js index 0eaccb3c00d..2bb44da80c8 100644 --- a/data/team.js +++ b/data/team.js @@ -4,6 +4,10 @@ module.exports = { "name": "Sahaya Andrews", "apacheId": "andrews" }, +{ + "name": "Bo Cong", + "apacheId": "bogong" +}, { "name": "Brad McMillen", "apacheId": "bradtm" @@ -80,6 +84,10 @@ module.exports = { "name": "Matteo Merli", "apacheId": "mmerli" }, +{ + "name": "Nicolò Boschi", + "apacheId": "nicoloboschi" +}, { "name": "Nozomi Kurihara", "apacheId": "nkurihar" @@ -124,6 +132,10 @@ module.exports = { "name": "David Fisher", "apacheId": "wave" }, +{ + "name": "Yunze Xu", + "apacheId": "xyz" +}, { "name": "Yuki Shiga", "apacheId": "yushiga" @@ -151,12 +163,12 @@ 
module.exports = { "apacheId": "ayegorov" }, { - "name": "Daniel Blankensteiner", - "apacheId": "blankensteiner" + "name": "Baodi Shi", + "apacheId": "baodi" }, { - "name": "Bo Cong", - "apacheId": "bogong" + "name": "Daniel Blankensteiner", + "apacheId": "blankensteiner" }, { "name": "Christophe Bornet", @@ -210,10 +222,6 @@ module.exports = { "name": "Qiang Zhao", "apacheId": "mattisonchao" }, -{ - "name": "Nicolò Boschi", - "apacheId": "nicoloboschi" -}, { "name": "Neng Lu", "apacheId": "nlu90" @@ -254,10 +262,6 @@ module.exports = { "name": "Marvin Cai", "apacheId": "xxc" }, -{ - "name": "Yunze Xu", - "apacheId": "xyz" -}, { "name": "Yijie Shen", "apacheId": "yjshen"
[pulsar] branch branch-2.10 updated: [fix][broker] AbstractBatchedMetadataStore - use AlreadyClosedException instead of IllegalStateException (#19284)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.10 by this push: new 83c6d7968f0 [fix][broker] AbstractBatchedMetadataStore - use AlreadyClosedException instead of IllegalStateException (#19284) 83c6d7968f0 is described below commit 83c6d7968f0b59d9770432b81949e606ace4cdf9 Author: Enrico Olivelli AuthorDate: Thu Jan 19 14:35:00 2023 +0100 [fix][broker] AbstractBatchedMetadataStore - use AlreadyClosedException instead of IllegalStateException (#19284) (cherry picked from commit d3e112e94ae251fb3ee7668061d336e0400f9a5c) --- .../pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java index 616cac289ef..31fea0749a8 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.metadata.impl.AbstractMetadataStore; @@ -74,7 +75,8 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore public void close() throws Exception { if (enabled) { // Fail all the pending items -Exception ex = new IllegalStateException("Metadata 
store is getting closed"); +MetadataStoreException ex = +new MetadataStoreException.AlreadyClosedException("Metadata store is getting closed"); readOps.drain(op -> op.getFuture().completeExceptionally(ex)); writeOps.drain(op -> op.getFuture().completeExceptionally(ex));
[pulsar] branch branch-2.11 updated: [fix][broker] AbstractBatchedMetadataStore - use AlreadyClosedException instead of IllegalStateException (#19284)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 58e87d150ce [fix][broker] AbstractBatchedMetadataStore - use AlreadyClosedException instead of IllegalStateException (#19284) 58e87d150ce is described below commit 58e87d150ce5d986e34c52dfbde391affe1a4faf Author: Enrico Olivelli AuthorDate: Thu Jan 19 14:35:00 2023 +0100 [fix][broker] AbstractBatchedMetadataStore - use AlreadyClosedException instead of IllegalStateException (#19284) (cherry picked from commit d3e112e94ae251fb3ee7668061d336e0400f9a5c) --- .../pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java index c9d245b8caf..f0a6dc426cc 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java @@ -31,6 +31,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataEventSynchronizer; import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.metadata.impl.AbstractMetadataStore; @@ -80,7 +81,8 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore public void close() throws Exception { if (enabled) { // Fail all the pending items -Exception ex = new 
IllegalStateException("Metadata store is getting closed"); +MetadataStoreException ex = +new MetadataStoreException.AlreadyClosedException("Metadata store is getting closed"); readOps.drain(op -> op.getFuture().completeExceptionally(ex)); writeOps.drain(op -> op.getFuture().completeExceptionally(ex));
[pulsar] branch master updated: [fix][broker] AbstractBatchedMetadataStore - use AlreadyClosedException instead of IllegalStateException (#19284)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new d3e112e94ae [fix][broker] AbstractBatchedMetadataStore - use AlreadyClosedException instead of IllegalStateException (#19284) d3e112e94ae is described below commit d3e112e94ae251fb3ee7668061d336e0400f9a5c Author: Enrico Olivelli AuthorDate: Thu Jan 19 14:35:00 2023 +0100 [fix][broker] AbstractBatchedMetadataStore - use AlreadyClosedException instead of IllegalStateException (#19284) --- .../pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java index 52cd81abc51..93aeb28c39b 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java @@ -31,6 +31,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataEventSynchronizer; import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.metadata.impl.AbstractMetadataStore; @@ -84,7 +85,8 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore public void close() throws Exception { if (enabled) { // Fail all the pending items -Exception ex = new IllegalStateException("Metadata store is getting closed"); +MetadataStoreException ex = +new 
MetadataStoreException.AlreadyClosedException("Metadata store is getting closed"); readOps.drain(op -> op.getFuture().completeExceptionally(ex)); writeOps.drain(op -> op.getFuture().completeExceptionally(ex));
[pulsar] branch master updated: [improve][broker] Refactor update topic partitions endpoint. (#19166)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 4d7c7d0db52 [improve][broker] Refactor update topic partitions endpoint. (#19166) 4d7c7d0db52 is described below commit 4d7c7d0db5204f4ad52fd5858394cd9455f3837d Author: Qiang Zhao AuthorDate: Wed Jan 18 16:34:36 2023 +0800 [improve][broker] Refactor update topic partitions endpoint. (#19166) --- .../apache/pulsar/broker/admin/AdminResource.java | 15 - .../broker/admin/impl/PersistentTopicsBase.java| 348 + .../pulsar/broker/admin/v1/PersistentTopics.java | 38 +-- .../pulsar/broker/admin/v2/PersistentTopics.java | 35 +-- .../pulsar/broker/admin/PersistentTopicsTest.java | 2 +- 5 files changed, 186 insertions(+), 252 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 1632831b634..f00717377ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -165,21 +165,6 @@ public abstract class AdminResource extends PulsarWebResource { return FutureUtil.waitForAll(futures); } -protected CompletableFuture tryCreateExtendedPartitionsAsync(int oldNumPartitions, int numPartitions) { -if (!topicName.isPersistent()) { -return CompletableFuture.completedFuture(null); -} -if (numPartitions <= oldNumPartitions) { -return CompletableFuture.failedFuture(new RestException(Status.NOT_ACCEPTABLE, -"Number of new partitions must be greater than existing number of partitions")); -} -List> futures = new ArrayList<>(numPartitions - oldNumPartitions); -for (int i = oldNumPartitions; i < numPartitions; i++) { -futures.add(tryCreatePartitionAsync(i)); -} -return FutureUtil.waitForAll(futures); -} - 
private CompletableFuture tryCreatePartitionAsync(final int partition) { CompletableFuture result = new CompletableFuture<>(); getPulsarResources().getTopicResources().createPersistentTopicAsync(topicName.getPartition(partition)) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 50724142b0c..73f25914d7a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -23,6 +23,7 @@ import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInte import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectReader; import com.github.zafarkhaja.semver.Version; +import com.google.common.base.Throwables; import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; import java.io.IOException; @@ -44,6 +45,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.stream.Collectors; import java.util.stream.IntStream; +import javax.annotation.Nonnull; import javax.ws.rs.WebApplicationException; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.core.Response; @@ -129,7 +131,6 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.SubscriptionPolicies; -import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicStats; @@ -408,89 +409,160 @@ public class PersistentTopicsBase extends 
AdminResource { * already exist and number of new partitions must be greater than existing number of partitions. Decrementing * number of partitions requires deletion of topic which is not supported. * - * Already created partitioned producers and consumers can't see newly created partitions and it requires to - * recreate them at application so, newly created producers and consumers can connect to newly added partitions as - * well. Therefore, it can violate partition ordering at producers until all producers are restarted at application. - * - * @param expectPartitions - * @param updateLocalTopicOnly - * @param authoritative - * @param force +
[pulsar] branch master updated: [fix][broker] Allow user lookup topic name with `-partition-` but no metadata (#19171)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new a93f30f836e [fix][broker] Allow user lookup topic name with `-partition-` but no metadata (#19171) a93f30f836e is described below commit a93f30f836e1766da9a8b0d9c16e58fc872a Author: Qiang Zhao AuthorDate: Thu Jan 12 17:21:21 2023 +0800 [fix][broker] Allow user lookup topic name with `-partition-` but no metadata (#19171) --- .../pulsar/broker/namespace/NamespaceService.java | 15 +++ .../service/persistent/PersistentTopicTest.java| 31 ++ 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 27df77e815d..84bce75bf5a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1165,11 +1165,16 @@ public class NamespaceService implements AutoCloseable { public CompletableFuture checkTopicExists(TopicName topic) { if (topic.isPersistent()) { if (topic.isPartitioned()) { -return pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() - .partitionedTopicExistsAsync(TopicName.get(topic.getPartitionedTopicName())) -.thenCompose(exists -> exists -? 
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic) -: CompletableFuture.completedFuture(false)); +return pulsar.getBrokerService() + .fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName())) +.thenCompose(metadata -> { +// Allow creating the non-partitioned persistent topic that name includes `-partition-` +if (metadata.partitions == 0 +|| topic.getPartitionIndex() < metadata.partitions) { +return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic); +} +return CompletableFuture.completedFuture(false); +}); } else { return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 19c5bd5c9aa..6f9c260c8ff 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -422,4 +422,35 @@ public class PersistentTopicTest extends BrokerTestBase { } Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 4); } + +@Test +public void testCompatibilityWithPartitionKeyword() throws PulsarAdminException, PulsarClientException { +final String topicName = "persistent://prop/ns-abc/testCompatibilityWithPartitionKeyword"; +TopicName topicNameEntity = TopicName.get(topicName); +String partition2 = topicNameEntity.getPartition(2).toString(); +// Create a non-partitioned topic with -partition- keyword +Producer producer = pulsarClient.newProducer() +.topic(partition2) +.create(); +List topics = admin.topics().getList("prop/ns-abc"); +// Close previous producer to simulate reconnect +producer.close(); +// Disable auto topic creation +conf.setAllowAutoTopicCreation(false); +// Check the topic exist 
in the list. +Assert.assertTrue(topics.contains(partition2)); +// Check this topic has no partition metadata. +Assert.assertThrows(PulsarAdminException.NotFoundException.class, +() -> admin.topics().getPartitionedTopicMetadata(topicName)); +// Reconnect to the broker and expect successful because the topic has existed in the broker. +producer = pulsarClient.newProducer() +.topic(partition2) +.create(); +producer.close(); +// Check the topic exist in the list again. +Assert.assertTrue(topics.contains(partition2)); +// Check this topic has no partition metadata aga
[pulsar] branch master updated: [fix][broker] Reject create non existent persistent partitions. (#19086)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 56a7b89be5f [fix][broker] Reject create non existent persistent partitions. (#19086) 56a7b89be5f is described below commit 56a7b89be5fecd41fc200379c96b15e3c0ace7c3 Author: Qiang Zhao AuthorDate: Mon Jan 9 19:34:19 2023 +0800 [fix][broker] Reject create non existent persistent partitions. (#19086) --- .../broker/admin/impl/PersistentTopicsBase.java| 266 ++--- .../pulsar/broker/service/BrokerService.java | 19 +- .../apache/pulsar/broker/admin/AdminApi2Test.java | 12 +- .../nonpersistent/NonPersistentTopicTest.java | 23 ++ .../service/persistent/PersistentTopicTest.java| 20 ++ 5 files changed, 193 insertions(+), 147 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 3fb551967b9..81c9638632e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -411,67 +411,85 @@ public class PersistentTopicsBase extends AdminResource { * recreate them at application so, newly created producers and consumers can connect to newly added partitions as * well. Therefore, it can violate partition ordering at producers until all producers are restarted at application. 
* - * @param numPartitions + * @param expectPartitions * @param updateLocalTopicOnly * @param authoritative * @param force */ -protected CompletableFuture internalUpdatePartitionedTopicAsync(int numPartitions, +protected CompletableFuture internalUpdatePartitionedTopicAsync(int expectPartitions, boolean updateLocalTopicOnly, boolean authoritative, boolean force) { -if (numPartitions <= 0) { -return FutureUtil.failedFuture(new RestException(Status.NOT_ACCEPTABLE, -"Number of partitions should be more than 0")); +if (expectPartitions <= 0) { +return FutureUtil.failedFuture( +new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0")); } return validateTopicOwnershipAsync(topicName, authoritative) -.thenCompose(__ -> validateTopicPolicyOperationAsync(topicName, PolicyName.PARTITION, -PolicyOperation.WRITE)) +.thenCompose(__ -> +validateTopicPolicyOperationAsync(topicName, PolicyName.PARTITION, PolicyOperation.WRITE)) .thenCompose(__ -> { if (!updateLocalTopicOnly && !force) { -return validatePartitionTopicUpdateAsync(topicName.getLocalName(), numPartitions); +return validatePartitionTopicUpdateAsync(topicName.getLocalName(), expectPartitions); } else { return CompletableFuture.completedFuture(null); } }).thenCompose(__ -> pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)) .thenCompose(topicMetadata -> { final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic(); -if (maxPartitions > 0 && numPartitions > maxPartitions) { +if (maxPartitions > 0 && expectPartitions > maxPartitions) { throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be less than or equal to " + maxPartitions); } -// Only do the validation if it's the first hop. 
-if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) { -return getNamespaceReplicatedClustersAsync(topicName.getNamespaceObject()) -.thenApply(clusters -> { -if (!clusters.contains(pulsar().getConfig().getClusterName())) { -log.error("[{}] local cluster is not part of replicated cluster for namespace {}", -clientAppId(), topicName); -throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate" -+ " cluster list"); -
[pulsar] branch master updated: [fix][broker] Topic could be in fenced state forever if deletion fails (#19129)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new a6516a8d198 [fix][broker] Topic could be in fenced state forever if deletion fails (#19129) a6516a8d198 is described below commit a6516a8d19896316907c9904d7ed823e9282aef2 Author: Nicolò Boschi AuthorDate: Wed Jan 4 16:25:36 2023 +0100 [fix][broker] Topic could be in fenced state forever if deletion fails (#19129) --- .../broker/service/persistent/PersistentTopic.java | 7 ++- .../pulsar/broker/service/PersistentTopicTest.java | 65 +- .../metadata/impl/FaultInjectionMetadataStore.java | 20 ++- 3 files changed, 73 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 42f73a70328..04c95126490 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1268,9 +1268,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal topic, exception.getMessage()); deleteLedgerComplete(ctx); } else { -unfenceTopicToResume(); log.error("[{}] Error deleting topic", topic, exception); +unfenceTopicToResume(); deleteFuture.completeExceptionally( new PersistenceException(exception)); } @@ -1289,6 +1289,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal }); return deleteFuture; +}).whenComplete((value, ex) -> { +if (ex != null) { +log.error("[{}] Error deleting topic", topic, ex); +unfenceTopicToResume(); +} }); } finally { lock.writeLock().unlock(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index adb31f2d1a5..8e3c3c73054 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -21,7 +21,6 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper; -import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; @@ -131,21 +130,22 @@ import org.apache.pulsar.common.api.proto.ProducerAccessMode; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl; import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.util.Codec; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.compaction.CompactedTopic; import org.apache.pulsar.compaction.CompactedTopicContext; import 
org.apache.pulsar.compaction.Compactor; -import org.apache.pulsar.metadata.api.MetadataStore; -import org.apache.pulsar.metadata.impl.Z
[pulsar] branch master updated: [improve] Introduce the sync() API to ensure consistency on reads during critical metadata operation paths (#18518)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 492a9c3e44b [improve] Introduce the sync() API to ensure consistency on reads during critical metadata operation paths (#18518) 492a9c3e44b is described below commit 492a9c3e44bef2334a77164afc8b033cc8f8d82f Author: Enrico Olivelli AuthorDate: Tue Dec 27 11:55:17 2022 +0100 [improve] Introduce the sync() API to ensure consistency on reads during critical metadata operation paths (#18518) --- .../apache/pulsar/broker/resources/BaseResources.java | 7 +++ .../pulsar/broker/resources/NamespaceResources.java| 16 +--- .../org/apache/pulsar/metadata/api/MetadataStore.java | 11 +++ .../apache/pulsar/metadata/impl/ZKMetadataStore.java | 18 ++ 4 files changed, 49 insertions(+), 3 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java index e705581a9d5..42add4271f6 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java @@ -93,6 +93,13 @@ public class BaseResources { return cache.get(path); } +protected CompletableFuture> refreshAndGetAsync(String path) { +return store.sync(path).thenCompose(___ -> { +cache.invalidate(path); +return cache.get(path); +}); +} + protected void set(String path, Function modifyFunction) throws MetadataStoreException { try { setAsync(path, modifyFunction).get(operationTimeoutSec, TimeUnit.SECONDS); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 
899cf01bc4b..dd1c428380b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -258,8 +258,18 @@ public class NamespaceResources extends BaseResources { } public CompletableFuture> getPartitionedTopicMetadataAsync(TopicName tn) { -return getAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(), -tn.getEncodedLocalName())); +return getPartitionedTopicMetadataAsync(tn, false); +} + +public CompletableFuture> getPartitionedTopicMetadataAsync(TopicName tn, + boolean refresh) { +if (refresh) { +return refreshAndGetAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(), +tn.getEncodedLocalName())); +} else { +return getAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(), +tn.getEncodedLocalName())); +} } public boolean partitionedTopicExists(TopicName tn) throws MetadataStoreException { @@ -317,7 +327,7 @@ public class NamespaceResources extends BaseResources { if (tn.isPartitioned()) { tn = TopicName.get(tn.getPartitionedTopicName()); } -return getPartitionedTopicMetadataAsync(tn) +return getPartitionedTopicMetadataAsync(tn, true) .thenApply(mdOpt -> mdOpt.map(partitionedTopicMetadata -> partitionedTopicMetadata.deleted) .orElse(false)); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java index 66c9f44d342..33942c19520 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java @@ -50,6 +50,17 @@ public interface MetadataStore extends AutoCloseable { */ CompletableFuture> get(String path); + +/** + * Ensure that the next value read from the local client will be up-to-date with the latest version 
of the value + * as it can be seen by all the other clients. + * @param path + * @return a handle to the operation + */ +default CompletableFuture sync(String path) { +return CompletableFuture.completedFuture(null); +} + /** * Return all the nodes (lexicographically sorted) that are children
[pulsar] branch master updated (3011946a5c3 -> b1f9e351fa4)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 3011946a5c3 [fix][sec] Upgrade scala-library to get rid of CVE-2022-36944 (#18021) add b1f9e351fa4 [improve][cli] pulsar-perf: check for invalid CLI options (#18889) No new revisions were added by this update. Summary of changes: .../java/org/apache/pulsar/testclient/PerformanceConsumer.java | 9 + .../java/org/apache/pulsar/testclient/PerformanceProducer.java | 9 + .../java/org/apache/pulsar/testclient/PerformanceReader.java | 9 + 3 files changed, 27 insertions(+)
[pulsar-site] branch main updated: Add missing cbornet to committers (#333)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-site.git The following commit(s) were added to refs/heads/main by this push: new 0e0048eacfe Add missing cbornet to committers (#333) 0e0048eacfe is described below commit 0e0048eacfe8c278ae4a56ab659dfcea2336d4d1 Author: Christophe Bornet AuthorDate: Wed Dec 14 15:33:05 2022 +0100 Add missing cbornet to committers (#333) --- site2/website-next/blog/2022-12-01-pulsar-summit-asia-2022-recap.md | 3 ++- site2/website-next/data/team.js | 4 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/site2/website-next/blog/2022-12-01-pulsar-summit-asia-2022-recap.md b/site2/website-next/blog/2022-12-01-pulsar-summit-asia-2022-recap.md index 035d4ad70fd..9ba7558f253 100644 --- a/site2/website-next/blog/2022-12-01-pulsar-summit-asia-2022-recap.md +++ b/site2/website-next/blog/2022-12-01-pulsar-summit-asia-2022-recap.md @@ -65,7 +65,7 @@ Apache Pulsar has been adopted by organizations and users across the globe since Figure 1. Pulsar GitHub repo contributors -So far this year, we have welcomed 15 new Apache Pulsar [Committers](https://www.apache.org/foundation/how-it-works.html#committers) to the Pulsar family. They have made continuous contributions to the Pulsar community and as Pulsar Committers, they now have write access to the Pulsar repository. They are: +So far this year, we have welcomed 16 new Apache Pulsar [Committers](https://www.apache.org/foundation/how-it-works.html#committers) to the Pulsar family. They have made continuous contributions to the Pulsar community and as Pulsar Committers, they now have write access to the Pulsar repository. 
They are: * [@RobertIndie](https://github.com/RobertIndie) * [@yuruguo](https://github.com/yuruguo) @@ -82,6 +82,7 @@ So far this year, we have welcomed 15 new Apache Pulsar [Committers](https://www * [@dlg99](https://github.com/dlg99) * [@nicoloboschi](https://github.com/nicoloboschi) * [@liudezhi2098](https://github.com/liudezhi2098) +* [@cbornet](https://github.com/cbornet) We also have 4 new members joining the Apache Pulsar [Project Management Committee (PMC)](https://www.apache.org/foundation/how-it-works.html#pmc-members) for their merit for the evolution of the project. They are: diff --git a/site2/website-next/data/team.js b/site2/website-next/data/team.js index a54eae9773a..25c8ca36777 100644 --- a/site2/website-next/data/team.js +++ b/site2/website-next/data/team.js @@ -170,6 +170,10 @@ module.exports = { apacheId: "cckellogg", org: "Splunk", }, +{ + name: "Christophe Bornet", + apacheId: "cbornet" +}, { name: "davidkj", apacheId: "davekj",
[pulsar] branch master updated: [improve][broker] The interval of scheduled task should be greater than 0 (#18728)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 31daf514186 [improve][broker] The interval of scheduled task should be greater than 0 (#18728) 31daf514186 is described below commit 31daf514186dd433e111f78a3b042c0a312e711d Author: Ruguo Yu AuthorDate: Mon Dec 5 01:07:05 2022 +0800 [improve][broker] The interval of scheduled task should be greater than 0 (#18728) --- .../src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java | 5 + 1 file changed, 5 insertions(+) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 94e0900e505..1b6bdc9986d 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -554,6 +554,7 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_POLICIES, +minValue = 1, doc = "How often to check for topics that have reached the quota." 
+ " It only takes effects when `backlogQuotaCheckEnabled` is true" ) @@ -616,6 +617,7 @@ public class ServiceConfiguration implements PulsarConfiguration { private boolean brokerDeleteInactivePartitionedTopicMetadataEnabled = false; @FieldContext( category = CATEGORY_POLICIES, +minValue = 1, dynamic = true, doc = "How often to check for inactive topics" ) @@ -665,6 +667,7 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_POLICIES, +minValue = 1, doc = "How frequently to proactively check and purge expired messages" ) private int messageExpiryCheckIntervalInMinutes = 5; @@ -768,6 +771,7 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_POLICIES, +minValue = 1, doc = "Time of inactivity after which the broker will discard the deduplication information" + " relative to a disconnected producer. Default is 6 hours.") private int brokerDeduplicationProducerInactivityTimeoutMinutes = 360; @@ -2706,6 +2710,7 @@ public class ServiceConfiguration implements PulsarConfiguration { private boolean exposePublisherStats = true; @FieldContext( category = CATEGORY_METRICS, +minValue = 1, doc = "Stats update frequency in seconds" ) private int statsUpdateFrequencyInSecs = 60;
[pulsar] branch master updated (8d3df45df3b -> 7b06489b764)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 8d3df45df3b [improve][misc] Update caffeine from 2.9.1 to 3.1.2 (#18647) add 7b06489b764 [fix][broker] namespace not found will cause request timeout (#18512) No new revisions were added by this update. Summary of changes: .../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 9 - .../org/apache/pulsar/broker/admin/PersistentTopicsTest.java | 11 +++ 2 files changed, 19 insertions(+), 1 deletion(-)
[pulsar] branch master updated (f89a2fac959 -> a0a1c357d16)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from f89a2fac959 [fix][proxy] Remove duplicate test dependency org.awaitility:awaitility in pulsar-proxy (#18606) add a0a1c357d16 [fix][misc] Fix NoClassDefFoundError: io/netty/incubator/channel/uring/IOUringEventLoopGroup (#18609) No new revisions were added by this update. Summary of changes: pom.xml | 5 + pulsar-common/pom.xml | 5 + 2 files changed, 10 insertions(+)
[pulsar] branch master updated (598ca5dacd0 -> fe5a8ceae6d)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 598ca5dacd0 [improve][doc] add notes to wireshark scripts README (#18563) add fe5a8ceae6d [improve][client] allow customize subscription name for TableView (#18596) No new revisions were added by this update. Summary of changes: .../apache/pulsar/client/api/TableViewBuilder.java | 9 .../pulsar/client/impl/TableViewBuilderImpl.java | 7 ++ .../client/impl/TableViewConfigurationData.java| 2 ++ .../apache/pulsar/client/impl/TableViewImpl.java | 3 ++- ...st.java => TableViewConfigurationDataTest.java} | 27 ++ site2/docs/client-libraries-java.md| 1 + 6 files changed, 39 insertions(+), 10 deletions(-) copy pulsar-client/src/test/java/org/apache/pulsar/client/impl/{AuthenticationUtilTest.java => TableViewConfigurationDataTest.java} (50%)
[pulsar] branch branch-2.11 updated: [improve][client][branch-2.11]PIP-189: No batching if only one message in batch (#18548)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 708012096eb [improve][client][branch-2.11]PIP-189: No batching if only one message in batch (#18548) 708012096eb is described below commit 708012096eb781c645697a6f2b74bfc2574ff66e Author: houxiaoyu AuthorDate: Mon Nov 21 16:25:49 2022 +0800 [improve][client][branch-2.11]PIP-189: No batching if only one message in batch (#18548) --- .../RGUsageMTAggrWaitForAllMsgsTest.java | 4 +- .../pulsar/broker/service/BatchMessageTest.java| 37 +++- .../broker/service/BrokerEntryMetadataE2ETest.java | 67 - .../pulsar/client/api/ClientDeduplicationTest.java | 7 +- .../apache/pulsar/client/api/TopicReaderTest.java | 6 +- .../client/cli/PulsarClientToolForceBatchNum.java | 104 + .../pulsar/client/cli/PulsarClientToolTest.java| 9 +- .../client/impl/BatchMessageContainerImpl.java | 38 +++- .../apache/pulsar/client/impl/ProducerImpl.java| 3 + 9 files changed, 240 insertions(+), 35 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java index 27f9e905262..1acc5ad0039 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java @@ -769,8 +769,8 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase { Assert.assertNotEquals(ninthPercentileValue, 0); } -// Empirically, there appears to be a 42-byte overhead for metadata, imposed by Pulsar runtime. 
-private static final int PER_MESSAGE_METADATA_OHEAD = 42; +// Empirically, there appears to be a 31-byte overhead for metadata, imposed by Pulsar runtime. +private static final int PER_MESSAGE_METADATA_OHEAD = 31; private static final int PUBLISH_INTERVAL_SECS = 10; private static final int NUM_PRODUCERS = 4; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java index 0d18e243884..e9c5032063f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java @@ -56,6 +56,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; @@ -865,6 +866,37 @@ public class BatchMessageTest extends BrokerTestBase { producer.close(); } +@Test(dataProvider = "containerBuilder") +public void testBatchSendOneMessage(BatcherBuilder builder) throws Exception { +final String topicName = "persistent://prop/ns-abc/testBatchSendOneMessage-" + UUID.randomUUID(); +final String subscriptionName = "sub-1"; + +Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) +.subscriptionType(SubscriptionType.Shared).subscribe(); + +Producer producer = pulsarClient.newProducer().topic(topicName) +.batchingMaxPublishDelay(1, TimeUnit.SECONDS).batchingMaxMessages(10).enableBatching(true) +.batcherBuilder(builder) +.create(); +String msg = "my-message"; +MessageId messageId = producer.newMessage().value(msg.getBytes()).property("key1", "value1").send(); + +Assert.assertTrue(messageId 
instanceof MessageIdImpl); +Assert.assertFalse(messageId instanceof BatchMessageIdImpl); + +Message received = consumer.receive(); +assertEquals(received.getSequenceId(), 0); +consumer.acknowledge(received); + +Assert.assertEquals(new String(received.getData()), msg); +Assert.assertFalse(received.getProperties().isEmpty()); +Assert.assertEquals(received.getProperties().get("key1"), "value1"); +Assert.assertFalse(received.getMessageId() instanceof BatchMessageIdImpl); + +producer.close(); +consumer.close(); +} + @Test(dataProvider = "containerBuilder") public void testRetrieveSequenceIdGenerated(BatcherBuilder builder) throws
[pulsar] branch branch-2.9 updated: [fix][offload] Fix memory leak while Offloading ledgers (#18500)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.9 by this push: new 31c0a2ea97a [fix][offload] Fix memory leak while Offloading ledgers (#18500) 31c0a2ea97a is described below commit 31c0a2ea97a494b48363fd3e51c854d7a7626ad3 Author: Enrico Olivelli AuthorDate: Wed Nov 16 13:44:43 2022 +0100 [fix][offload] Fix memory leak while Offloading ledgers (#18500) (cherry picked from commit 6ff7d459697c2496de29ef077eb0f574632ebe6d) --- .../jcloud/impl/BlockAwareSegmentInputStreamImpl.java | 2 +- .../jcloud/impl/BlockAwareSegmentInputStreamTest.java | 15 +++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java index b69f9f5e785..e5dbcb64347 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java @@ -261,7 +261,7 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre // And through debug, writeBlobStore.uploadMultipartPart in the offload method also will trigger // the close method. // So we add the close variable to avoid release paddingBuf twice. 
-if (!close.compareAndSet(false, true)) { +if (close.compareAndSet(false, true)) { super.close(); dataBlockHeaderStream.close(); if (!entriesByteBuf.isEmpty()) { diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java index 0cd4bbd70a9..fff1ce8b7aa 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java @@ -30,6 +30,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Iterator; @@ -798,4 +799,18 @@ public class BlockAwareSegmentInputStreamTest { inputStream.close(); } + +@Test +public void testCloseReleaseResources() throws Exception { +ReadHandle readHandle = new MockReadHandle(1, 10, 10); + +BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, 1024); +inputStream.read(); +Field field = BlockAwareSegmentInputStreamImpl.class.getDeclaredField("paddingBuf"); +field.setAccessible(true); +ByteBuf paddingBuf = (ByteBuf) field.get(inputStream); +assertEquals(1, paddingBuf.refCnt()); +inputStream.close(); +assertEquals(0, paddingBuf.refCnt()); +} }
[pulsar] branch branch-2.10 updated: [fix][offload] Fix memory leak while Offloading ledgers (#18500)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.10 by this push: new fcdf50e125c [fix][offload] Fix memory leak while Offloading ledgers (#18500) fcdf50e125c is described below commit fcdf50e125c69c86cdd86bdd85e1bcfa133eb8b3 Author: Enrico Olivelli AuthorDate: Wed Nov 16 13:44:43 2022 +0100 [fix][offload] Fix memory leak while Offloading ledgers (#18500) (cherry picked from commit 6ff7d459697c2496de29ef077eb0f574632ebe6d) --- .../jcloud/impl/BlockAwareSegmentInputStreamImpl.java | 2 +- .../jcloud/impl/BlockAwareSegmentInputStreamTest.java | 15 +++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java index e4f935c1113..a3b89539d31 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java @@ -260,7 +260,7 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre // And through debug, writeBlobStore.uploadMultipartPart in the offload method also will trigger // the close method. // So we add the close variable to avoid release paddingBuf twice. 
-if (!close.compareAndSet(false, true)) { +if (close.compareAndSet(false, true)) { super.close(); dataBlockHeaderStream.close(); if (!entriesByteBuf.isEmpty()) { diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java index 0cd4bbd70a9..fff1ce8b7aa 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java @@ -30,6 +30,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Iterator; @@ -798,4 +799,18 @@ public class BlockAwareSegmentInputStreamTest { inputStream.close(); } + +@Test +public void testCloseReleaseResources() throws Exception { +ReadHandle readHandle = new MockReadHandle(1, 10, 10); + +BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, 1024); +inputStream.read(); +Field field = BlockAwareSegmentInputStreamImpl.class.getDeclaredField("paddingBuf"); +field.setAccessible(true); +ByteBuf paddingBuf = (ByteBuf) field.get(inputStream); +assertEquals(1, paddingBuf.refCnt()); +inputStream.close(); +assertEquals(0, paddingBuf.refCnt()); +} }
[pulsar] branch branch-2.11 updated: [fix][offload] Fix memory leak while Offloading ledgers (#18500)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 7ad0f5c8d68 [fix][offload] Fix memory leak while Offloading ledgers (#18500) 7ad0f5c8d68 is described below commit 7ad0f5c8d68f9fd5022895eca11c12a0a6d8b609 Author: Enrico Olivelli AuthorDate: Wed Nov 16 13:44:43 2022 +0100 [fix][offload] Fix memory leak while Offloading ledgers (#18500) (cherry picked from commit 6ff7d459697c2496de29ef077eb0f574632ebe6d) --- .../jcloud/impl/BlockAwareSegmentInputStreamImpl.java | 2 +- .../jcloud/impl/BlockAwareSegmentInputStreamTest.java | 15 +++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java index 52d069b0f99..3c92051c95d 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java @@ -278,7 +278,7 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre // And through debug, writeBlobStore.uploadMultipartPart in the offload method also will trigger // the close method. // So we add the close variable to avoid release paddingBuf twice. 
-if (!close.compareAndSet(false, true)) { +if (close.compareAndSet(false, true)) { super.close(); dataBlockHeaderStream.close(); if (!entriesByteBuf.isEmpty()) { diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java index 0cd4bbd70a9..fff1ce8b7aa 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java @@ -30,6 +30,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Iterator; @@ -798,4 +799,18 @@ public class BlockAwareSegmentInputStreamTest { inputStream.close(); } + +@Test +public void testCloseReleaseResources() throws Exception { +ReadHandle readHandle = new MockReadHandle(1, 10, 10); + +BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, 1024); +inputStream.read(); +Field field = BlockAwareSegmentInputStreamImpl.class.getDeclaredField("paddingBuf"); +field.setAccessible(true); +ByteBuf paddingBuf = (ByteBuf) field.get(inputStream); +assertEquals(1, paddingBuf.refCnt()); +inputStream.close(); +assertEquals(0, paddingBuf.refCnt()); +} }
[pulsar] branch master updated: [fix][offload] Fix memory leak while Offloading ledgers (#18500)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 6ff7d459697 [fix][offload] Fix memory leak while Offloading ledgers (#18500) 6ff7d459697 is described below commit 6ff7d459697c2496de29ef077eb0f574632ebe6d Author: Enrico Olivelli AuthorDate: Wed Nov 16 13:44:43 2022 +0100 [fix][offload] Fix memory leak while Offloading ledgers (#18500) --- .../jcloud/impl/BlockAwareSegmentInputStreamImpl.java | 2 +- .../jcloud/impl/BlockAwareSegmentInputStreamTest.java | 15 +++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java index 5f778cab51f..d07fbdb9247 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java @@ -278,7 +278,7 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre // And through debug, writeBlobStore.uploadMultipartPart in the offload method also will trigger // the close method. // So we add the close variable to avoid release paddingBuf twice. 
-if (!close.compareAndSet(false, true)) { +if (close.compareAndSet(false, true)) { super.close(); dataBlockHeaderStream.close(); if (!entriesByteBuf.isEmpty()) { diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java index 47989e03605..5ca4d6da20b 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java @@ -30,6 +30,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Iterator; @@ -798,4 +799,18 @@ public class BlockAwareSegmentInputStreamTest { inputStream.close(); } + +@Test +public void testCloseReleaseResources() throws Exception { +ReadHandle readHandle = new MockReadHandle(1, 10, 10); + +BlockAwareSegmentInputStreamImpl inputStream = new BlockAwareSegmentInputStreamImpl(readHandle, 0, 1024); +inputStream.read(); +Field field = BlockAwareSegmentInputStreamImpl.class.getDeclaredField("paddingBuf"); +field.setAccessible(true); +ByteBuf paddingBuf = (ByteBuf) field.get(inputStream); +assertEquals(1, paddingBuf.refCnt()); +inputStream.close(); +assertEquals(0, paddingBuf.refCnt()); +} }
[pulsar] branch master updated (8246e3bdd21 -> 46bb63d6bd3)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 8246e3bdd21 [fix][client] Fixes batch_size not checked in MessageId#fromByteArrayWithTopic (#18405) add 46bb63d6bd3 [fix][test] Enable Cassandra sink tests (#18430) No new revisions were added by this update. Summary of changes: .../pulsar/io/cassandra/CassandraAbstractSink.java| 2 +- .../integration/io/sinks/CassandraSinkTester.java | 19 +++ .../integration/io/sinks/PulsarIOSinkRunner.java | 4 ++-- .../tests/integration/io/sinks/PulsarSinksTest.java | 4 ++-- 4 files changed, 16 insertions(+), 13 deletions(-)
[pulsar] branch master updated (67a3de716d5 -> 3fdbc9fca6b)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 67a3de716d5 [fix][cli] Fix CLI client produce don't able to use multiple -m send multiple messages (#18238) add 3fdbc9fca6b [fix] Combination of autocreate + forced delete of partitioned topic with active consumer leaves topic metadata inconsistent. (#18193) No new revisions were added by this update. Summary of changes: .../broker/resources/NamespaceResources.java | 74 .../pulsar/broker/resources/TopicResources.java| 2 + .../pulsar/broker/admin/impl/NamespacesBase.java | 6 +- .../broker/admin/impl/PersistentTopicsBase.java| 152 .../pulsar/broker/lookup/TopicLookupBase.java | 9 +- .../pulsar/broker/namespace/NamespaceService.java | 10 +- .../pulsar/broker/service/BrokerService.java | 86 + .../apache/pulsar/broker/service/ServerCnx.java| 8 +- .../service/nonpersistent/NonPersistentTopic.java | 4 +- .../broker/service/persistent/PersistentTopic.java | 197 +++-- .../broker/admin/AdminApiMultiBrokersTest.java | 87 + .../apache/pulsar/broker/admin/NamespacesTest.java | 1 + .../broker/lookup/http/HttpTopicLookupv2Test.java | 2 + .../broker/service/BacklogQuotaManagerTest.java| 18 +- .../pulsar/broker/service/BrokerTestBase.java | 3 +- .../broker/service/ExclusiveProducerTest.java | 2 - .../pulsar/broker/service/PersistentTopicTest.java | 9 +- .../pulsar/client/impl/TopicsConsumerImplTest.java | 28 --- .../common/partition/PartitionedTopicMetadata.java | 1 + .../integration/topics/TestTopicDeletion.java | 183 +++ 20 files changed, 632 insertions(+), 250 deletions(-) create mode 100644 tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java
[pulsar-client-go] branch master updated: Fix marshalling time.Time{} to uint64 (#865)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 0b0720a Fix marshalling time.Time{} to uint64 (#865) 0b0720a is described below commit 0b0720ab73d7f6378b8b6ac37acbafe547c268c8 Author: Ayman Khalil AuthorDate: Tue Oct 25 01:25:14 2022 -0700 Fix marshalling time.Time{} to uint64 (#865) --- pulsar/internal/utils.go | 4 pulsar/internal/utils_test.go | 5 + 2 files changed, 9 insertions(+) diff --git a/pulsar/internal/utils.go b/pulsar/internal/utils.go index 0763ced..e4a4994 100644 --- a/pulsar/internal/utils.go +++ b/pulsar/internal/utils.go @@ -28,6 +28,10 @@ import ( // TimestampMillis return a time unix nano. func TimestampMillis(t time.Time) uint64 { + // calling UnixNano on the zero Time is undefined + if t.IsZero() { + return 0 + } return uint64(t.UnixNano()) / uint64(time.Millisecond) } diff --git a/pulsar/internal/utils_test.go b/pulsar/internal/utils_test.go index 65babd0..90d6f2d 100644 --- a/pulsar/internal/utils_test.go +++ b/pulsar/internal/utils_test.go @@ -55,3 +55,8 @@ func TestParseRelativeTimeInSeconds(t *testing.T) { assert.Nil(t, err) assert.Equal(t, time.Duration(10)*time.Hour*24*7*365, timestamp) } + +func TestTimestampMillis(t *testing.T) { + assert.Equal(t, uint64(0), TimestampMillis(time.Time{})) + assert.Equal(t, uint64(7), TimestampMillis(time.Unix(0, 7*int64(time.Millisecond)))) +}
[pulsar] branch master updated (fa328a42d57 -> 08540329c2c)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from fa328a42d57 When accumulating acks, update the batch index in batchDeletedIndexes and check whether it is greater than the batch index of the previous ack (#18042) add 08540329c2c [Fix][Tiered Storage] Eagerly Delete Offloaded Segments On Topic Deletion (#17915) No new revisions were added by this update. Summary of changes: .../bookkeeper/mledger/ManagedLedgerFactory.java | 20 ++ .../mledger/impl/ManagedLedgerFactoryImpl.java | 112 +-- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 121 +++- .../bookkeeper/mledger/offload/OffloadUtils.java | 28 +++ .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 8 +- .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 61 ++ .../pulsar/broker/service/BrokerService.java | 32 ++-- .../broker/service/persistent/PersistentTopic.java | 87 + .../pulsar/broker/service/PersistentTopicTest.java | 2 + .../tests/integration/offload/TestBaseOffload.java | 210 +++-- .../integration/offload/TestFileSystemOffload.java | 5 +- .../integration/offload/TestOffloadDeletionFS.java | 144 ++ .../tests/integration/offload/TestS3Offload.java | 2 +- .../offload/TestUniversalConfigurations.java | 2 +- .../suites/PulsarTieredStorageTestSuite.java | 4 +- 15 files changed, 684 insertions(+), 154 deletions(-) create mode 100644 tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestOffloadDeletionFS.java
[pulsar] branch master updated: Make BookieId work with PulsarRegistrationDriver (second take) (#17922)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 09f5eeb0c94 Make BookieId work with PulsarRegistrationDriver (second take) (#17922) 09f5eeb0c94 is described below commit 09f5eeb0c946ee890483e087f802e30a2a2b60ab Author: Enrico Olivelli AuthorDate: Sat Oct 15 18:24:20 2022 +0200 Make BookieId work with PulsarRegistrationDriver (second take) (#17922) * Make BookieId work with PulsarRegistrationDriver (#17762) * Make BookieId work with PulsarRegistrationDriver * Switch to MetadataCache * checkstyle * Do not execute lookup on MetadataCache in the getBookieServiceInfo caller thread --- .../bookkeeper/BookieServiceInfoSerde.java | 55 +- .../bookkeeper/PulsarRegistrationClient.java | 119 - .../bookkeeper/PulsarRegistrationClientTest.java | 62 +++ 3 files changed, 230 insertions(+), 6 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java index 78a33179e76..b7e3024b637 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.discover.BookieServiceInfo; +import org.apache.bookkeeper.discover.BookieServiceInfoUtils; import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat; import org.apache.pulsar.metadata.api.MetadataSerde; import org.apache.pulsar.metadata.api.Stat; @@ -63,7 +64,57 @@ public class BookieServiceInfoSerde implements MetadataSerde } @Override -public BookieServiceInfo deserialize(String path, byte[] 
content, Stat stat) throws IOException { -return null; +public BookieServiceInfo deserialize(String path, byte[] bookieServiceInfo, Stat stat) throws IOException { +// see https://github.com/apache/bookkeeper/blob/ +// 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/ +// src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L311 +String bookieId = extractBookiedIdFromPath(path); +if (bookieServiceInfo == null || bookieServiceInfo.length == 0) { +return BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookieId); +} + +BookieServiceInfoFormat builder = BookieServiceInfoFormat.parseFrom(bookieServiceInfo); +BookieServiceInfo bsi = new BookieServiceInfo(); +List endpoints = builder.getEndpointsList().stream() +.map(e -> { +BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint(); +endpoint.setId(e.getId()); +endpoint.setPort(e.getPort()); +endpoint.setHost(e.getHost()); +endpoint.setProtocol(e.getProtocol()); +endpoint.setAuth(e.getAuthList()); +endpoint.setExtensions(e.getExtensionsList()); +return endpoint; +}) +.collect(Collectors.toList()); + +bsi.setEndpoints(endpoints); +bsi.setProperties(builder.getPropertiesMap()); + +return bsi; + +} + +/** + * Extract the BookieId + * The path should look like /ledgers/available/bookieId + * or /ledgers/available/readonly/bookieId. + * But the prefix depends on the configuration. 
+ * @param path + * @return the bookieId + */ +private static String extractBookiedIdFromPath(String path) throws IOException { +// https://github.com/apache/bookkeeper/blob/ +// 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/ +// src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L258 +if (path == null) { +path = ""; +} +int last = path.lastIndexOf("/"); +if (last >= 0) { +return path.substring(last + 1); +} else { +throw new IOException("The path " + path + " doesn't look like a valid path for a BookieServiceInfo node"); +} } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java index 52b50e3ea4b..f314c0efaf0 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java +
[pulsar] branch master updated: [bug][broker] fix memory leak in case of error conditions in PendingReadsManager (#17995)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 7ac8e3e003b [bug][broker] fix memory leak in case of error conditions in PendingReadsManager (#17995) 7ac8e3e003b is described below commit 7ac8e3e003bde0d32049bf0fc09e25cfed4ded58 Author: Enrico Olivelli AuthorDate: Wed Oct 12 10:58:05 2022 +0200 [bug][broker] fix memory leak in case of error conditions in PendingReadsManager (#17995) Co-authored-by: Enrico Olivelli --- .../apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java| 5 + 1 file changed, 5 insertions(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java index 4c374d8ace6..30216871e76 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java @@ -356,6 +356,8 @@ public class PendingReadsManager { @Override public void readEntriesFailed(ManagedLedgerException exception, Object dummyCtx3) { +entries.forEach(Entry::release); + entriesFromLeft.forEach(Entry::release); callback.readEntriesFailed(exception, ctx); } }; @@ -366,6 +368,7 @@ public class PendingReadsManager { @Override public void readEntriesFailed(ManagedLedgerException exception, Object dummyCtx4) { +entries.forEach(Entry::release); callback.readEntriesFailed(exception, ctx); } }; @@ -388,6 +391,7 @@ public class PendingReadsManager { @Override public void readEntriesFailed(ManagedLedgerException exception, Object dummyCtx6) { +entries.forEach(Entry::release); callback.readEntriesFailed(exception, ctx); } }; @@ -410,6 +414,7 @@ public class PendingReadsManager { @Override public void 
readEntriesFailed(ManagedLedgerException exception, Object dummyCtx8) { +entries.forEach(Entry::release); callback.readEntriesFailed(exception, ctx); } };
[pulsar] branch master updated (88e357a6937 -> 9a8b68a77e7)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 88e357a6937 [fix][ci] Use GitHub Actions versions that aren't deprecated (#18002) add 9a8b68a77e7 [improve][CI] Generate unit test code coverage reports and upload to Codecov (#17382) No new revisions were added by this update. Summary of changes: .github/workflows/pulsar-ci.yaml | 6 + build/run_unit_group.sh| 4 +-- .../templates/pulsar.proxy.service => codecov.yml | 29 ++ pom.xml| 4 ++- 4 files changed, 29 insertions(+), 14 deletions(-) copy deployment/terraform-ansible/templates/pulsar.proxy.service => codecov.yml (70%)
[pulsar] branch master updated: [improve][connector] JDBC sink: allow any jdbc driver (#17951)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 5c31c9af041 [improve][connector] JDBC sink: allow any jdbc driver (#17951) 5c31c9af041 is described below commit 5c31c9af04176f81d1616dde64567d0e0ce1f2c8 Author: Nicolò Boschi AuthorDate: Tue Oct 11 09:52:59 2022 +0200 [improve][connector] JDBC sink: allow any jdbc driver (#17951) --- .../apache/pulsar/io/jdbc/JdbcAbstractSink.java| 2 - .../org/apache/pulsar/io/jdbc/JdbcDriverType.java | 62 -- .../java/org/apache/pulsar/io/jdbc/JdbcUtils.java | 10 3 files changed, 74 deletions(-) diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java index 1d12909d5e2..3f7f62e3abb 100644 --- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java +++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java @@ -89,8 +89,6 @@ public abstract class JdbcAbstractSink implements Sink { properties.setProperty("password", password); } - - Class.forName(JdbcUtils.getDriverClassName(jdbcSinkConfig.getJdbcUrl())); connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), properties); connection.setAutoCommit(!jdbcSinkConfig.isUseTransactions()); log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, connection.getAutoCommit()); diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcDriverType.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcDriverType.java deleted file mode 100644 index 1a4710622b7..000 --- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcDriverType.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor 
license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.jdbc; - -import lombok.Getter; - -@Getter -public enum JdbcDriverType { - -CLICKHOUSE("jdbc:clickhouse:", "ru.yandex.clickhouse.ClickHouseDriver"), -DB2("jdbc:db2:", "com.ibm.db2.jcc.DB2Driver"), -DERBY_CLIENT("jdbc:derby://", "org.apache.derby.jdbc.ClientDriver"), -DERBY_EMBEDDED("jdbc:derby:", "org.apache.derby.jdbc.EmbeddedDriver"), -FIREBIRD("jdbc:firebird:", "org.firebirdsql.jdbc.FBDriver"), -FIREBIRD_SQL("jdbc:firebirdsql:", "org.firebirdsql.jdbc.FBDriver"), -H2("jdbc:h2:", "org.h2.Driver"), -HSQL("jdbc:hsqldb:", "org.hsqldb.jdbcDriver"), -INFORMIX("jdbc:informix-sqli:", "com.informix.jdbc.IfxDriver"), -JTDS("jdbc:jtds:", "net.sourceforge.jtds.jdbc.Driver"), -MARIADB("jdbc:mariadb:", "org.mariadb.jdbc.Driver"), -MYSQL("jdbc:mysql:", "com.mysql.cj.jdbc.Driver"), -MYSQL_GOOGLE("jdbc:google:", "com.mysql.jdbc.GoogleDriver"), -ORACLE("jdbc:oracle", "oracle.jdbc.OracleDriver"), -POSTGRESQL("jdbc:postgresql:", "org.postgresql.Driver"), -REDSHIFT("jdbc:redshift:", "com.amazon.redshift.jdbc42.Driver"), -SAPHANA("jdbc:sap:", "com.sap.db.jdbc.Driver"), -SNOWFLAKE("jdbc:snowflake:", "net.snowflake.client.jdbc.SnowflakeDriver"), -SQLDROID("jdbc:sqldroid:", "org.sqldroid.SQLDroidDriver"), -SQLLITE("jdbc:sqlite:", 
"org.sqlite.JDBC"), -SQLSERVER("jdbc:sqlserver:", "com.microsoft.sqlserver.jdbc.SQLServerDriver"), -SYBASE("jdbc:sybase:", "com.sybase.jdbc4.jdbc.SybDriver"), -TEST_CONTAINERS("jdbc:tc:", "org.testcontainers.jdbc.ContainerDatabaseDriver"), -OPENMLDB("jdbc:openmldb
[pulsar] branch branch-2.11 updated: Revert "Make BookieId work with PulsarRegistrationDriver (#17762)" (#17914)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 324cab95b2c Revert "Make BookieId work with PulsarRegistrationDriver (#17762)" (#17914) 324cab95b2c is described below commit 324cab95b2cd2899d500d393114073b15cb825e2 Author: Enrico Olivelli AuthorDate: Tue Oct 4 08:22:03 2022 +0200 Revert "Make BookieId work with PulsarRegistrationDriver (#17762)" (#17914) This reverts commit 8d7ac33751c62383b510a04ec223981bd70cd4db. (cherry picked from commit 9d6c34ea5d77bb96ecc21b1ec3a18fa4b730e7bd) --- .../bookkeeper/BookieServiceInfoSerde.java | 55 +--- .../bookkeeper/PulsarRegistrationClient.java | 36 - .../bookkeeper/PulsarRegistrationClientTest.java | 59 -- 3 files changed, 2 insertions(+), 148 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java index b7e3024b637..78a33179e76 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.discover.BookieServiceInfo; -import org.apache.bookkeeper.discover.BookieServiceInfoUtils; import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat; import org.apache.pulsar.metadata.api.MetadataSerde; import org.apache.pulsar.metadata.api.Stat; @@ -64,57 +63,7 @@ public class BookieServiceInfoSerde implements MetadataSerde } @Override -public BookieServiceInfo deserialize(String path, byte[] bookieServiceInfo, Stat stat) throws IOException { -// see 
https://github.com/apache/bookkeeper/blob/ -// 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/ -// src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L311 -String bookieId = extractBookiedIdFromPath(path); -if (bookieServiceInfo == null || bookieServiceInfo.length == 0) { -return BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookieId); -} - -BookieServiceInfoFormat builder = BookieServiceInfoFormat.parseFrom(bookieServiceInfo); -BookieServiceInfo bsi = new BookieServiceInfo(); -List endpoints = builder.getEndpointsList().stream() -.map(e -> { -BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint(); -endpoint.setId(e.getId()); -endpoint.setPort(e.getPort()); -endpoint.setHost(e.getHost()); -endpoint.setProtocol(e.getProtocol()); -endpoint.setAuth(e.getAuthList()); -endpoint.setExtensions(e.getExtensionsList()); -return endpoint; -}) -.collect(Collectors.toList()); - -bsi.setEndpoints(endpoints); -bsi.setProperties(builder.getPropertiesMap()); - -return bsi; - -} - -/** - * Extract the BookieId - * The path should look like /ledgers/available/bookieId - * or /ledgers/available/readonly/bookieId. - * But the prefix depends on the configuration. 
- * @param path - * @return the bookieId - */ -private static String extractBookiedIdFromPath(String path) throws IOException { -// https://github.com/apache/bookkeeper/blob/ -// 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/ -// src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L258 -if (path == null) { -path = ""; -} -int last = path.lastIndexOf("/"); -if (last >= 0) { -return path.substring(last + 1); -} else { -throw new IOException("The path " + path + " doesn't look like a valid path for a BookieServiceInfo node"); -} +public BookieServiceInfo deserialize(String path, byte[] content, Stat stat) throws IOException { +return null; } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java index 1c692404318..52b50e3ea4b 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/b
[pulsar] branch branch-2.10 updated: Revert "Make BookieId work with PulsarRegistrationDriver (#17762)" (#17914)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.10 by this push: new 850c9448a5a Revert "Make BookieId work with PulsarRegistrationDriver (#17762)" (#17914) 850c9448a5a is described below commit 850c9448a5ac32e2f94988b8bf80955c93ef9d6c Author: Enrico Olivelli AuthorDate: Tue Oct 4 08:22:03 2022 +0200 Revert "Make BookieId work with PulsarRegistrationDriver (#17762)" (#17914) This reverts commit 8d7ac33751c62383b510a04ec223981bd70cd4db. (cherry picked from commit 9d6c34ea5d77bb96ecc21b1ec3a18fa4b730e7bd) --- .../bookkeeper/BookieServiceInfoSerde.java | 55 +--- .../bookkeeper/PulsarRegistrationClient.java | 36 - .../bookkeeper/PulsarRegistrationClientTest.java | 59 -- 3 files changed, 2 insertions(+), 148 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java index b7e3024b637..78a33179e76 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.discover.BookieServiceInfo; -import org.apache.bookkeeper.discover.BookieServiceInfoUtils; import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat; import org.apache.pulsar.metadata.api.MetadataSerde; import org.apache.pulsar.metadata.api.Stat; @@ -64,57 +63,7 @@ public class BookieServiceInfoSerde implements MetadataSerde } @Override -public BookieServiceInfo deserialize(String path, byte[] bookieServiceInfo, Stat stat) throws IOException { -// see 
https://github.com/apache/bookkeeper/blob/ -// 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/ -// src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L311 -String bookieId = extractBookiedIdFromPath(path); -if (bookieServiceInfo == null || bookieServiceInfo.length == 0) { -return BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookieId); -} - -BookieServiceInfoFormat builder = BookieServiceInfoFormat.parseFrom(bookieServiceInfo); -BookieServiceInfo bsi = new BookieServiceInfo(); -List endpoints = builder.getEndpointsList().stream() -.map(e -> { -BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint(); -endpoint.setId(e.getId()); -endpoint.setPort(e.getPort()); -endpoint.setHost(e.getHost()); -endpoint.setProtocol(e.getProtocol()); -endpoint.setAuth(e.getAuthList()); -endpoint.setExtensions(e.getExtensionsList()); -return endpoint; -}) -.collect(Collectors.toList()); - -bsi.setEndpoints(endpoints); -bsi.setProperties(builder.getPropertiesMap()); - -return bsi; - -} - -/** - * Extract the BookieId - * The path should look like /ledgers/available/bookieId - * or /ledgers/available/readonly/bookieId. - * But the prefix depends on the configuration. 
- * @param path - * @return the bookieId - */ -private static String extractBookiedIdFromPath(String path) throws IOException { -// https://github.com/apache/bookkeeper/blob/ -// 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/ -// src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L258 -if (path == null) { -path = ""; -} -int last = path.lastIndexOf("/"); -if (last >= 0) { -return path.substring(last + 1); -} else { -throw new IOException("The path " + path + " doesn't look like a valid path for a BookieServiceInfo node"); -} +public BookieServiceInfo deserialize(String path, byte[] content, Stat stat) throws IOException { +return null; } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java index 1c692404318..52b50e3ea4b 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/b
[pulsar] branch master updated: Revert "Make BookieId work with PulsarRegistrationDriver (#17762)" (#17914)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 9d6c34ea5d7 Revert "Make BookieId work with PulsarRegistrationDriver (#17762)" (#17914) 9d6c34ea5d7 is described below commit 9d6c34ea5d77bb96ecc21b1ec3a18fa4b730e7bd Author: Enrico Olivelli AuthorDate: Tue Oct 4 08:22:03 2022 +0200 Revert "Make BookieId work with PulsarRegistrationDriver (#17762)" (#17914) This reverts commit 8d7ac33751c62383b510a04ec223981bd70cd4db. --- .../bookkeeper/BookieServiceInfoSerde.java | 55 +--- .../bookkeeper/PulsarRegistrationClient.java | 36 - .../bookkeeper/PulsarRegistrationClientTest.java | 59 -- 3 files changed, 2 insertions(+), 148 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java index b7e3024b637..78a33179e76 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.discover.BookieServiceInfo; -import org.apache.bookkeeper.discover.BookieServiceInfoUtils; import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat; import org.apache.pulsar.metadata.api.MetadataSerde; import org.apache.pulsar.metadata.api.Stat; @@ -64,57 +63,7 @@ public class BookieServiceInfoSerde implements MetadataSerde } @Override -public BookieServiceInfo deserialize(String path, byte[] bookieServiceInfo, Stat stat) throws IOException { -// see https://github.com/apache/bookkeeper/blob/ -// 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/ 
-// src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L311 -String bookieId = extractBookiedIdFromPath(path); -if (bookieServiceInfo == null || bookieServiceInfo.length == 0) { -return BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookieId); -} - -BookieServiceInfoFormat builder = BookieServiceInfoFormat.parseFrom(bookieServiceInfo); -BookieServiceInfo bsi = new BookieServiceInfo(); -List endpoints = builder.getEndpointsList().stream() -.map(e -> { -BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint(); -endpoint.setId(e.getId()); -endpoint.setPort(e.getPort()); -endpoint.setHost(e.getHost()); -endpoint.setProtocol(e.getProtocol()); -endpoint.setAuth(e.getAuthList()); -endpoint.setExtensions(e.getExtensionsList()); -return endpoint; -}) -.collect(Collectors.toList()); - -bsi.setEndpoints(endpoints); -bsi.setProperties(builder.getPropertiesMap()); - -return bsi; - -} - -/** - * Extract the BookieId - * The path should look like /ledgers/available/bookieId - * or /ledgers/available/readonly/bookieId. - * But the prefix depends on the configuration. 
- * @param path - * @return the bookieId - */ -private static String extractBookiedIdFromPath(String path) throws IOException { -// https://github.com/apache/bookkeeper/blob/ -// 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/ -// src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L258 -if (path == null) { -path = ""; -} -int last = path.lastIndexOf("/"); -if (last >= 0) { -return path.substring(last + 1); -} else { -throw new IOException("The path " + path + " doesn't look like a valid path for a BookieServiceInfo node"); -} +public BookieServiceInfo deserialize(String path, byte[] content, Stat stat) throws IOException { +return null; } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java index 1c692404318..52b50e3ea4b 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java @@ -25,21 +25,15 @@ import io.netty.util.concurrent.DefaultThreadF
[pulsar] branch master updated: [fix] Remove pulsar-broker-common dependency from pulsar-client (#17855)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 74d6305b748 [fix] Remove pulsar-broker-common dependency from pulsar-client (#17855) 74d6305b748 is described below commit 74d6305b748568ac2fa389e4639be0586ef3c1ca Author: Nicolò Boschi AuthorDate: Fri Sep 30 13:46:28 2022 +0200 [fix] Remove pulsar-broker-common dependency from pulsar-client (#17855) * [fix] Remove pulsar-broker-common dependency from pulsar-client * fix newline * add enforcer rule * Move packages-core to jdk8 bytecode * checkstyle * use variables * style * Fix annotation discovery * Fix kafka module compile --- pom.xml| 3 +- .../pulsar/utils/CmdGenerateDocumentation.java | 2 +- pulsar-client-all/pom.xml | 27 + pulsar-client/pom.xml | 7 -- .../client/impl/conf/CmdGenerateDocumentation.java | 2 +- .../common/util}/BaseGenerateDocumentation.java| 129 ++--- pulsar-io/kafka/pom.xml| 10 ++ pulsar-package-management/core/pom.xml | 8 ++ .../proxy/util/CmdGenerateDocumentation.java | 2 +- 9 files changed, 139 insertions(+), 51 deletions(-) diff --git a/pom.xml b/pom.xml index 8bb53b47943..a32dd8f377a 100644 --- a/pom.xml +++ b/pom.xml @@ -254,7 +254,7 @@ flexible messaging model and an intuitive client API. 3.0.0 4.0.rc2 1.0 -3.0.0 +3.1.0 @@ -281,6 +281,7 @@ flexible messaging model and an intuitive client API. 
0.4 7.1.0 0.9.15 +1.6.1 rename-netty-native-libs.sh diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/CmdGenerateDocumentation.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/CmdGenerateDocumentation.java index 3f5df60faea..c784ff84408 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/CmdGenerateDocumentation.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/CmdGenerateDocumentation.java @@ -21,9 +21,9 @@ package org.apache.pulsar.utils; import com.beust.jcommander.Parameters; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.broker.BaseGenerateDocumentation; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.util.BaseGenerateDocumentation; import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration; @Data diff --git a/pulsar-client-all/pom.xml b/pulsar-client-all/pom.xml index 6645488d704..1adaafbb90d 100644 --- a/pulsar-client-all/pom.xml +++ b/pulsar-client-all/pom.xml @@ -397,6 +397,33 @@ + +org.apache.maven.plugins +maven-enforcer-plugin +${maven-enforcer-plugin.version} + + +enforce-bytecode-version + + enforce + + + + + ${pulsar.client.compiler.release} + + + + + + + +org.codehaus.mojo +extra-enforcer-rules +${extra-enforcer-rules.version} + + + diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml index c0b21c64e6d..8cf75e89f52 100644 --- a/pulsar-client/pom.xml +++ b/pulsar-client/pom.xml @@ -45,13 +45,6 @@ ${project.parent.version} - - org.apache.pulsar - pulsar-broker-common - ${project.parent.version} - compile - - ${project.groupId} bouncy-castle-bc diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/CmdGenerateDocumentation.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/CmdGenerateDocumentation.java index 14059c0db64..28ad0263cf6 100644 --- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/CmdGenerateDocumentation.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/CmdGenerateDocumentation.java @@ -21,7 +21,7 @@ package org.apache.pulsar.client.impl.conf; import com.beust.jcommander.Parameters; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.broker.BaseGenerateDocumentation; +import org.apache.pulsar.common.util.BaseGenerateDocumentation; @Data @Parameters(commandDescription = "Generate documentation automatically.") diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/BaseGenerateDocumentation.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/BaseGenerateDocumentation.java similarity index
[pulsar] branch master updated (3de690d44de -> 9026d1954d1)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 3de690d44de [fix][flaky-test]ProxyConnectionThrottlingTest.testInboundConnection (#17883) add 9026d1954d1 [Fix][Tiered Storage] Eagerly Delete Offloaded Segments On Topic Deletion (#15914) No new revisions were added by this update. Summary of changes: .../bookkeeper/mledger/ManagedLedgerFactory.java | 20 ++ .../mledger/impl/ManagedLedgerFactoryImpl.java | 110 +-- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 78 .../bookkeeper/mledger/offload/OffloadUtils.java | 28 +++ .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 8 +- .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 61 ++ .../pulsar/broker/service/BrokerService.java | 32 ++-- .../broker/service/persistent/PersistentTopic.java | 108 ++- .../pulsar/broker/service/PersistentTopicTest.java | 2 + .../tests/integration/offload/TestBaseOffload.java | 210 +++-- .../integration/offload/TestFileSystemOffload.java | 5 +- .../integration/offload/TestOffloadDeletionFS.java | 144 ++ .../tests/integration/offload/TestS3Offload.java | 2 +- .../offload/TestUniversalConfigurations.java | 2 +- .../suites/PulsarTieredStorageTestSuite.java | 4 +- 15 files changed, 666 insertions(+), 148 deletions(-) create mode 100644 tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestOffloadDeletionFS.java
[pulsar] branch master updated: [Improve][Standalone] Standalone Add param of --metadata-url for runing with metadata (#17077)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 3e51af5876f [Improve][Standalone] Standalone Add param of --metadata-url for runing with metadata (#17077) 3e51af5876f is described below commit 3e51af5876f1c1da1a46d1278bc210d449321cf8 Author: Lan AuthorDate: Mon Sep 26 17:44:35 2022 +0800 [Improve][Standalone] Standalone Add param of --metadata-url for runing with metadata (#17077) --- .../main/java/org/apache/pulsar/PulsarStandalone.java | 17 - 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java index 2f3d15b1eaf..4fd22eba785 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java @@ -223,6 +223,10 @@ public class PulsarStandalone implements AutoCloseable { description = "Directory for storing metadata") private String metadataDir = "data/metadata"; +@Parameter(names = { "--metadata-url" }, +description = "Metadata store url") +private String metadataStoreUrl = ""; + @Parameter(names = {"--zookeeper-port"}, description = "Local zookeeper's port", hidden = true) private int zkPort = 2181; @@ -290,7 +294,7 @@ public class PulsarStandalone implements AutoCloseable { if (!this.isOnlyBroker()) { if (usingNewDefaultsPIP117) { -startBookieWithRocksDB(); +startBookieWithMetadataStore(); } else { startBookieWithZookeeper(); } @@ -434,10 +438,13 @@ public class PulsarStandalone implements AutoCloseable { } } - -private void startBookieWithRocksDB() throws Exception { -log.info("Starting BK with RocksDb metadata store"); -String metadataStoreUrl = "rocksdb://" + Paths.get(metadataDir).toAbsolutePath(); +private void 
startBookieWithMetadataStore() throws Exception { +if (StringUtils.isBlank(metadataStoreUrl)){ +log.info("Starting BK with RocksDb metadata store"); +metadataStoreUrl = "rocksdb://" + Paths.get(metadataDir).toAbsolutePath(); +} else { +log.info("Starting BK with metadata store:", metadataStoreUrl); +} bkCluster = BKCluster.builder() .metadataServiceUri(metadataStoreUrl) .bkPort(bkPort)
[pulsar] branch master updated: Make BookieId work with PulsarRegistrationDriver (#17762)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 8d7ac33751c Make BookieId work with PulsarRegistrationDriver (#17762) 8d7ac33751c is described below commit 8d7ac33751c62383b510a04ec223981bd70cd4db Author: Enrico Olivelli AuthorDate: Sat Sep 24 22:07:49 2022 +0200 Make BookieId work with PulsarRegistrationDriver (#17762) * Make BookieId work with PulsarRegistrationDriver * Switch to MetadataCache * checkstyle --- .../bookkeeper/BookieServiceInfoSerde.java | 55 +++- .../bookkeeper/PulsarRegistrationClient.java | 36 + .../bookkeeper/PulsarRegistrationClientTest.java | 59 ++ 3 files changed, 148 insertions(+), 2 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java index 78a33179e76..b7e3024b637 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.discover.BookieServiceInfo; +import org.apache.bookkeeper.discover.BookieServiceInfoUtils; import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat; import org.apache.pulsar.metadata.api.MetadataSerde; import org.apache.pulsar.metadata.api.Stat; @@ -63,7 +64,57 @@ public class BookieServiceInfoSerde implements MetadataSerde } @Override -public BookieServiceInfo deserialize(String path, byte[] content, Stat stat) throws IOException { -return null; +public BookieServiceInfo deserialize(String path, byte[] bookieServiceInfo, Stat stat) throws IOException { +// see 
https://github.com/apache/bookkeeper/blob/ +// 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/ +// src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L311 +String bookieId = extractBookiedIdFromPath(path); +if (bookieServiceInfo == null || bookieServiceInfo.length == 0) { +return BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookieId); +} + +BookieServiceInfoFormat builder = BookieServiceInfoFormat.parseFrom(bookieServiceInfo); +BookieServiceInfo bsi = new BookieServiceInfo(); +List endpoints = builder.getEndpointsList().stream() +.map(e -> { +BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint(); +endpoint.setId(e.getId()); +endpoint.setPort(e.getPort()); +endpoint.setHost(e.getHost()); +endpoint.setProtocol(e.getProtocol()); +endpoint.setAuth(e.getAuthList()); +endpoint.setExtensions(e.getExtensionsList()); +return endpoint; +}) +.collect(Collectors.toList()); + +bsi.setEndpoints(endpoints); +bsi.setProperties(builder.getPropertiesMap()); + +return bsi; + +} + +/** + * Extract the BookieId + * The path should look like /ledgers/available/bookieId + * or /ledgers/available/readonly/bookieId. + * But the prefix depends on the configuration. 
+ * @param path + * @return the bookieId + */ +private static String extractBookiedIdFromPath(String path) throws IOException { +// https://github.com/apache/bookkeeper/blob/ +// 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/ +// src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L258 +if (path == null) { +path = ""; +} +int last = path.lastIndexOf("/"); +if (last >= 0) { +return path.substring(last + 1); +} else { +throw new IOException("The path " + path + " doesn't look like a valid path for a BookieServiceInfo node"); +} } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java index 52b50e3ea4b..1c692404318 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java @@ -25,15 +25,21 @@ import io.netty.util.concurrent.DefaultThreadFactory; im
[pulsar] branch master updated (a8b265da323 -> 63d4cf20e7b)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from a8b265da323 [fix] [python client] Better Python garbage collection management for C++-owned objects (#16535) add 63d4cf20e7b ManagedLedger: move to FENCED state in case of BadVersionException (#17736) No new revisions were added by this update. Summary of changes: .../bookkeeper/mledger/ManagedLedgerException.java | 4 ++ .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 54 ++--- .../mledger/impl/ManagedLedgerErrorsTest.java | 70 ++ .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 48 +++ .../broker/service/persistent/PersistentTopic.java | 42 ++--- .../broker/service/BrokerBkEnsemblesTests.java | 2 - 6 files changed, 189 insertions(+), 31 deletions(-)
[pulsar] branch master updated: [fix][metadata] Set revalidateAfterReconnection true for certain failures (#17664)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new c40c7ee9fb3 [fix][metadata] Set revalidateAfterReconnection true for certain failures (#17664) c40c7ee9fb3 is described below commit c40c7ee9fb35eafea9c9923dfcf62706ea5d36bf Author: Michael Marshall AuthorDate: Sun Sep 18 06:27:59 2022 -0700 [fix][metadata] Set revalidateAfterReconnection true for certain failures (#17664) --- .../org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java index bc3f47d41dc..22eaccc278b 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java @@ -196,6 +196,7 @@ public class ResourceLockImpl implements ResourceLock { // We failed to revalidate the lock due to connectivity issue // Continue assuming we hold the lock, until we can revalidate it, either // on Reconnected or SessionReestablished events. +revalidateAfterReconnection = true; log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection {}", path, ex.getCause().getMessage()); }
[pulsar] branch master updated (fac14fbb759 -> d7c09be15c3)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from fac14fbb759 [fix][test] Fix flaky test ResourceGroupUsageAggregationTest. testProduceConsumeUsageOnRG (#17617) add d7c09be15c3 [improve][cli] Pulsar shell: allow to create a new config (--file) with a relative path (#17675) No new revisions were added by this update. Summary of changes: bin/pulsar-shell | 1 + bin/pulsar-shell.cmd | 1 + .../src/main/java/org/apache/pulsar/shell/ConfigShell.java| 11 ++- .../java/org/apache/pulsar/shell/JCommanderCompleter.java | 3 +-- 4 files changed, 13 insertions(+), 3 deletions(-)
[pulsar] branch master updated: [bugfix] Prevent Automatic Topic Creation during namespace deletion (#17609)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new af983049ccd [bugfix] Prevent Automatic Topic Creation during namespace deletion (#17609) af983049ccd is described below commit af983049ccd52da1e795032d9a7ba674c6df4b04 Author: Enrico Olivelli AuthorDate: Thu Sep 15 17:38:19 2022 +0200 [bugfix] Prevent Automatic Topic Creation during namespace deletion (#17609) --- .../pulsar/broker/service/BrokerService.java | 44 +++--- .../apache/pulsar/broker/admin/AdminApi2Test.java | 34 .../pulsar/broker/admin/TopicAutoCreationTest.java | 99 ++ 3 files changed, 163 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 17ce9c1ee5a..77cc8f11ff5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2766,6 +2766,9 @@ public class BrokerService implements Closeable { if (pulsar.getNamespaceService() == null) { return FutureUtil.failedFuture(new NamingException("namespace service is not ready")); } +Optional policies = +pulsar.getPulsarResources().getNamespaceResources() +.getPoliciesIfCached(topicName.getNamespaceObject()); return pulsar.getNamespaceService().checkTopicExists(topicName) .thenCompose(topicExists -> { return fetchPartitionedTopicMetadataAsync(topicName) @@ -2780,10 +2783,12 @@ public class BrokerService implements Closeable { if (metadata.partitions == 0 && !topicExists && !topicName.isPartitioned() -&& pulsar.getBrokerService().isAllowAutoTopicCreation(topicName) -&& pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) { +&& 
pulsar.getBrokerService().isAllowAutoTopicCreation(topicName, policies) +&& pulsar.getBrokerService() + .isDefaultTopicTypePartitioned(topicName, policies)) { - pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName) +pulsar.getBrokerService() + .createDefaultPartitionedTopicAsync(topicName, policies) .thenAccept(md -> future.complete(md)) .exceptionally(ex -> { if (ex.getCause() @@ -2813,8 +2818,9 @@ public class BrokerService implements Closeable { } @SuppressWarnings("deprecation") -private CompletableFuture createDefaultPartitionedTopicAsync(TopicName topicName) { -final int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName); +private CompletableFuture createDefaultPartitionedTopicAsync(TopicName topicName, + Optional policies) { +final int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName, policies); final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic(); checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0"); @@ -3000,11 +3006,23 @@ public class BrokerService implements Closeable { } public boolean isAllowAutoTopicCreation(final TopicName topicName) { +Optional policies = +pulsar.getPulsarResources().getNamespaceResources() +.getPoliciesIfCached(topicName.getNamespaceObject()); +return isAllowAutoTopicCreation(topicName, policies); +} + +public boolean isAllowAutoTopicCreation(final TopicName topicName, final Optional policies) { +if (policies.isPresent() && policies.get().deleted) { +log.info("Preventing AutoTopicCreation on a namespace that is being deleted {}", +topicName.getNamespaceObject()); +return false; +} //System topic can always be created automatically i
[pulsar] branch master updated (ff49c5c35f6 -> 4cf8b8040e8)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from ff49c5c35f6 [cleanup][broker][Modernizer] fix violations in pulsar-broker (#17275) add 4cf8b8040e8 [transactions] Fix minor leak of FlushContext (#17399) No new revisions were added by this update. Summary of changes: .../transaction/coordinator/impl/TxnLogBufferedWriter.java| 11 ++- 1 file changed, 6 insertions(+), 5 deletions(-)
[pulsar] branch master updated (cbbcd41cfc8 -> 2a7aea7bc14)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from cbbcd41cfc8 [improve][broker] Cancel the loadShedding task when closing pulsar service (#17632) add 2a7aea7bc14 [fix][cli] Pulsar shell: support relative paths (cliextensions) (#17648) No new revisions were added by this update. Summary of changes: bin/pulsar-shell | 2 ++ bin/pulsar-shell.cmd | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-)
[pulsar] branch master updated (5a87c47705a -> 8e0ae805513)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 5a87c47705a [broker] Do not log stacktrace for 'Failed to flush mark-delete position' case (#17432) add 8e0ae805513 [fix][metadata] Don't execute Bookkeeper metadata callbacks on Zookeeper event thread (#17620) No new revisions were added by this update. Summary of changes: .../metadata/bookkeeper/PulsarRegistrationClient.java| 16 1 file changed, 12 insertions(+), 4 deletions(-)
[pulsar] branch master updated: [broker] Do not log stacktrace for 'Failed to flush mark-delete position' case (#17432)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 5a87c47705a [broker] Do not log stacktrace for 'Failed to flush mark-delete position' case (#17432) 5a87c47705a is described below commit 5a87c47705a156534c7be42dce2cc71707e80998 Author: Enrico Olivelli AuthorDate: Wed Sep 14 10:02:58 2022 +0200 [broker] Do not log stacktrace for 'Failed to flush mark-delete position' case (#17432) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 16 ++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 59da6fc81da..107390084d6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1729,7 +1729,7 @@ public class ManagedCursorImpl implements ManagedCursor { */ PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) { if (newMarkDeletePosition.compareTo(markDeletePosition) < 0) { -throw new IllegalArgumentException( +throw new MarkDeletingMarkedPosition( "Mark deleting an already mark-deleted position. 
Current mark-delete: " + markDeletePosition + " -- attempted mark delete: " + newMarkDeletePosition); } @@ -1800,6 +1800,12 @@ public class ManagedCursorImpl implements ManagedCursor { asyncMarkDelete(position, Collections.emptyMap(), callback, ctx); } +private final class MarkDeletingMarkedPosition extends IllegalArgumentException { +public MarkDeletingMarkedPosition(String s) { +super(s); +} +} + @Override public void asyncMarkDelete(final Position position, Map properties, final MarkDeleteCallback callback, final Object ctx) { @@ -3289,7 +3295,13 @@ public class ManagedCursorImpl implements ManagedCursor { @Override public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { -log.warn("[{}][{}] Failed to flush mark-delete position", ledger.getName(), name, exception); +if (exception.getCause() instanceof MarkDeletingMarkedPosition) { +// this is not actually a problem, we should not log a stacktrace +log.info("[{}][{}] Cannot flush mark-delete position: {}", ledger.getName(), +name, exception.getCause().getMessage()); +} else { +log.warn("[{}][{}] Failed to flush mark-delete position", ledger.getName(), name, exception); +} } }, null); }
[pulsar] branch master updated: Issue 17588: Allow deletion of a namespace that was left in deleted status (#17592)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 869339dbde7 Issue 17588: Allow deletion of a namespace that was left in deleted status (#17592) 869339dbde7 is described below commit 869339dbde719a11768687c58f6cd1d8a341 Author: Enrico Olivelli AuthorDate: Wed Sep 14 08:42:02 2022 +0200 Issue 17588: Allow deletion of a namespace that was left in deleted status (#17592) --- .../org/apache/pulsar/broker/namespace/NamespaceService.java | 2 +- .../java/org/apache/pulsar/broker/web/PulsarWebResource.java | 7 ++- .../java/org/apache/pulsar/broker/admin/NamespacesTest.java | 12 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 3467d25777e..9f3d36043b8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1238,7 +1238,7 @@ public class NamespaceService implements AutoCloseable { public CompletableFuture> getListOfNonPersistentTopics(NamespaceName namespaceName) { -return PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName) +return PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName, true) .thenCompose(peerClusterData -> { // if peer-cluster-data is present it means namespace is owned by that peer-cluster and request // should redirect to the peer-cluster diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 85638831bbb..2184ca44f2a 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -856,6 +856,11 @@ public abstract class PulsarWebResource { public static CompletableFuture checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService, NamespaceName namespace) { +return checkLocalOrGetPeerReplicationCluster(pulsarService, namespace, false); +} +public static CompletableFuture checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService, + NamespaceName namespace, + boolean allowDeletedNamespace) { if (!namespace.isGlobal() || NamespaceService.isHeartbeatNamespace(namespace)) { return CompletableFuture.completedFuture(null); } @@ -867,7 +872,7 @@ public abstract class PulsarWebResource { .getPoliciesAsync(namespace).thenAccept(policiesResult -> { if (policiesResult.isPresent()) { Policies policies = policiesResult.get(); -if (policies.deleted) { +if (!allowDeletedNamespace && policies.deleted) { String msg = String.format("Namespace %s is deleted", namespace.toString()); log.warn(msg); validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 73cf4914ffe..c43faa35279 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -1200,6 +1200,18 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { topicList = admin.topics().getList(namespace); assertTrue(topicList.isEmpty()); +// simulate a partially deleted namespace, we should be able to recover +pulsar.getPulsarResources().getNamespaceResources() +.setPolicies(NamespaceName.get(namespace), old -> { +old.deleted = true; +return old; +}); +admin.namespaces().deleteNamespace(namespace, 
true); + +admin.namespaces().createNamespace(namespace, 100); +topicList = admin.topics().getList(namespace); +assertTrue(topicList.isEmpty()); + // reset back to false pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false); }
[pulsar] branch master updated: Move testAutoSchemaFunction and testAvroSchemaFunction to Java functions IT (#17372)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new c50a1e451d0 Move testAutoSchemaFunction and testAvroSchemaFunction to Java functions IT (#17372) c50a1e451d0 is described below commit c50a1e451d0e144e4ae1eaa56b8fe358aaebbb29 Author: Christophe Bornet AuthorDate: Tue Sep 13 18:03:53 2022 +0200 Move testAutoSchemaFunction and testAvroSchemaFunction to Java functions IT (#17372) --- .../tests/integration/functions/PulsarFunctionsTest.java | 9 +++-- .../integration/functions/java/PulsarFunctionsJavaTest.java| 10 ++ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 908d95784d6..83a43d2adfa 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -86,7 +86,6 @@ import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; -import org.testng.annotations.Test; /** * A test base for testing functions. 
@@ -1260,10 +1259,9 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { result.assertNoStderr(); } -@Test(groups = "function") -public void testAutoSchemaFunction() throws Exception { +protected void testAutoSchemaFunction() throws Exception { String inputTopicName = "test-autoschema-input-" + randomName(8); -String outputTopicName = "test-autoshcema-output-" + randomName(8); +String outputTopicName = "test-autoschema-output-" + randomName(8); String functionName = "test-autoschema-fn-" + randomName(8); final int numMessages = 10; @@ -1326,8 +1324,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { } } -@Test(groups = "function") -public void testAvroSchemaFunction() throws Exception { +protected void testAvroSchemaFunction() throws Exception { log.info("testAvroSchemaFunction start ..."); final String inputTopic = "test-avroschema-input-" + randomName(8); final String outputTopic = "test-avroschema-output-" + randomName(8); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java index 13ae7253b36..923870cabb5 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java @@ -203,4 +203,14 @@ public abstract class PulsarFunctionsJavaTest extends PulsarFunctionsTest { testRecordFunction(); } +@Test(groups = {"java_function", "function"}) +public void testAutoSchemaFunctionTest() throws Exception { +testAutoSchemaFunction(); +} + +@Test(groups = {"java_function", "function"}) +public void testAvroSchemaFunctionTest() throws Exception { +testAvroSchemaFunction(); +} + }
[pulsar] branch master updated: [fix][functions] Fix K8S download function method with auth enabled (#17597)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new d23b7604d5a [fix][functions] Fix K8S download function method with auth enabled (#17597) d23b7604d5a is described below commit d23b7604d5ad919e93f73c044de04f6c58b702ae Author: Nicolò Boschi AuthorDate: Tue Sep 13 09:04:30 2022 +0200 [fix][functions] Fix K8S download function method with auth enabled (#17597) --- .../runtime/kubernetes/KubernetesRuntime.java | 24 +++ .../runtime/kubernetes/KubernetesRuntimeTest.java | 74 -- 2 files changed, 83 insertions(+), 15 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java index f7791e716d9..e6e85d66d0e 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java @@ -875,17 +875,7 @@ public class KubernetesRuntime implements Runtime { ArrayList cmd = new ArrayList<>(Arrays.asList( pulsarRootDir + configAdminCLI, "--admin-url", -pulsarAdminUrl, -"functions", -"download", -"--tenant", -tenant, -"--namespace", -namespace, -"--name", -name, -"--destination-file", -userCodeFilePath)); +pulsarAdminUrl)); // add auth plugin and parameters if necessary if (authenticationEnabled && authConfig != null) { @@ -900,6 +890,18 @@ public class KubernetesRuntime implements Runtime { } } +cmd.addAll(Arrays.asList( +"functions", +"download", +"--tenant", +tenant, +"--namespace", +namespace, +"--name", +name, +"--destination-file", +userCodeFilePath)); + if (transformFunction) { cmd.add("--transform-function"); } diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java index 33451a316e4..cf86623eafd 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java @@ -64,6 +64,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Consumer; import java.util.stream.Collectors; import static org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH; @@ -183,10 +184,21 @@ public class KubernetesRuntimeTest { } } + KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding, double cpuOverCommitRatio, double memoryOverCommitRatio, Optional manifestCustomizer, String downloadDirectory) throws Exception { +return createKubernetesRuntimeFactory(extraDepsDir, percentMemoryPadding, cpuOverCommitRatio, memoryOverCommitRatio, +manifestCustomizer, downloadDirectory, null, null); +} + +KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding, +double cpuOverCommitRatio, double memoryOverCommitRatio, + Optional manifestCustomizer, +String downloadDirectory, + Consumer workerConfigConsumer, + AuthenticationConfig authenticationConfig) throws Exception { KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory()); doNothing().when(factory).setupClient(); @@ -226,7 +238,11 @@ public class KubernetesRuntimeTes
[pulsar] branch branch-2.10 updated: Revert "Issue 17588: Allow deletion of a namespace that was left in deleted status"
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.10 by this push: new 8bd3b7adb53 Revert "Issue 17588: Allow deletion of a namespace that was left in deleted status" 8bd3b7adb53 is described below commit 8bd3b7adb53221a0eab89aaf514fc1a6a3bc7e6a Author: Enrico Olivelli AuthorDate: Tue Sep 13 08:33:39 2022 +0200 Revert "Issue 17588: Allow deletion of a namespace that was left in deleted status" This reverts commit 62c8b60a61d6b072d90197703d1f24d7375d19ba. --- .../org/apache/pulsar/broker/namespace/NamespaceService.java | 2 +- .../java/org/apache/pulsar/broker/web/PulsarWebResource.java | 7 +-- .../java/org/apache/pulsar/broker/admin/NamespacesTest.java | 12 3 files changed, 2 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index dbe41b42c8a..cf7ac381d43 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1219,7 +1219,7 @@ public class NamespaceService implements AutoCloseable { public CompletableFuture> getListOfNonPersistentTopics(NamespaceName namespaceName) { -return PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName, true) +return PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName) .thenCompose(peerClusterData -> { // if peer-cluster-data is present it means namespace is owned by that peer-cluster and request // should be redirect to the peer-cluster diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java 
index 7cc019f2e25..fca5b47703e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -803,11 +803,6 @@ public abstract class PulsarWebResource { public static CompletableFuture checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService, NamespaceName namespace) { -return checkLocalOrGetPeerReplicationCluster(pulsarService, namespace, false); -} -public static CompletableFuture checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService, - NamespaceName namespace, - boolean allowDeletedNamespace) { if (!namespace.isGlobal()) { return CompletableFuture.completedFuture(null); } @@ -823,7 +818,7 @@ public abstract class PulsarWebResource { .getPoliciesAsync(namespace).thenAccept(policiesResult -> { if (policiesResult.isPresent()) { Policies policies = policiesResult.get(); -if (!allowDeletedNamespace && policies.deleted) { +if (policies.deleted) { String msg = String.format("Namespace %s is deleted", namespace.toString()); log.warn(msg); validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 50a9a1d554d..b754a592ba0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -1172,18 +1172,6 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { topicList = admin.topics().getList(namespace); assertTrue(topicList.isEmpty()); -// simulate a partially deleted namespace, we should be able to recover -pulsar.getPulsarResources().getNamespaceResources() -.setPolicies(NamespaceName.get(namespace), old -> { -old.deleted = true; -return old; -}); -admin.namespaces().deleteNamespace(namespace, true); - 
-admin.namespaces().createNamespace(namespace, 100); -topicList = admin.topics().getList(namespace); -assertTrue(topicList.isEmpty()); - // reset back to false pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false); }
[pulsar] branch branch-2.10 updated: Issue 17588: Allow deletion of a namespace that was left in deleted status
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.10 by this push: new 62c8b60a61d Issue 17588: Allow deletion of a namespace that was left in deleted status 62c8b60a61d is described below commit 62c8b60a61d6b072d90197703d1f24d7375d19ba Author: Enrico Olivelli AuthorDate: Mon Sep 12 15:20:29 2022 +0200 Issue 17588: Allow deletion of a namespace that was left in deleted status --- .../org/apache/pulsar/broker/namespace/NamespaceService.java | 2 +- .../java/org/apache/pulsar/broker/web/PulsarWebResource.java | 7 ++- .../java/org/apache/pulsar/broker/admin/NamespacesTest.java | 12 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index cf7ac381d43..dbe41b42c8a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1219,7 +1219,7 @@ public class NamespaceService implements AutoCloseable { public CompletableFuture> getListOfNonPersistentTopics(NamespaceName namespaceName) { -return PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName) +return PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName, true) .thenCompose(peerClusterData -> { // if peer-cluster-data is present it means namespace is owned by that peer-cluster and request // should be redirect to the peer-cluster diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index fca5b47703e..7cc019f2e25 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -803,6 +803,11 @@ public abstract class PulsarWebResource { public static CompletableFuture checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService, NamespaceName namespace) { +return checkLocalOrGetPeerReplicationCluster(pulsarService, namespace, false); +} +public static CompletableFuture checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService, + NamespaceName namespace, + boolean allowDeletedNamespace) { if (!namespace.isGlobal()) { return CompletableFuture.completedFuture(null); } @@ -818,7 +823,7 @@ public abstract class PulsarWebResource { .getPoliciesAsync(namespace).thenAccept(policiesResult -> { if (policiesResult.isPresent()) { Policies policies = policiesResult.get(); -if (policies.deleted) { +if (!allowDeletedNamespace && policies.deleted) { String msg = String.format("Namespace %s is deleted", namespace.toString()); log.warn(msg); validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index b754a592ba0..50a9a1d554d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -1172,6 +1172,18 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { topicList = admin.topics().getList(namespace); assertTrue(topicList.isEmpty()); +// simulate a partially deleted namespace, we should be able to recover +pulsar.getPulsarResources().getNamespaceResources() +.setPolicies(NamespaceName.get(namespace), old -> { +old.deleted = true; +return old; +}); +admin.namespaces().deleteNamespace(namespace, true); + 
+admin.namespaces().createNamespace(namespace, 100); +topicList = admin.topics().getList(namespace); +assertTrue(topicList.isEmpty()); + // reset back to false pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false); }
[pulsar] branch master updated: [improve][test] try remove whitebox on MockZooKeeper (#17579)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new c1434fdf076 [improve][test] try remove whitebox on MockZooKeeper (#17579) c1434fdf076 is described below commit c1434fdf0762aa356343682f89d00389198037e5 Author: tison AuthorDate: Sun Sep 11 21:14:12 2022 +0800 [improve][test] try remove whitebox on MockZooKeeper (#17579) --- .../java/org/apache/zookeeper/MockZooKeeper.java | 34 ++ 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java index 39f3845c967..190720e5bcc 100644 --- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java +++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java @@ -46,6 +46,7 @@ import java.util.function.BiPredicate; import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.zookeeper.AsyncCallback.Children2Callback; import org.apache.zookeeper.AsyncCallback.ChildrenCallback; import org.apache.zookeeper.AsyncCallback.DataCallback; @@ -59,7 +60,6 @@ import org.apache.zookeeper.data.Stat; import org.objenesis.Objenesis; import org.objenesis.ObjenesisStd; import org.objenesis.instantiator.ObjectInstantiator; -import org.powermock.reflect.Whitebox; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -144,10 +144,7 @@ public class MockZooKeeper extends ZooKeeper { public static MockZooKeeper newInstance(ExecutorService executor, int readOpDelayMs) { try { -MockZooKeeper zk = createMockZooKeeperInstance(executor, readOpDelayMs); -ObjectInstantiator clientCnxnObjectInstantiator = objenesis.getInstantiatorOf(ClientCnxn.class); -Whitebox.setInternalState(zk, "cnxn", 
clientCnxnObjectInstantiator.newInstance()); -return zk; +return createMockZooKeeperInstance(executor, readOpDelayMs); } catch (RuntimeException e) { throw e; } catch (Exception e) { @@ -1018,33 +1015,32 @@ public class MockZooKeeper extends ZooKeeper { try { for (org.apache.zookeeper.Op op : ops) { switch (op.getType()) { -case ZooDefs.OpCode.create: { +case ZooDefs.OpCode.create -> { org.apache.zookeeper.Op.Create opc = ((org.apache.zookeeper.Op.Create) op); CreateMode cm = CreateMode.fromFlag(opc.flags); String path = this.create(op.getPath(), opc.data, null, cm); res.add(new OpResult.CreateResult(path)); -break; } -case ZooDefs.OpCode.delete: -this.delete(op.getPath(), Whitebox.getInternalState(op, "version")); +case ZooDefs.OpCode.delete -> { +this.delete(op.getPath(), (int) FieldUtils.readField(op, "version", true)); res.add(new OpResult.DeleteResult()); -break; -case ZooDefs.OpCode.setData: { -Stat stat = this.setData(op.getPath(), Whitebox.getInternalState(op, "data"), -Whitebox.getInternalState(op, "version")); +} +case ZooDefs.OpCode.setData -> { +Stat stat = this.setData( +op.getPath(), +(byte[]) FieldUtils.readField(op, "data", true), +(int) FieldUtils.readField(op, "version", true)); res.add(new OpResult.SetDataResult(stat)); -break; } -case ZooDefs.OpCode.getChildren: { +case ZooDefs.OpCode.getChildren -> { try { List children = this.getChildren(op.getPath(), null); res.add(new OpResult.GetChildrenResult(children)); } catch (KeeperException e) { res.add(new OpResult.ErrorResult(e.code().intValue())); } -break; } -case ZooDefs.OpCode.getData: { +case ZooDefs.OpCode.getData -> { Stat stat = new Stat(); try { byte[] payload = this.getData(op.getPath(), null, stat); @@ -1052,9 +1048
[pulsar] branch master updated: Revert "[fix] Combination of autocreate + forced delete of partitioned topic with active consumer leaves topic metadata inconsistent. (#17308)" (#17566)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 8d9e3c22b90 Revert "[fix] Combination of autocreate + forced delete of partitioned topic with active consumer leaves topic metadata inconsistent. (#17308)" (#17566) 8d9e3c22b90 is described below commit 8d9e3c22b904d979a901a0c19fd6707df512b13a Author: Enrico Olivelli AuthorDate: Sat Sep 10 00:07:20 2022 +0200 Revert "[fix] Combination of autocreate + forced delete of partitioned topic with active consumer leaves topic metadata inconsistent. (#17308)" (#17566) This reverts commit 9529850bd48557dcba124a157a69b75b8f41da3b. --- .../broker/service/persistent/PersistentTopic.java | 7 - .../broker/admin/AdminApiMultiBrokersTest.java | 98 --- .../apache/pulsar/broker/admin/NamespacesTest.java | 1 - .../broker/service/ExclusiveProducerTest.java | 8 +- .../pulsar/client/impl/TopicsConsumerImplTest.java | 28 .../integration/topics/TestTopicDeletion.java | 183 - 6 files changed, 29 insertions(+), 296 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 31dda5539a3..4d48f9c627e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1138,13 +1138,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal .map(PersistentSubscription::getName).toList(); return FutureUtil.failedFuture( new TopicBusyException("Topic has subscriptions did not catch up: " + backlogSubs)); -} else if (TopicName.get(topic).isPartitioned() -&& (getProducers().size() > 0 || getNumberOfConsumers() > 0) -&& 
getBrokerService().isAllowAutoTopicCreation(topic)) { -// to avoid inconsistent metadata as a result -return FutureUtil.failedFuture( -new TopicBusyException("Partitioned topic has active consumers or producers and " -+ "auto-creation of topic is allowed")); } fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java index 5a0bde6f913..2cbff955ecf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.broker.admin; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.fail; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertTrue; @@ -28,8 +26,6 @@ import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.apache.pulsar.broker.MultiBrokerBaseTest; @@ -37,16 +33,9 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.loadbalance.LeaderBroker; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.admin.internal.TopicsImpl; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; -import 
org.apache.pulsar.common.policies.data.RetentionPolicies; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -65,7 +54,6 @@ public class AdminApiMultiBrokersTest extends MultiBrokerBaseTest { @Override protected void doInitConf() throws Exception { super.doInitConf(); -this.conf.setManagedLedgerMaxEntriesPerLedger(10); } @Override @@ -134,90 +122,4 @@ public class AdminApiMultiBrokersTest extends MultiBrokerBaseTest {
[pulsar] branch master updated: [PIP-193] Support Transform Function with LocalRunner (#17445)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 327648cbb26 [PIP-193] Support Transform Function with LocalRunner (#17445) 327648cbb26 is described below commit 327648cbb268ca1fa1770d3254d00a818adc2e19 Author: Christophe Bornet AuthorDate: Thu Sep 8 12:45:48 2022 +0200 [PIP-193] Support Transform Function with LocalRunner (#17445) --- .../worker/PulsarFunctionLocalRunTest.java | 15 +++ .../org/apache/pulsar/functions/LocalRunner.java | 113 ++--- 2 files changed, 90 insertions(+), 38 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java index 5d6395a9813..d90b971b5fa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java @@ -896,6 +896,11 @@ public class PulsarFunctionLocalRunTest { } private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism, String className) throws Exception { +testPulsarSinkLocalRun(jarFilePathUrl, parallelism, className, null, null); +} + +private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism, String className, +String transformFunction, String transformFunctionClassName) throws Exception { final String namespacePortion = "io"; final String replNamespace = tenant + "/" + namespacePortion; final String sourceTopic = "persistent://" + replNamespace + "/input"; @@ -921,6 +926,9 @@ public class PulsarFunctionLocalRunTest { sinkConfig.setArchive(jarFilePathUrl); sinkConfig.setParallelism(parallelism); +sinkConfig.setTransformFunction(transformFunction); 
+sinkConfig.setTransformFunctionClassName(transformFunctionClassName); + int metricsPort = FunctionCommon.findAvailablePort(); @Cleanup LocalRunner localRunner = LocalRunner.builder() @@ -933,6 +941,7 @@ public class PulsarFunctionLocalRunTest { .tlsHostNameVerificationEnabled(false) .brokerServiceUrl(pulsar.getBrokerServiceUrlTls()) .connectorsDirectory(workerConfig.getConnectorsDirectory()) +.functionsDirectory(workerConfig.getFunctionsDirectory()) .metricsPortStart(metricsPort) .build(); @@ -1083,6 +1092,12 @@ public class PulsarFunctionLocalRunTest { public void testPulsarSinkStatsByteBufferType() throws Throwable { runWithNarClassLoader(() -> testPulsarSinkLocalRun(null, 1, StatsNullSink.class.getName())); } + +//@Test(timeOut = 2, groups = "builtin") +@Test(groups = "builtin") +public void testPulsarSinkWithFunction() throws Throwable { +testPulsarSinkLocalRun(null, 1, StatsNullSink.class.getName(), "builtin://exclamation", "org.apache.pulsar.functions.api.examples.RecordFunction"); +} public static class TestErrorSink implements Sink { private Map config; diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java index a308c98b3da..ae38b8cd652 100644 --- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java +++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.functions; +import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.pulsar.common.functions.Utils.inferMissingArguments; import com.beust.jcommander.IStringConverter; import com.beust.jcommander.JCommander; @@ -48,6 +49,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import lombok.Builder; +import lombok.Value; import 
lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.Utils; @@ -92,8 +94,8 @@ public class LocalRunner implements AutoCloseable { private final String functionsDir; private final Thread shutdownHook; private final int instanceLivenessCheck; -private ClassLoader userCodeClassLoader; -private boolean userCodeClassLoaderCreated; +private UserCodeClassLoader userCodeClassLoader; +private UserCodeClassLoader transformFunctionCodeClassLoader; private RuntimeFactory runtime
[pulsar] branch branch-2.11 updated: Revert "Enable Log4j2 async loggers (#15188)" (#17474)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 6b625c29e06 Revert "Enable Log4j2 async loggers (#15188)" (#17474) 6b625c29e06 is described below commit 6b625c29e06c173d6b449bffa969c1d312ef3146 Author: Christophe Bornet AuthorDate: Mon Sep 5 16:18:26 2022 +0200 Revert "Enable Log4j2 async loggers (#15188)" (#17474) This reverts commit 1e9a1f087e8c6fd9e78f6e3f0635113ba7927bbf. (cherry picked from commit 35d3d5cb859c77c468a7a15c1497ee089365d83b) --- bin/pulsar | 5 + conf/log4j2.yaml | 1 - distribution/server/pom.xml | 5 - distribution/server/src/assemble/LICENSE.bin.txt | 2 -- 4 files changed, 1 insertion(+), 12 deletions(-) diff --git a/bin/pulsar b/bin/pulsar index 205659f8d18..0658762c96b 100755 --- a/bin/pulsar +++ b/bin/pulsar @@ -316,11 +316,8 @@ OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR" OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL" OPTS="$OPTS -Dpulsar.log.root.level=$PULSAR_LOG_ROOT_LEVEL" OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT" - - # Configure log4j2 to disable servlet webapp detection so that Garbage free logging can be used -PULSAR_LOG4J_CONF=${PULSAR_LOG4J_CONF:-"-Dlog4j2.is.webapp=false -Dlog4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector -Dlog4j2.enableThreadlocals=true -Dlog4j2.enableDirectEncoders=true"} -OPTS="$OPTS $PULSAR_LOG4J_CONF" +OPTS="$OPTS -Dlog4j2.is.webapp=false" # Functions related logging OPTS="$OPTS -Dpulsar.functions.process.container.log.dir=$PULSAR_LOG_DIR" diff --git a/conf/log4j2.yaml b/conf/log4j2.yaml index 177c6adc44a..71698d72067 100644 --- a/conf/log4j2.yaml +++ b/conf/log4j2.yaml @@ -53,7 +53,6 @@ Configuration: Console: name: Console target: SYSTEM_OUT - direct: true PatternLayout: Pattern: "%d{ISO8601_OFFSET_DATE_TIME_HHMM} [%t] %-5level 
%logger{36} - %msg%n" diff --git a/distribution/server/pom.xml b/distribution/server/pom.xml index 52959729307..d30eae88907 100644 --- a/distribution/server/pom.xml +++ b/distribution/server/pom.xml @@ -64,11 +64,6 @@ ${jline.version} - - com.lmax - disruptor - - org.apache.zookeeper zookeeper-prometheus-metrics diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 261607eadd4..bdebbe84cde 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -543,8 +543,6 @@ The Apache Software License, Version 2.0 - io.etcd-jetcd-core-0.5.11.jar * IPAddress - com.github.seancfoley-ipaddress-5.3.3.jar - * LMAX Disruptor -- com.lmax-disruptor-3.4.3.jar * RxJava - io.reactivex.rxjava3-rxjava-3.0.1.jar * RabbitMQ Java Client
[pulsar] branch master updated (0e4e88b9b06 -> 35d3d5cb859)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 0e4e88b9b06 [improve][sec] suppress CVE-2021-3563 of openstack-keystone-2.5.0 (#17458) add 35d3d5cb859 Revert "Enable Log4j2 async loggers (#15188)" (#17474) No new revisions were added by this update. Summary of changes: bin/pulsar | 5 + conf/log4j2.yaml | 1 - distribution/server/pom.xml | 5 - distribution/server/src/assemble/LICENSE.bin.txt | 2 -- 4 files changed, 1 insertion(+), 12 deletions(-)
[pulsar] branch master updated (3a3a993b093 -> 1bf9a262d97)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 3a3a993b093 [managed-ledger] Do not send duplicate reads to BK/offloaders (#17241) add 1bf9a262d97 Fix OutputRecordSinkRecord getValue and getSchema (#17434) No new revisions were added by this update. Summary of changes: .../apache/pulsar/functions/instance/OutputRecordSinkRecord.java | 9 +++-- 1 file changed, 3 insertions(+), 6 deletions(-)
[pulsar] branch master updated: [managed-ledger] Do not send duplicate reads to BK/offloaders (#17241)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 3a3a993b093 [managed-ledger] Do not send duplicate reads to BK/offloaders (#17241) 3a3a993b093 is described below commit 3a3a993b093291f2721a20eb5a981a3b7db557d9 Author: Enrico Olivelli AuthorDate: Fri Sep 2 15:09:34 2022 +0200 [managed-ledger] Do not send duplicate reads to BK/offloaders (#17241) --- .../mledger/impl/cache/PendingReadsManager.java| 447 .../mledger/impl/cache/RangeEntryCacheImpl.java| 106 +++-- .../impl/cache/PendingReadsManagerTest.java| 462 + .../client/api/MessageDispatchThrottlingTest.java | 8 + 4 files changed, 986 insertions(+), 37 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java new file mode 100644 index 000..4c374d8ace6 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java @@ -0,0 +1,447 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; + +import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; +import io.prometheus.client.Counter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.AllArgsConstructor; +import lombok.Value; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.EntryImpl; + +/** + * PendingReadsManager tries to prevent sending duplicate reads to BK. + */ +public class PendingReadsManager { + +private static final Counter COUNT_ENTRIES_READ_FROM_BK = Counter +.build() +.name("pulsar_ml_cache_pendingreads_entries_read") +.help("Total number of entries read from BK") +.register(); + +private static final Counter COUNT_ENTRIES_NOTREAD_FROM_BK = Counter +.build() +.name("pulsar_ml_cache_pendingreads_entries_notread") +.help("Total number of entries not read from BK") +.register(); + +private static final Counter COUNT_PENDING_READS_MATCHED = Counter +.build() +.name("pulsar_ml_cache_pendingreads_matched") +.help("Pending reads reused with perfect range match") +.register(); +private static final Counter COUNT_PENDING_READS_MATCHED_INCLUDED = Counter +.build() +.name("pulsar_ml_cache_pendingreads_matched_included") +.help("Pending reads reused by attaching to a read with a larger range") +.register(); +private static final Counter COUNT_PENDING_READS_MISSED = Counter +.build() +.name("pulsar_ml_cache_pendingreads_missed") +.help("Pending reads that didn't find a match") +.register(); + +private static 
final Counter COUNT_PENDING_READS_MATCHED_OVERLAPPING_MISS_LEFT = Counter +.build() +.name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_left") +.help("Pending reads that didn't find a match but they partially overlap with another read") +.register(); + +private static final Counter COUNT_PENDING_READS_MATCHED_BUT_OVERLAPPING_MISS_RIGHT = Counter +.build() + .name("pulsar_ml_cache_pendingreads_matched_overlapping_miss_right") +.help
[pulsar] branch master updated: [fix] Combination of autocreate + forced delete of partitioned topic with active consumer leaves topic metadata inconsistent. (#17308)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 9529850bd48 [fix] Combination of autocreate + forced delete of partitioned topic with active consumer leaves topic metadata inconsistent. (#17308) 9529850bd48 is described below commit 9529850bd48557dcba124a157a69b75b8f41da3b Author: Andrey Yegorov <8622884+dl...@users.noreply.github.com> AuthorDate: Thu Sep 1 00:57:30 2022 -0700 [fix] Combination of autocreate + forced delete of partitioned topic with active consumer leaves topic metadata inconsistent. (#17308) --- .../broker/service/persistent/PersistentTopic.java | 7 + .../broker/admin/AdminApiMultiBrokersTest.java | 98 +++ .../apache/pulsar/broker/admin/NamespacesTest.java | 1 + .../broker/service/ExclusiveProducerTest.java | 8 +- .../pulsar/client/impl/TopicsConsumerImplTest.java | 28 .../integration/topics/TestTopicDeletion.java | 183 + 6 files changed, 296 insertions(+), 29 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c8cd487c5ee..4d63ddafd01 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1139,6 +1139,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal .map(PersistentSubscription::getName).toList(); return FutureUtil.failedFuture( new TopicBusyException("Topic has subscriptions did not catch up: " + backlogSubs)); +} else if (TopicName.get(topic).isPartitioned() +&& (getProducers().size() > 0 || getNumberOfConsumers() > 0) +&& getBrokerService().isAllowAutoTopicCreation(topic)) { +// to avoid 
inconsistent metadata as a result +return FutureUtil.failedFuture( +new TopicBusyException("Partitioned topic has active consumers or producers and " ++ "auto-creation of topic is allowed")); } fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java index 2cbff955ecf..5a0bde6f913 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.admin; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertTrue; @@ -26,6 +28,8 @@ import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.apache.pulsar.broker.MultiBrokerBaseTest; @@ -33,9 +37,16 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.loadbalance.LeaderBroker; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.internal.TopicsImpl; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import 
org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -54,6 +65,7 @@ public class AdminApiMultiBrokersTest extends MultiBrokerBaseTest { @Override protected void doInitConf() throws Exception { super.doInitConf(); +this.conf.setManagedLedgerMaxEntriesPerLedger(10); } @Override @@ -122,4 +134,90 @@ public class AdminApiMultiBrokersTest extends MultiBrokerBaseTest { Assert.assertEquals(lookupResultSet.size(), 1); } +@Test +p
[pulsar] branch branch-2.11 updated: KCA: handle kafka's logical schemas (#16485)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new bdd63cad439 KCA: handle kafka's logical schemas (#16485) bdd63cad439 is described below commit bdd63cad439c9f39a5024a1bb3d8fceb6e0bb721 Author: Andrey Yegorov <8622884+dl...@users.noreply.github.com> AuthorDate: Wed Aug 31 23:40:03 2022 -0700 KCA: handle kafka's logical schemas (#16485) (cherry picked from commit fe68a8e872ff39369d5d401c8fc68da866c9b2dd) --- .../io/kafka/connect/schema/KafkaConnectData.java | 58 ++ .../connect/schema/PulsarSchemaToKafkaSchema.java | 82 ++-- .../io/kafka/connect/KafkaConnectSinkTest.java | 219 - 3 files changed, 290 insertions(+), 69 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java index 671495c6df6..557cfbb9dd8 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java @@ -30,9 +30,13 @@ import java.util.List; import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.GenericData; +import org.apache.kafka.connect.data.Date; +import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; import org.apache.kafka.connect.errors.DataException; import org.apache.pulsar.client.api.schema.GenericRecord; @@ -122,6 +126,34 @@ public class KafkaConnectData { } public static Object 
castToKafkaSchema(Object nativeObject, Schema kafkaSchema) { +// special case for a few classes defined in org.apache.kafka.connect.data +// and listed as LOGICAL_TYPE_CLASSES in org.apache.kafka.connect.data.ConnectSchema +if (PulsarSchemaToKafkaSchema.matchesToKafkaLogicalSchema(kafkaSchema)) { +if (Timestamp.LOGICAL_NAME.equals(kafkaSchema.name())) { +if (nativeObject instanceof java.util.Date) { +return nativeObject; +} +return Timestamp.toLogical(kafkaSchema, ((Number) nativeObject).longValue()); +} else if (Date.LOGICAL_NAME.equals(kafkaSchema.name())) { +if (nativeObject instanceof java.util.Date) { +return nativeObject; +} +return Date.toLogical(kafkaSchema, ((Number) nativeObject).intValue()); +} else if (Time.LOGICAL_NAME.equals(kafkaSchema.name())) { +if (nativeObject instanceof java.util.Date) { +return nativeObject; +} +return Time.toLogical(kafkaSchema, ((Number) nativeObject).intValue()); +} else if (Decimal.LOGICAL_NAME.equals(kafkaSchema.name())) { +if (nativeObject instanceof java.math.BigDecimal) { +return nativeObject; +} +return Decimal.toLogical(kafkaSchema, (byte[]) nativeObject); +} +throw new IllegalStateException("Unsupported Kafka Logical Schema " + kafkaSchema.name() ++ " for value " + nativeObject); +} + if (nativeObject instanceof Number) { // This is needed in case // jackson decided to fit value into some other type internally @@ -242,6 +274,32 @@ public class KafkaConnectData { return defaultOrThrow(kafkaSchema); } +// special case for a few classes defined in org.apache.kafka.connect.data +// and listed as LOGICAL_TYPE_CLASSES in org.apache.kafka.connect.data.ConnectSchema +// time/date as String not supported as the format to parse is not clear +// (add it as a config param?) 
+if (PulsarSchemaToKafkaSchema.matchesToKafkaLogicalSchema(kafkaSchema)) { +if (Timestamp.LOGICAL_NAME.equals(kafkaSchema.name())) { +return Timestamp.toLogical(kafkaSchema, jsonNode.longValue()); +} else if (Date.LOGICAL_NAME.equals(kafkaSchema.name())) { +return Date.toLogical(kafkaSchema, jsonNode.intValue()); +} else if (Time.LOGICAL_NAME.equals(kafkaSchema.name())) { +return Time.toLogical(kafkaSchema, jsonNode.intValue()); +} else if (Decimal.LOGICAL_NAME.equals(kafkaSchema.name())) { +if (jsonNode.isNumber()) { +
[pulsar] branch master updated: KCA: handle kafka's logical schemas (#16485)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new fe68a8e872f KCA: handle kafka's logical schemas (#16485) fe68a8e872f is described below commit fe68a8e872ff39369d5d401c8fc68da866c9b2dd Author: Andrey Yegorov <8622884+dl...@users.noreply.github.com> AuthorDate: Wed Aug 31 23:40:03 2022 -0700 KCA: handle kafka's logical schemas (#16485) --- .../io/kafka/connect/schema/KafkaConnectData.java | 58 ++ .../connect/schema/PulsarSchemaToKafkaSchema.java | 82 ++-- .../io/kafka/connect/KafkaConnectSinkTest.java | 219 - 3 files changed, 290 insertions(+), 69 deletions(-) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java index 671495c6df6..557cfbb9dd8 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java @@ -30,9 +30,13 @@ import java.util.List; import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.GenericData; +import org.apache.kafka.connect.data.Date; +import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; import org.apache.kafka.connect.errors.DataException; import org.apache.pulsar.client.api.schema.GenericRecord; @@ -122,6 +126,34 @@ public class KafkaConnectData { } public static Object castToKafkaSchema(Object nativeObject, Schema kafkaSchema) { +// special case for a 
few classes defined in org.apache.kafka.connect.data +// and listed as LOGICAL_TYPE_CLASSES in org.apache.kafka.connect.data.ConnectSchema +if (PulsarSchemaToKafkaSchema.matchesToKafkaLogicalSchema(kafkaSchema)) { +if (Timestamp.LOGICAL_NAME.equals(kafkaSchema.name())) { +if (nativeObject instanceof java.util.Date) { +return nativeObject; +} +return Timestamp.toLogical(kafkaSchema, ((Number) nativeObject).longValue()); +} else if (Date.LOGICAL_NAME.equals(kafkaSchema.name())) { +if (nativeObject instanceof java.util.Date) { +return nativeObject; +} +return Date.toLogical(kafkaSchema, ((Number) nativeObject).intValue()); +} else if (Time.LOGICAL_NAME.equals(kafkaSchema.name())) { +if (nativeObject instanceof java.util.Date) { +return nativeObject; +} +return Time.toLogical(kafkaSchema, ((Number) nativeObject).intValue()); +} else if (Decimal.LOGICAL_NAME.equals(kafkaSchema.name())) { +if (nativeObject instanceof java.math.BigDecimal) { +return nativeObject; +} +return Decimal.toLogical(kafkaSchema, (byte[]) nativeObject); +} +throw new IllegalStateException("Unsupported Kafka Logical Schema " + kafkaSchema.name() ++ " for value " + nativeObject); +} + if (nativeObject instanceof Number) { // This is needed in case // jackson decided to fit value into some other type internally @@ -242,6 +274,32 @@ public class KafkaConnectData { return defaultOrThrow(kafkaSchema); } +// special case for a few classes defined in org.apache.kafka.connect.data +// and listed as LOGICAL_TYPE_CLASSES in org.apache.kafka.connect.data.ConnectSchema +// time/date as String not supported as the format to parse is not clear +// (add it as a config param?) 
+if (PulsarSchemaToKafkaSchema.matchesToKafkaLogicalSchema(kafkaSchema)) { +if (Timestamp.LOGICAL_NAME.equals(kafkaSchema.name())) { +return Timestamp.toLogical(kafkaSchema, jsonNode.longValue()); +} else if (Date.LOGICAL_NAME.equals(kafkaSchema.name())) { +return Date.toLogical(kafkaSchema, jsonNode.intValue()); +} else if (Time.LOGICAL_NAME.equals(kafkaSchema.name())) { +return Time.toLogical(kafkaSchema, jsonNode.intValue()); +} else if (Decimal.LOGICAL_NAME.equals(kafkaSchema.name())) { +if (jsonNode.isNumber()) { +return jsonNode.decimalValue(); +
[pulsar] branch master updated: PIP-201 : Extensions mechanism for Pulsar Admin CLI tools (#17158)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 57bcc977ce9 PIP-201 : Extensions mechanism for Pulsar Admin CLI tools (#17158) 57bcc977ce9 is described below commit 57bcc977ce956d2bee120d1a88a9f1cb509fe5ec Author: Enrico Olivelli AuthorDate: Wed Aug 31 11:20:36 2022 +0200 PIP-201 : Extensions mechanism for Pulsar Admin CLI tools (#17158) --- bin/pulsar-admin-common.sh | 2 +- conf/client.conf | 6 + pom.xml| 4 + .../pom.xml| 58 ++ .../cli/extensions/CommandExecutionContext.java| 33 +++ .../pulsar/admin/cli/extensions/CustomCommand.java | 56 ++ .../admin/cli/extensions/CustomCommandFactory.java | 35 .../admin/cli/extensions/CustomCommandGroup.java | 47 + .../admin/cli/extensions/ParameterDescriptor.java | 38 .../pulsar/admin/cli/extensions/ParameterType.java | 26 +++ .../pulsar/admin/cli/extensions/package-info.java | 19 ++ pulsar-client-tools-customcommand-example/pom.xml | 77 +++ .../admin/cli/examples/MyCommandFactory.java | 151 ++ .../META-INF/services/command_factory.yml | 19 ++ pulsar-client-tools-test/pom.xml | 26 +++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 127 pulsar-client-tools/pom.xml| 10 + .../org/apache/pulsar/admin/cli/CliCommand.java| 2 +- .../pulsar/admin/cli/CmdGenerateDocument.java | 2 +- .../pulsar/admin/cli/CustomCommandsUtils.java | 223 + .../apache/pulsar/admin/cli/PulsarAdminTool.java | 63 -- .../cli/utils/CustomCommandFactoryDefinition.java | 37 .../cli/utils/CustomCommandFactoryDefinitions.java | 28 +++ .../cli/utils/CustomCommandFactoryMetaData.java| 37 .../cli/utils/CustomCommandFactoryProvider.java| 173 25 files changed, 1238 insertions(+), 61 deletions(-) diff --git a/bin/pulsar-admin-common.sh b/bin/pulsar-admin-common.sh index fdfed60beda..7d4b0d861bf 100755 --- a/bin/pulsar-admin-common.sh +++ b/bin/pulsar-admin-common.sh @@ -95,7 
+95,7 @@ IS_JAVA_8=`$JAVA -version 2>&1 |grep version|grep '"1\.8'` # Start --add-opens options # '--add-opens' option is not supported in jdk8 if [[ -z "$IS_JAVA_8" ]]; then - OPTS="$OPTS --add-opens java.base/sun.net=ALL-UNNAMED" + OPTS="$OPTS --add-opens java.base/sun.net=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED" fi OPTS="-cp $PULSAR_CLASSPATH $OPTS" diff --git a/conf/client.conf b/conf/client.conf index 50d9bf374c1..ea1d339a09c 100644 --- a/conf/client.conf +++ b/conf/client.conf @@ -87,3 +87,9 @@ tlsKeyStorePassword= # When TLS authentication with CACert is used, the valid value is either OPENSSL or JDK. # When TLS authentication with KeyStore is used, available options can be SunJSSE, Conscrypt and so on. webserviceTlsProvider= + + + +# Pulsar Admin Custom Commands +#customCommandFactoriesDirectory=commandFactories +#customCommandFactories= diff --git a/pom.xml b/pom.xml index 26273cbf030..e861f9ae0a9 100644 --- a/pom.xml +++ b/pom.xml @@ -2089,7 +2089,9 @@ flexible messaging model and an intuitive client API. pulsar-client-admin-api pulsar-client-admin pulsar-client-admin-shaded +pulsar-client-tools-api pulsar-client-tools +pulsar-client-tools-customcommand-example pulsar-client-tools-test pulsar-client-all pulsar-websocket @@ -2152,7 +2154,9 @@ flexible messaging model and an intuitive client API. 
pulsar-client pulsar-client-admin-api pulsar-client-admin +pulsar-client-tools-api pulsar-client-tools +pulsar-client-tools-customcommand-example pulsar-client-tools-test pulsar-websocket pulsar-proxy diff --git a/pulsar-client-tools-test/pom.xml b/pulsar-client-tools-api/pom.xml similarity index 60% copy from pulsar-client-tools-test/pom.xml copy to pulsar-client-tools-api/pom.xml index 7d06e42b05f..302f184e9c2 100644 --- a/pulsar-client-tools-test/pom.xml +++ b/pulsar-client-tools-api/pom.xml @@ -19,7 +19,7 @@ --> http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> 4.0.0 org.apache.pulsar @@ -28,43 +28,27 @@ .. - pulsar-client-tools-test - Pulsar Client Tools Test - Pulsar Client Tools Test + p
[pulsar] branch master updated: PIP-180 Part III : Add shadow topic in TopicPolicy (#17242)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 4434bf28f9f PIP-180 Part III : Add shadow topic in TopicPolicy (#17242) 4434bf28f9f is described below commit 4434bf28f9ff10d76c356ccd649ff1c2dd720f7d Author: JiangHaiting AuthorDate: Wed Aug 31 15:29:13 2022 +0800 PIP-180 Part III : Add shadow topic in TopicPolicy (#17242) --- .../broker/admin/impl/PersistentTopicsBase.java| 62 + .../pulsar/broker/admin/v2/PersistentTopics.java | 79 ++ .../pulsar/broker/admin/TopicPoliciesTest.java | 35 ++ .../org/apache/pulsar/client/admin/Topics.java | 44 .../pulsar/client/admin/internal/TopicsImpl.java | 36 ++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 10 +++ .../org/apache/pulsar/admin/cli/CmdTopics.java | 45 .../pulsar/common/policies/data/PolicyName.java| 3 +- .../pulsar/common/policies/data/TopicPolicies.java | 1 + 9 files changed, 314 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 6ef6f662029..c002e8212ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -62,6 +62,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; @@ -5382,4 +5383,65 @@ 
public class PersistentTopicsBase extends AdminResource { return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); })); } + +protected CompletableFuture validateShadowTopics(List shadowTopics) { +List> futures = new ArrayList<>(shadowTopics.size()); +for (String shadowTopic : shadowTopics) { +try { +TopicName shadowTopicName = TopicName.get(shadowTopic); +if (!shadowTopicName.isPersistent()) { +return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, +"Only persistent topic can be set as shadow topic")); +} + futures.add(pulsar().getNamespaceService().checkTopicExists(shadowTopicName) +.thenAccept(isExists -> { +if (!isExists) { +throw new RestException(Status.PRECONDITION_FAILED, +"Shadow topic [" + shadowTopic + "] not exists."); +} +})); +} catch (IllegalArgumentException e) { +return FutureUtil.failedFuture(new RestException(Status.FORBIDDEN, +"Invalid shadow topic name: " + shadowTopic)); +} +} +return FutureUtil.waitForAll(futures); +} + +protected CompletableFuture internalSetShadowTopic(List shadowTopics) { +if (!topicName.isPersistent()) { +return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, +"Only persistent source topic is supported with shadow topics.")); +} +if (CollectionUtils.isEmpty(shadowTopics)) { +return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, +"Cannot specify empty shadow topics, please use remove command instead.")); +} +return validateTopicPolicyOperationAsync(topicName, PolicyName.SHADOW_TOPIC, PolicyOperation.WRITE) +.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) +.thenCompose(__ -> validateShadowTopics(shadowTopics)) +.thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName)) +.thenCompose(op -> { +TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); +topicPolicies.setShadowTopics(shadowTopics); +return pulsar().getTopicPoliciesService(). +updateTopicPoliciesAsync(topicName, topicPolicies); +}); +
[pulsar] branch master updated: [feature][broker] Allow to configure the entry filters per namespace and per topic (#17153)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 2f4af65c1de [feature][broker] Allow to configure the entry filters per namespace and per topic (#17153) 2f4af65c1de is described below commit 2f4af65c1de662211645552036ff06d5689fc716 Author: gaozhangmin AuthorDate: Tue Aug 30 20:41:42 2022 +0800 [feature][broker] Allow to configure the entry filters per namespace and per topic (#17153) --- conf/broker.conf | 3 + .../apache/pulsar/broker/ServiceConfiguration.java | 7 ++ .../pulsar/broker/admin/impl/NamespacesBase.java | 10 ++ .../broker/admin/impl/PersistentTopicsBase.java| 47 + .../apache/pulsar/broker/admin/v2/Namespaces.java | 62 .../broker/admin/v2/NonPersistentTopics.java | 92 + .../pulsar/broker/admin/v2/PersistentTopics.java | 91 + .../pulsar/broker/service/AbstractTopic.java | 16 +++ .../pulsar/broker/service/BrokerService.java | 31 ++ .../pulsar/broker/service/EntryFilterSupport.java | 19 +++- .../org/apache/pulsar/broker/service/Topic.java| 7 ++ .../service/nonpersistent/NonPersistentTopic.java | 10 ++ .../broker/service/persistent/PersistentTopic.java | 11 +++ .../broker/service/plugin/EntryFilterProvider.java | 23 + .../apache/pulsar/broker/admin/AdminApi2Test.java | 34 +++ .../broker/service/AbstractBaseDispatcherTest.java | 5 +- .../broker/service/plugin/FilterEntryTest.java | 85 +++- .../pulsar/broker/stats/ConsumerStatsTest.java | 10 +- .../org/apache/pulsar/client/admin/Namespaces.java | 48 - .../apache/pulsar/client/admin/TopicPolicies.java | 51 ++ .../pulsar/common/policies/data/EntryFilters.java | 47 +++-- .../pulsar/common/policies/data/Policies.java | 8 +- .../client/admin/internal/NamespacesImpl.java | 38 +++ .../client/admin/internal/TopicPoliciesImpl.java | 39 .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 45 + 
.../apache/pulsar/admin/cli/CmdTopicPolicies.java | 61 .../policies/data/HierarchyTopicPolicies.java | 2 + .../pulsar/common/policies/data/PolicyName.java| 3 +- .../pulsar/common/policies/data/TopicPolicies.java | 1 + site2/docs/admin-api-namespaces.md | 109 + 30 files changed, 965 insertions(+), 50 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 957b594338d..6f51c96a008 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -453,6 +453,9 @@ entryFilterNames= # The directory for all the entry filter implementations entryFiltersDirectory= +# Whether allow topic level entry filters policies overrides broker configuration. +allowOverrideEntryFilters=false + # Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic maxConcurrentLookupRequest=5 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index df9ac8c7762..fffa57ff4ab 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1103,6 +1103,13 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private String entryFiltersDirectory = ""; +@FieldContext( +category = CATEGORY_SERVER, +dynamic = true, +doc = "Whether allow topic level entry filters policies overrides broker configuration." +) +private boolean allowOverrideEntryFilters = false; + @FieldContext( category = CATEGORY_SERVER, doc = "Whether to use streaming read dispatcher. 
Currently is in preview and can be changed " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 08c968a2d5d..0b25599c126 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -83,6 +83,7 @@ import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.comm
[pulsar] branch master updated: [refactor][broker]refactor reflection method in loadbalance (#17313)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 0e149ef0484 [refactor][broker]refactor reflection method in loadbalance (#17313) 0e149ef0484 is described below commit 0e149ef0484755d42e214f03003237299433c89a Author: Qiang Huang AuthorDate: Mon Aug 29 17:05:17 2022 +0800 [refactor][broker]refactor reflection method in loadbalance (#17313) --- .../apache/pulsar/broker/loadbalance/LoadManager.java| 5 +++-- .../broker/loadbalance/impl/ModularLoadManagerImpl.java | 16 ++-- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java index e26b27950ec..fc0b0ddfd35 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java @@ -29,6 +29,7 @@ import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper; import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.stats.Metrics; +import org.apache.pulsar.common.util.Reflections; import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -131,9 +132,9 @@ public interface LoadManager { static LoadManager create(final PulsarService pulsar) { try { final ServiceConfiguration conf = pulsar.getConfiguration(); -final Class loadManagerClass = Class.forName(conf.getLoadManagerClassName()); // Assume there is a constructor with one argument of PulsarService. 
-final Object loadManagerInstance = loadManagerClass.getDeclaredConstructor().newInstance(); +final Object loadManagerInstance = Reflections.createInstance(conf.getLoadManagerClassName(), +Thread.currentThread().getContextClassLoader()); if (loadManagerInstance instanceof LoadManager) { final LoadManager casted = (LoadManager) loadManagerInstance; casted.initialize(pulsar); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 0c86602695a..78d77102392 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -70,6 +70,7 @@ import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.ResourceQuota; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.Reflections; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; import org.apache.pulsar.metadata.api.MetadataCache; @@ -300,18 +301,13 @@ public class ModularLoadManagerImpl implements ModularLoadManager { private LoadSheddingStrategy createLoadSheddingStrategy() { try { -Class loadSheddingClass = Class.forName(conf.getLoadBalancerLoadSheddingStrategy()); -Object loadSheddingInstance = loadSheddingClass.getDeclaredConstructor().newInstance(); -if (loadSheddingInstance instanceof LoadSheddingStrategy) { -return (LoadSheddingStrategy) loadSheddingInstance; -} else { -log.error("create load shedding strategy failed. 
using OverloadShedder instead."); -return new OverloadShedder(); -} +return Reflections.createInstance(conf.getLoadBalancerLoadSheddingStrategy(), LoadSheddingStrategy.class, +Thread.currentThread().getContextClassLoader()); } catch (Exception e) { -log.error("Error when trying to create load shedding strategy: ", e); +log.error("Error when trying to create load shedding strategy: {}", +conf.getLoadBalancerLoadPlacementStrategy(), e); } - +log.error("create load shedding strategy failed. using OverloadShedder instead."); return new OverloadShedder(); }
[pulsar] branch master updated: [cleanup][owasp] Suppress false positive netty-tcnative (#17282)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 409bb128102 [cleanup][owasp] Suppress false positive netty-tcnative (#17282) 409bb128102 is described below commit 409bb128102308da188f28b49adf7da8bb58ea2a Author: Nicolò Boschi AuthorDate: Mon Aug 29 08:45:25 2022 +0200 [cleanup][owasp] Suppress false positive netty-tcnative (#17282) --- src/owasp-dependency-check-false-positives.xml | 7 +++ 1 file changed, 7 insertions(+) diff --git a/src/owasp-dependency-check-false-positives.xml b/src/owasp-dependency-check-false-positives.xml index 54b703f0108..e69afb31a68 100644 --- a/src/owasp-dependency-check-false-positives.xml +++ b/src/owasp-dependency-check-false-positives.xml @@ -54,6 +54,13 @@ ^pkg:maven/io\.netty/netty\-tcnative\-classes@.*$ cpe:/a:netty:netty + + +^pkg:maven/io\.netty/netty\-tcnative\-boringssl\-static@.*$ +cpe:/a:chromium_project:chromium +
[pulsar] branch master updated: Add CLI command to get available built-in functions (#16822)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 34eb74d37b6 Add CLI command to get available built-in functions (#16822) 34eb74d37b6 is described below commit 34eb74d37b69ef4320bd3f7d3bd323d8fda9564a Author: Christophe Bornet AuthorDate: Sun Aug 28 07:58:29 2022 +0200 Add CLI command to get available built-in functions (#16822) --- .../pulsar/broker/admin/impl/FunctionsBase.java| 18 + .../org/apache/pulsar/client/admin/Functions.java | 14 ++ .../common/functions/FunctionDefinition.java | 0 .../client/admin/internal/FunctionsImpl.java | 30 ++ .../org/apache/pulsar/admin/cli/CmdFunctions.java | 15 +++ .../pulsar/functions/worker/FunctionsManager.java | 8 ++ .../functions/worker/rest/api/FunctionsImpl.java | 14 ++ .../worker/rest/api/v3/FunctionsApiV3Resource.java | 18 + .../functions/worker/service/api/Functions.java| 6 - 9 files changed, 122 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index f43e8227cfc..d5380407491 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -41,6 +41,7 @@ import javax.ws.rs.core.StreamingOutput; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.FunctionDefinition; import org.apache.pulsar.common.functions.FunctionState; import org.apache.pulsar.common.functions.UpdateOptionsImpl; import org.apache.pulsar.common.io.ConnectorDefinition; @@ -748,6 +749,23 @@ public class FunctionsBase extends AdminResource 
{ functions().reloadBuiltinFunctions(clientAppId(), clientAuthData()); } +@GET +@ApiOperation( +value = "Fetches the list of built-in Pulsar functions", +response = FunctionDefinition.class, +responseContainer = "List" +) +@ApiResponses(value = { +@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"), +@ApiResponse(code = 400, message = "Invalid request"), +@ApiResponse(code = 408, message = "Request timeout") +}) +@Path("/builtins") +@Produces(MediaType.APPLICATION_JSON) +public List getBuiltinFunction() { +return functions().getBuiltinFunctions(clientAppId(), clientAuthData()); +} + @PUT @ApiOperation(value = "Updates a Pulsar Function on the worker leader", hidden = true) @ApiResponses(value = { diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java index bbbfcb85268..5bf289f9b65 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java @@ -25,6 +25,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedExceptio import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.functions.FunctionDefinition; import org.apache.pulsar.common.functions.FunctionState; import org.apache.pulsar.common.functions.UpdateOptions; import org.apache.pulsar.common.io.ConnectorDefinition; @@ -792,6 +793,19 @@ public interface Functions { @Deprecated Set getSinks() throws PulsarAdminException; +/** + * Fetches a list of supported Pulsar Functions currently running in cluster mode. 
+ * + * @throws PulsarAdminException + * Unexpected error + */ +List getBuiltInFunctions() throws PulsarAdminException; + +/** + * Fetches a list of supported Pulsar Functions currently running in cluster mode asynchronously. + */ +CompletableFuture> getBuiltInFunctionsAsync(); + /** * Fetch the current state associated with a Pulsar Function. * diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionDefinition.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionDefinition.java similarity index 100% rename from pulsar-common/src/main/java/org/apache/pulsar/common/func
[pulsar] branch master updated: Add CLI command to reload built-in functions (#17102)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 1d65a886b3a Add CLI command to reload built-in functions (#17102) 1d65a886b3a is described below commit 1d65a886b3aaf7a96d680499f35abbc013bf401f Author: Christophe Bornet AuthorDate: Fri Aug 26 15:52:08 2022 +0200 Add CLI command to reload built-in functions (#17102) --- .../org/apache/pulsar/broker/admin/impl/FunctionsBase.java | 14 ++ .../java/org/apache/pulsar/client/admin/Functions.java | 13 + .../apache/pulsar/client/admin/internal/FunctionsImpl.java | 11 +++ .../java/org/apache/pulsar/admin/cli/CmdFunctions.java | 10 ++ .../pulsar/functions/worker/rest/api/FunctionsImpl.java| 13 + .../worker/rest/api/v3/FunctionsApiV3Resource.java | 14 ++ .../pulsar/functions/worker/service/api/Functions.java | 4 7 files changed, 79 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index f1c4c105de6..f43e8227cfc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -734,6 +734,20 @@ public class FunctionsBase extends AdminResource { return functions().getListOfConnectors(); } +@POST +@ApiOperation( +value = "Reload the built-in Functions" +) +@ApiResponses(value = { +@ApiResponse(code = 401, message = "This operation requires super-user access"), +@ApiResponse(code = 503, message = "Function worker service is now initializing. 
Please try again later."), +@ApiResponse(code = 500, message = "Internal server error") +}) +@Path("/builtins/reload") +public void reloadBuiltinFunctions() throws IOException { +functions().reloadBuiltinFunctions(clientAppId(), clientAuthData()); +} + @PUT @ApiOperation(value = "Updates a Pulsar Function on the worker leader", hidden = true) @ApiResponses(value = { diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java index 82c47e5d97a..bbbfcb85268 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java @@ -893,4 +893,17 @@ public interface Functions { */ CompletableFuture putFunctionStateAsync( String tenant, String namespace, String function, FunctionState state); + +/** + * Reload the available built-in functions. + * + * @throws PulsarAdminException + * Unexpected error + */ +void reloadBuiltInFunctions() throws PulsarAdminException; + +/** + * Reload the available built-in functions. 
+ */ +CompletableFuture reloadBuiltInFunctionsAsync(); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 0686917a381..0e96da58432 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -841,4 +841,15 @@ public class FunctionsImpl extends ComponentResource implements Functions { } return future; } + +@Override +public void reloadBuiltInFunctions() throws PulsarAdminException { +sync(this::reloadBuiltInFunctionsAsync); +} + +@Override +public CompletableFuture reloadBuiltInFunctionsAsync() { +WebTarget path = functions.path("builtins/reload"); +return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); +} } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index c107f939813..b007106b2a8 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -1200,6 +1200,15 @@ public class CmdFunctions extends CmdBase { } } +@Parameters(commandDescription = "Reload the available built-in functions") +public class ReloadBuiltInFunctions extends CmdFunctions.BaseComm
[pulsar] branch master updated: Add message_id in CommandSend for Shadow Topic (#17195)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 6798f88f472 Add message_id in CommandSend for Shadow Topic (#17195) 6798f88f472 is described below commit 6798f88f472897b1756260afe8947af3facef0f2 Author: JiangHaiting AuthorDate: Fri Aug 26 15:59:05 2022 +0800 Add message_id in CommandSend for Shadow Topic (#17195) --- .../client/impl/ProduceWithMessageIdTest.java | 119 + .../org/apache/pulsar/client/impl/MessageImpl.java | 2 +- .../apache/pulsar/client/impl/ProducerImpl.java| 20 +++- .../apache/pulsar/common/protocol/Commands.java| 18 +++- pulsar-common/src/main/proto/PulsarApi.proto | 3 + 5 files changed, 153 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java new file mode 100644 index 000..e03f29677b9 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProduceWithMessageIdTest.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.client.impl; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MockBrokerService; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.protocol.Commands; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Test(groups = "broker-impl") +@Slf4j +public class ProduceWithMessageIdTest { +MockBrokerService mockBrokerService; + +@BeforeClass(alwaysRun = true) +public void setup() { +mockBrokerService = new MockBrokerService(); +mockBrokerService.start(); +} + +@AfterClass(alwaysRun = true) +public void teardown() { +if (mockBrokerService != null) { +mockBrokerService.stop(); +mockBrokerService = null; +} +} + +@Test +public void testSend() throws Exception { +long ledgerId = 123; +long entryId = 456; +mockBrokerService.setHandleSend((ctx, send, headersAndPayload) -> { +Assert.assertTrue(send.hasMessageId()); +log.info("receive messageId in ServerCnx, id={}", send.getMessageId()); +Assert.assertEquals(send.getMessageId().getLedgerId(), ledgerId); +Assert.assertEquals(send.getMessageId().getEntryId(), entryId); +ctx.writeAndFlush( +Commands.newSendReceipt(send.getProducerId(), send.getSequenceId(), 0, ledgerId, entryId)); +}); + +@Cleanup +PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder() +.serviceUrl(mockBrokerService.getBrokerAddress()) +.build(); + +String topic = 
"persistent://public/default/t1"; +ProducerImpl producer = +(ProducerImpl) client.newProducer().topic(topic).enableBatching(false).create(); + +MessageMetadata metadata = new MessageMetadata(); +ByteBuffer buffer = ByteBuffer.wrap("data".getBytes(StandardCharsets.UTF_8)); +MessageImpl msg = MessageImpl.create(metadata, buffer, Schema.BYTES, topic); +//set message id here. +msg.setMessageId(new MessageIdImpl(ledgerId, entryId, -1)); + +AtomicBoolean result = new AtomicBoolean(false); +producer.sendAsync(msg, new SendCallback() { +@Override +public void sendComp
[pulsar] branch master updated: Fix swagger annotation for analyzeBacklog (#17001)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 2611b17b702 Fix swagger annotation for analyzeBacklog (#17001) 2611b17b702 is described below commit 2611b17b70237009c0fc07e45ad89cedf9c70800 Author: Michael Marshall AuthorDate: Thu Aug 25 22:53:39 2022 -0700 Fix swagger annotation for analyzeBacklog (#17001) --- .../main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 7ce6fa81366..56131a1d5fc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1746,7 +1746,7 @@ public class PersistentTopics extends PersistentTopicsBase { @PathParam("subName") String encodedSubName, @ApiParam(name = "position", value = "messageId to start the analysis") ResetCursorData position, -@ApiParam(value = "Is authentication required to perform this operation") +@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { try { Optional positionImpl;
[pulsar] branch master updated (cf8d441882c -> b58439e835e)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from cf8d441882c [fix][client-cpp] Fix getLastMessageIdAsync returns ResultNotConnected after seek (#16943) add b58439e835e [bug] pulsar-perf consume, do not fail in case of reading data older than 10 days (#17160) No new revisions were added by this update. Summary of changes: .../java/org/apache/pulsar/testclient/PerformanceConsumer.java| 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-)
[pulsar] branch master updated (023c9b95af3 -> 085a7998dc3)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 023c9b95af3 [fix][cli] Pulsar shell: handle ctrl-d (#17204) add 085a7998dc3 Pulsar-perf produce add possibility to set eventTime on messages (#17189) No new revisions were added by this update. Summary of changes: .../java/org/apache/pulsar/testclient/PerformanceProducer.java | 7 +++ 1 file changed, 7 insertions(+)
[pulsar] branch master updated: [fix][cli] Pulsar shell: handle ctrl-d (#17204)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 023c9b95af3 [fix][cli] Pulsar shell: handle ctrl-d (#17204) 023c9b95af3 is described below commit 023c9b95af36f3cbccf32f93e8f8738565fb87f7 Author: Nicolò Boschi AuthorDate: Mon Aug 22 20:37:02 2022 +0200 [fix][cli] Pulsar shell: handle ctrl-d (#17204) --- .../src/main/java/org/apache/pulsar/shell/PulsarShell.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/PulsarShell.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/PulsarShell.java index e92f5a12ca0..4195edaf1f2 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/PulsarShell.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/shell/PulsarShell.java @@ -395,7 +395,8 @@ public class PulsarShell { try { final String line = reader.readLine().trim(); return substituteVariables(reader.parseLine(line), variables); -} catch (org.jline.reader.UserInterruptException userInterruptException) { +} catch (org.jline.reader.UserInterruptException +| org.jline.reader.EndOfFileException userInterruptException) { throw new InterruptShellException(); } };
[pulsar] branch master updated: [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions (streaming dispatcher) (#17163)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 7f6972766f1 [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions (streaming dispatcher) (#17163) 7f6972766f1 is described below commit 7f6972766f1f8a989f477cbb3085d0660be53bd8 Author: Nicolò Boschi AuthorDate: Sat Aug 20 09:03:09 2022 +0200 [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions (streaming dispatcher) (#17163) * [fix][broker] Avoid messages being repeatedly replayed with SHARED subscriptions * move test to broker-api --- .../persistent/PersistentStreamingDispatcherMultipleConsumers.java | 5 + .../SimpleProducerConsumerTestStreamingDispatcherTest.java | 2 +- .../org/apache/pulsar/client/api/SimpleProducerConsumerTest.java | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java index 3cbc43cfdfe..26647dfc1d5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java @@ -155,6 +155,11 @@ public class PersistentStreamingDispatcherMultipleConsumers extends PersistentDi @Override public synchronized void readMoreEntries() { +if (sendInProgress) { +// we cannot read more entries while sending the previous batch +// otherwise we could re-read the same entries and send duplicates +return; +} // totalAvailablePermits may be updated by other threads int currentTotalAvailablePermits = 
totalAvailablePermits; if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java index 7ed7981c937..08b680b61b9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/SimpleProducerConsumerTestStreamingDispatcherTest.java @@ -25,7 +25,7 @@ import org.testng.annotations.Test; /** * SimpleProducerConsumerTest with {@link StreamingDispatcher} */ -@Test(groups = "flaky") +@Test(groups = "broker-api") public class SimpleProducerConsumerTestStreamingDispatcherTest extends SimpleProducerConsumerTest { @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 2f572a841b0..e19b712a85b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -113,7 +113,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -@Test(groups = "flaky") +@Test(groups = "broker-api") public class SimpleProducerConsumerTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class); private static final int TIMEOUT_MULTIPLIER = Integer.getInteger("SimpleProducerConsumerTest.receive.timeout.multiplier", 1);
[pulsar] branch branch-2.10 updated: [improve][proxy] Consolidate Netty channel flushes to mitigate syscall overhead (#16372)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.10 by this push: new ba15416c126 [improve][proxy] Consolidate Netty channel flushes to mitigate syscall overhead (#16372) ba15416c126 is described below commit ba15416c126a178e2569dbead7bcaf41c777817b Author: lipenghui AuthorDate: Tue Jul 5 14:29:04 2022 +0800 [improve][proxy] Consolidate Netty channel flushes to mitigate syscall overhead (#16372) ### Motivation Follow-up change for https://github.com/apache/pulsar/pull/16361, addressing the review comment at https://github.com/apache/pulsar/pull/16361#issuecomment-1173241406 (cherry picked from commit 10db821e8e369efd6cba05eabda0e41ef346cab5) --- .../main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java | 3 +++ .../java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java | 3 +++ 2 files changed, 6 insertions(+) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index 9edb16a5e65..ca9e6cdb2ae 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -37,6 +37,7 @@ import io.netty.handler.codec.haproxy.HAProxyCommand; import io.netty.handler.codec.haproxy.HAProxyMessage; import io.netty.handler.codec.haproxy.HAProxyProtocolVersion; import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol; +import io.netty.handler.flush.FlushConsolidationHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslProvider; @@ -178,6 +179,8 @@ public class DirectProxyHandler { b.handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { 
+ch.pipeline().addLast("consolidation", new FlushConsolidationHandler(1024, +true)); if (tlsEnabledWithBroker) { String host = targetBrokerAddress.getHostString(); int port = targetBrokerAddress.getPort(); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java index 2fd0156155d..fc7c78d6a24 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java @@ -21,6 +21,7 @@ package org.apache.pulsar.proxy.server; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.flush.FlushConsolidationHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslProvider; @@ -91,6 +92,8 @@ public class ServiceChannelInitializer extends ChannelInitializer @Override protected void initChannel(SocketChannel ch) throws Exception { +ch.pipeline().addLast("consolidation", new FlushConsolidationHandler(1024, +true)); if (serverSslCtxRefresher != null && this.enableTls) { SslContext sslContext = serverSslCtxRefresher.get(); if (sslContext != null) {
[pulsar] branch master updated: [cleanup][functions] Remove unused code (#16472)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new fe2eb6ac463 [cleanup][functions] Remove unused code (#16472) fe2eb6ac463 is described below commit fe2eb6ac46390ced5fe76e25f1547926a71a8e41 Author: Christophe Bornet AuthorDate: Tue Aug 16 12:28:19 2022 +0200 [cleanup][functions] Remove unused code (#16472) packageUrl is not set anymore in FunctionDetails so code that depend on it can be removed. In the case of KubernetesRuntime we currently download the function itself and not the package. This change doesn't modify this behaviour. --- .../runtime/kubernetes/KubernetesRuntime.java | 41 +- .../runtime/kubernetes/KubernetesRuntimeTest.java | 37 --- 2 files changed, 1 insertion(+), 77 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java index 02af86c7334..50dc975075f 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java @@ -88,7 +88,6 @@ import org.apache.pulsar.functions.runtime.RuntimeUtils; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; import org.apache.pulsar.functions.utils.Actions; import org.apache.pulsar.functions.utils.FunctionCommon; -import org.apache.pulsar.packages.management.core.common.PackageType; /** * Kubernetes based runtime for running functions. 
@@ -855,13 +854,8 @@ public class KubernetesRuntime implements Runtime { } private List getDownloadCommand(Function.FunctionDetails functionDetails, String userCodeFilePath) { -if (Arrays.stream(PackageType.values()).anyMatch(type -> -functionDetails.getPackageUrl().startsWith(type.toString( { -return getPackageDownloadCommand(functionDetails.getPackageUrl(), userCodeFilePath); -} else { -return getDownloadCommand(functionDetails.getTenant(), functionDetails.getNamespace(), +return getDownloadCommand(functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName(), userCodeFilePath); -} } private List getDownloadCommand(String tenant, String namespace, String name, String userCodeFilePath) { @@ -908,39 +902,6 @@ public class KubernetesRuntime implements Runtime { userCodeFilePath); } -private List getPackageDownloadCommand(String packageName, String userCodeFilePath) { -// add auth plugin and parameters if necessary -if (authenticationEnabled && authConfig != null) { -if (isNotBlank(authConfig.getClientAuthenticationPlugin()) -&& isNotBlank(authConfig.getClientAuthenticationParameters()) -&& instanceConfig.getFunctionAuthenticationSpec() != null) { -return Arrays.asList( -pulsarRootDir + configAdminCLI, -"--auth-plugin", -authConfig.getClientAuthenticationPlugin(), -"--auth-params", -authConfig.getClientAuthenticationParameters(), -"--admin-url", -pulsarAdminUrl, -"packages", -"download", -packageName, -"--path", -userCodeFilePath); -} -} - -return Arrays.asList( -pulsarRootDir + configAdminCLI, -"--admin-url", -pulsarAdminUrl, -"packages", -"download", -packageName, -"--path", -userCodeFilePath); -} - private static String setShardIdEnvironmentVariableCommand() { return String.format("%s=${POD_NAME##*-} && echo shardId=${%s}", ENV_SHARD_ID, ENV_SHARD_ID); } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java index 18fbe2e5160..df5c369303c 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java @@ -806,24 +
[pulsar] branch master updated: Fix rack awareness cache expiration race condition (#16825)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new e451806a715 Fix rack awareness cache expiration race condition (#16825) e451806a715 is described below commit e451806a715661fca81876579e0f078aca36c9d9 Author: Michael Marshall AuthorDate: Fri Jul 29 03:24:56 2022 -0500 Fix rack awareness cache expiration race condition (#16825) --- .../rackawareness/BookieRackAffinityMapping.java | 64 +++--- .../BookieRackAffinityMappingTest.java | 12 ++-- 2 files changed, 26 insertions(+), 50 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java index ec4b7da250e..c0c29637114 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java @@ -26,9 +26,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy; import org.apache.bookkeeper.client.RackChangeNotifier; @@ -66,8 +64,8 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping private ITopologyAwareEnsemblePlacementPolicy rackawarePolicy = null; private List bookieAddressListLastTime = new ArrayList<>(); -private volatile BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration(); -private volatile Map bookieInfoMap = new HashMap<>(); +private BookiesRackConfiguration racksWithHost = new 
BookiesRackConfiguration(); +private Map bookieInfoMap = new HashMap<>(); public static MetadataStore createMetadataStore(Configuration conf) throws MetadataException { MetadataStore store; @@ -110,15 +108,17 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping } @Override -public void setConf(Configuration conf) { +public synchronized void setConf(Configuration conf) { super.setConf(conf); MetadataStore store; try { store = createMetadataStore(conf); bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class); -bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).join(); -for (Map bookieMapping : bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get() -.map(Map::values).orElse(Collections.emptyList())) { +store.registerListener(this::handleUpdates); +racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get() +.orElseGet(BookiesRackConfiguration::new); +updateRacksWithHost(racksWithHost); +for (Map bookieMapping : racksWithHost.values()) { for (String address : bookieMapping.keySet()) { bookieAddressListLastTime.add(BookieId.parse(address)); } @@ -130,13 +130,9 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping } catch (InterruptedException | ExecutionException | MetadataException e) { throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list"); } -store.registerListener(this::handleUpdates); - -// A previous version of this code tried to eagerly load the cache. However, this is invalid -// in later versions of bookkeeper as when setConf is called, the bookieAddressResolver is not yet set } -private void updateRacksWithHost(BookiesRackConfiguration racks) { +private synchronized void updateRacksWithHost(BookiesRackConfiguration racks) { // In config z-node, the bookies are added in the `ip:port` notation, while BK will ask // for just the IP/hostname when trying to get the rack for a bookie. 
// To work around this issue, we insert in the map the bookie ip/hostname with same rack-info @@ -176,7 +172,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping } @Override -public List resolve(List bookieAddressList) { +public synchronized List resolve(List bookieAddressList) { List racks = new ArrayList<>(bookieAddressList.size()); for (String bookieAddress : bookieAddressList) { racks.add(getRack(bookieAddress)); @@ -185,32 +181,9 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
[pulsar] branch master updated: Issue 16802: fix Repeated messages of shared dispatcher (#16812)
This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 825b68db7be Issue 16802: fix Repeated messages of shared dispatcher (#16812) 825b68db7be is described below commit 825b68db7bed1c79af4b7b69b48bee76ebe75af5 Author: Enrico Olivelli AuthorDate: Thu Jul 28 21:03:43 2022 +0200 Issue 16802: fix Repeated messages of shared dispatcher (#16812) --- .../PersistentDispatcherMultipleConsumers.java | 47 ++ ...istentStickyKeyDispatcherMultipleConsumers.java | 13 ++-- .../service/persistent/DelayedDeliveryTest.java| 1 - ...ntStickyKeyDispatcherMultipleConsumersTest.java | 75 +- 4 files changed, 87 insertions(+), 49 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 71faeb7adba..cf58bfd43ac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -89,7 +89,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul protected volatile PositionImpl minReplayedPosition = null; protected boolean shouldRewindBeforeReadingOrReplaying = false; protected final String name; - +protected boolean sendInProgress; protected static final AtomicIntegerFieldUpdater TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, @@ -240,6 +240,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } public synchronized void readMoreEntries() { +if (sendInProgress) { +// we cannot read more entries 
while sending the previous batch +// otherwise we could re-read the same entries and send duplicates +return; +} if (shouldPauseDeliveryForDelayTracker()) { return; } @@ -496,7 +501,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } @Override -public synchronized void readEntriesComplete(List entries, Object ctx) { +public final synchronized void readEntriesComplete(List entries, Object ctx) { ReadType readType = (ReadType) ctx; if (readType == ReadType.Normal) { havePendingRead = false; @@ -528,18 +533,39 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size()); } +// dispatch messages to a separate thread, but still in order for this subscription +// sendMessagesToConsumers is responsible for running broker-side filters +// that may be quite expensive if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) { -// dispatch messages to a separate thread, but still in order for this subscription -// sendMessagesToConsumers is responsible for running broker-side filters -// that may be quite expensive +// setting sendInProgress here, because sendMessagesToConsumers will be executed +// in a separate thread, and we want to prevent more reads +sendInProgress = true; dispatchMessagesThread.execute(safeRun(() -> sendMessagesToConsumers(readType, entries))); } else { sendMessagesToConsumers(readType, entries); } } -protected synchronized void sendMessagesToConsumers(ReadType readType, List entries) { +protected final synchronized void sendMessagesToConsumers(ReadType readType, List entries) { +sendInProgress = true; +boolean readMoreEntries; +try { +readMoreEntries = trySendMessagesToConsumers(readType, entries); +} finally { +sendInProgress = false; +} +if (readMoreEntries) { +readMoreEntries(); +} +} +/** + * Dispatch the messages to the Consumers. 
+ * @return true if you want to trigger a new read. + * This method is overridden by other classes, please take a look to other implementations + * if you need to change it. + */ +protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List entries) { if (needTrimAckedMessages()) { cursor.trimDeletedEntries(entries); } @@ -547,8 +573,7 @@ public class PersistentDispatcherMult