aahmed-se closed pull request #2045: [WIP] Add Embedded Pulsar Class
URL: https://github.com/apache/incubator-pulsar/pull/2045
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/EmbeddedPulsar.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/EmbeddedPulsar.java
new file mode 100644
index 0000000000..634c9fb9ed
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/EmbeddedPulsar.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar;
+
+import com.google.common.collect.Sets;
+import lombok.Builder;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.Optional;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
+/**
+ * Starts a standalone Pulsar setup inside the current JVM: a {@link LocalBookkeeperEnsemble}
+ * and, unless disabled, a {@link PulsarService} broker configured from the bundled
+ * {@code /embedded.conf} classpath resource.
+ *
+ * <p>Instances are created through the Lombok-generated builder, for example
+ * {@code EmbeddedPulsar.builder().zkPort(2181).build()}. Call {@link #start()} to bring the
+ * components up and {@link #stop()} to shut them down again.
+ */
+@Builder(toBuilder = true)
+public class EmbeddedPulsar {
+
+    private static final Logger log = LoggerFactory.getLogger(EmbeddedPulsar.class);
+
+    /** Classpath location of the configuration that {@link #start()} loads. */
+    private static final String EMBEDDED_CONF = "/embedded.conf";
+
+    /** Broker created by {@link #start()}; stays {@code null} when {@code noBroker} is set. */
+    private PulsarService broker;
+    /** Admin client created by {@link #start()} once the broker is running. */
+    private PulsarAdmin admin;
+    /** Local ensemble created by {@link #start()}; stays {@code null} when {@code onlyBroker} is set. */
+    private LocalBookkeeperEnsemble bkEnsemble;
+    /** Broker configuration; replaced by {@link #start()} with the content of {@code /embedded.conf}. */
+    private ServiceConfiguration config;
+    /** Optional functions worker handed to the broker; may be {@code null}. */
+    private WorkerService fnWorkerService;
+    // NOTE(review): configFile is never read in this class; start() always loads /embedded.conf.
+    // Confirm whether it is meant to select the configuration file.
+    private String configFile;
+
+    // The following settings are forwarded to the LocalBookkeeperEnsemble constructor.
+    @Builder.Default
+    private boolean wipeData = true;
+    @Builder.Default
+    private int numOfBk = 1;
+    @Builder.Default
+    private int zkPort = 2181;
+    @Builder.Default
+    private int bkPort = 3181;
+    @Builder.Default
+    private String zkDir = "data/standalone/zookeeper";
+    @Builder.Default
+    private String bkDir = "data/standalone/bookkeeper";
+
+    /** When {@code true}, {@link #start()} returns after the ensemble step without starting a broker. */
+    @Builder.Default
+    private boolean noBroker = false;
+    /** When {@code true}, {@link #start()} does not start a {@link LocalBookkeeperEnsemble}. */
+    @Builder.Default
+    private boolean onlyBroker = false;
+    /** Overrides the advertised address from the configuration and is used as the ZooKeeper host. */
+    @Builder.Default
+    private String advertisedAddress = null;
+
+    /**
+     * Loads {@code /embedded.conf}, starts the local ensemble and the broker (subject to
+     * {@code onlyBroker} / {@code noBroker}), creates the admin client and then makes a
+     * best-effort attempt to create the sample and public/default namespaces.
+     *
+     * @throws IllegalStateException if {@code /embedded.conf} is not on the classpath
+     * @throws Exception if loading the configuration or starting a component fails
+     */
+    public void start() throws Exception {
+        this.config = loadEmbeddedConfig();
+
+        String zkServers = "127.0.0.1";
+        if (advertisedAddress != null) {
+            // Use advertised address supplied through the builder
+            config.setAdvertisedAddress(advertisedAddress);
+            zkServers = advertisedAddress;
+        } else if (isBlank(config.getAdvertisedAddress())) {
+            // Use advertised address as local hostname
+            config.setAdvertisedAddress(ServiceConfigurationUtils.unsafeLocalhostResolve());
+        }
+        // Otherwise keep the advertised address from the config file
+
+        config.setZookeeperServers(zkServers + ":" + zkPort);
+        config.setConfigurationStoreServers(zkServers + ":" + zkPort);
+        config.setRunningStandalone(true);
+
+        log.debug("--- setup PulsarStandaloneStarter ---");
+
+        if (!onlyBroker) {
+            // Start LocalBookKeeper
+            bkEnsemble = new LocalBookkeeperEnsemble(numOfBk, zkPort, bkPort, zkDir, bkDir, wipeData,
+                advertisedAddress);
+            bkEnsemble.startStandalone();
+        }
+
+        if (noBroker) {
+            return;
+        }
+
+        // Start Broker
+        broker = new PulsarService(config, Optional.ofNullable(fnWorkerService));
+        broker.start();
+
+        final URL webServiceUrl = new URL(
+            String.format("http://%s:%d", config.getAdvertisedAddress(), config.getWebServicePort()));
+        final String brokerServiceUrl = String.format("pulsar://%s:%d", config.getAdvertisedAddress(),
+            config.getBrokerServicePort());
+        admin = PulsarAdmin.builder()
+            .serviceHttpUrl(webServiceUrl.toString())
+            .authentication(config.getBrokerClientAuthenticationPlugin(),
+                config.getBrokerClientAuthenticationParameters())
+            .build();
+
+        final String cluster = config.getClusterName();
+        createSampleNamespace(cluster, webServiceUrl.toString(), brokerServiceUrl);
+        createDefaultNamespace(cluster);
+
+        log.debug("--- setup completed ---");
+    }
+
+    /**
+     * Reads {@code /embedded.conf} from the classpath of this class.
+     *
+     * <p>The resource is resolved through {@code EmbeddedPulsar.class} rather than
+     * {@code ClassLoader.class}: the latter belongs to the bootstrap loader and therefore does not
+     * see application resources on Java 9+ or under non-system class loaders.
+     */
+    private static ServiceConfiguration loadEmbeddedConfig() throws Exception {
+        // Fully qualified to avoid adding a file-level import; a repeated close() is a no-op.
+        try (java.io.InputStream in = EmbeddedPulsar.class.getResourceAsStream(EMBEDDED_CONF)) {
+            if (in == null) {
+                throw new IllegalStateException(
+                    "Configuration resource " + EMBEDDED_CONF + " not found on the classpath");
+            }
+            return PulsarConfigurationLoader.create(in, ServiceConfiguration.class);
+        }
+    }
+
+    /**
+     * Registers (or updates) the cluster, the "global" cluster marker, the "sample" tenant and its
+     * "ns1" namespace. Best effort: an admin failure is logged and the remaining steps are skipped.
+     */
+    private void createSampleNamespace(String cluster, String webServiceUrl, String brokerServiceUrl) {
+        final String property = "sample";
+        final String globalCluster = "global";
+        final String namespace = property + "/" + cluster + "/ns1";
+        final ClusterData clusterData = new ClusterData(webServiceUrl, null /* serviceUrlTls */,
+            brokerServiceUrl, null /* brokerServiceUrlTls */);
+        try {
+            if (!admin.clusters().getClusters().contains(cluster)) {
+                admin.clusters().createCluster(cluster, clusterData);
+            } else {
+                admin.clusters().updateCluster(cluster, clusterData);
+            }
+
+            // Create marker for "global" cluster
+            if (!admin.clusters().getClusters().contains(globalCluster)) {
+                admin.clusters().createCluster(globalCluster, new ClusterData(null, null));
+            }
+
+            if (!admin.tenants().getTenants().contains(property)) {
+                admin.tenants().createTenant(property,
+                    new TenantInfo(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster)));
+            }
+
+            if (!admin.namespaces().getNamespaces(property).contains(namespace)) {
+                admin.namespaces().createNamespace(namespace);
+            }
+        } catch (PulsarAdminException e) {
+            // Pass the exception itself so the cause and stack trace are not lost
+            log.warn("Failed to set up cluster '{}' and namespace '{}'", cluster, namespace, e);
+        }
+    }
+
+    /**
+     * Creates the public tenant and its default namespace if they do not exist yet.
+     * Best effort: an admin failure is logged and the remaining steps are skipped.
+     */
+    private void createDefaultNamespace(String cluster) {
+        final String publicTenant = TopicName.PUBLIC_TENANT;
+        final String defaultNamespace = TopicName.PUBLIC_TENANT + "/" + TopicName.DEFAULT_NAMESPACE;
+        try {
+            if (!admin.tenants().getTenants().contains(publicTenant)) {
+                admin.tenants().createTenant(publicTenant,
+                    new TenantInfo(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster)));
+            }
+            if (!admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) {
+                admin.namespaces().createNamespace(defaultNamespace);
+                admin.namespaces().setNamespaceReplicationClusters(defaultNamespace,
+                    Sets.newHashSet(config.getClusterName()));
+            }
+        } catch (PulsarAdminException e) {
+            log.warn("Failed to set up namespace '{}'", defaultNamespace, e);
+        }
+    }
+
+    /**
+     * Shuts down every component that was created. Each component is stopped independently, so a
+     * failure in one of them is logged and does not prevent the others from being stopped.
+     */
+    public void stop() {
+        if (admin != null) {
+            stopQuietly("admin client", admin::close);
+        }
+        if (fnWorkerService != null) {
+            stopQuietly("functions worker", fnWorkerService::stop);
+        }
+        if (broker != null) {
+            stopQuietly("broker", broker::close);
+        }
+        if (bkEnsemble != null) {
+            stopQuietly("bookkeeper ensemble", bkEnsemble::stop);
+        }
+    }
+
+    /** Runs one shutdown step and logs, rather than propagates, any failure. */
+    private static void stopQuietly(String component, ShutdownStep step) {
+        try {
+            step.run();
+        } catch (Exception e) {
+            log.error("Shutdown failed for {}", component, e);
+        }
+    }
+
+    /** A shutdown action that is allowed to throw a checked exception. */
+    @FunctionalInterface
+    private interface ShutdownStep {
+        void run() throws Exception;
+    }
+
+    /**
+     * Starts an instance with default settings and stops it again right away.
+     * The stop also runs when the start fails part-way, so no component is left running.
+     */
+    public static void main(String[] args) throws Exception {
+        EmbeddedPulsar embeddedPulsar = EmbeddedPulsar.builder().build();
+        try {
+            embeddedPulsar.start();
+        } finally {
+            embeddedPulsar.stop();
+        }
+    }
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/resources/embedded.conf 
b/pulsar-broker/src/main/resources/embedded.conf
new file mode 100644
index 0000000000..f41a780908
--- /dev/null
+++ b/pulsar-broker/src/main/resources/embedded.conf
@@ -0,0 +1,410 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+### --- General broker settings --- ###
+
+# Zookeeper quorum connection string
+zookeeperServers=
+
+# Configuration Store connection string
+  configurationStoreServers=
+
+brokerServicePort=6650
+
+# Port to use to serve HTTP requests
+webServicePort=8080
+
+# Hostname or IP address the service binds on, default is 0.0.0.0.
+bindAddress=0.0.0.0
+
+# Hostname or IP address the service advertises to the outside world. If not 
set, the value of InetAddress.getLocalHost().getHostName() is used.
+advertisedAddress=
+
+# Name of the cluster to which this broker belongs
+  clusterName=standalone
+
+# Enable the cluster's failure domains, which can distribute brokers into logical regions
+failureDomainsEnabled=false
+
+# Zookeeper session timeout in milliseconds
+zooKeeperSessionTimeoutMillis=30000
+
+# Time to wait for broker graceful shutdown. After this time elapses, the 
process will be killed
+brokerShutdownTimeoutMs=3000
+
+# Enable backlog quota check. Enforces action on topic when the quota is 
reached
+backlogQuotaCheckEnabled=true
+
+# How often to check for topics that have reached the quota
+backlogQuotaCheckIntervalInSeconds=60
+
+# Default per-topic backlog quota limit
+backlogQuotaDefaultLimitGB=10
+
+# Enable the deletion of inactive topics
+brokerDeleteInactiveTopicsEnabled=true
+
+# How often to check for inactive topics
+brokerDeleteInactiveTopicsFrequencySeconds=60
+
+# How frequently to proactively check and purge expired messages
+messageExpiryCheckIntervalInMinutes=5
+
+# How long to delay rewinding cursor and dispatching messages when active 
consumer is changed
+activeConsumerFailoverDelayTimeMillis=1000
+
+# How long (in minutes) after the last consumption an inactive subscription is deleted
+# When it is 0, inactive subscriptions are not deleted automatically
+subscriptionExpirationTimeMinutes=0
+
+# How frequently to proactively check and purge expired subscription
+subscriptionExpiryCheckIntervalInMinutes=5
+
+# Set the default behavior for message deduplication in the broker
+# This can be overridden per-namespace. If enabled, broker will reject
+# messages that were already stored in the topic
+brokerDeduplicationEnabled=false
+
+# Maximum number of producer information that it's going to be
+# persisted for deduplication purposes
+brokerDeduplicationMaxNumberOfProducers=10000
+
+# Number of entries after which a dedup info snapshot is taken.
+# A bigger interval will lead to less snapshots being taken though it would
+# increase the topic recovery time, when the entries published after the
+# snapshot need to be replayed
+brokerDeduplicationEntriesInterval=1000
+
+# Time of inactivity after which the broker will discard the deduplication 
information
+# relative to a disconnected producer. Default is 6 hours.
+brokerDeduplicationProducerInactivityTimeoutMinutes=360
+
+# When a namespace is created without specifying the number of bundle, this
+# value will be used as the default
+defaultNumberOfNamespaceBundles=4
+
+# Enable check for minimum allowed client library version
+clientLibraryVersionCheckEnabled=false
+
+# Path for the file used to determine the rotation status for the broker when 
responding
+# to service discovery health checks
+statusFilePath=/usr/local/apache/htdocs
+
+# Max number of unacknowledged messages a consumer on a shared subscription is allowed to hold.
+# Once this limit is reached, the broker stops sending messages to the consumer until it starts
+# acknowledging messages back. A value of 0 disables the unacked-message limit check, so the
+# consumer can receive messages without any restriction
+maxUnackedMessagesPerConsumer=50000
+
+# Max number of unacknowledged messages allowed per shared subscription. 
Broker will stop dispatching messages to
+# all consumers of the subscription once this limit reaches until consumer 
starts acknowledging messages back and
+# unack count reaches to limit/2. Using a value of 0, is disabling 
unackedMessage-limit
+# check and dispatcher can dispatch messages without any restriction
+maxUnackedMessagesPerSubscription=200000
+
+# Max number of unacknowledged messages allowed per broker. Once this limit 
reaches, broker will stop dispatching
+# messages to all shared subscription which has higher number of unack 
messages until subscriptions start
+# acknowledging messages back and unack count reaches to limit/2. Using a 
value of 0, is disabling
+# unackedMessage-limit check and broker doesn't block dispatchers
+maxUnackedMessagesPerBroker=0
+
+# Once broker reaches maxUnackedMessagesPerBroker limit, it blocks 
subscriptions which has higher unacked messages
+# than this percentage limit and subscription will not receive any new 
messages until that subscription acks back
+# limit/2 messages
+maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16
+
+# Default messages per second dispatch throttling-limit for every topic. Using 
a value of 0, is disabling default
+# message dispatch-throttling
+dispatchThrottlingRatePerTopicInMsg=0
+
+# Default bytes per second dispatch throttling-limit for every topic. Using a 
value of 0, is disabling
+# default message-byte dispatch-throttling
+dispatchThrottlingRatePerTopicInByte=0
+
+# By default we enable dispatch-throttling for both caught up consumers as 
well as consumers who have
+# backlog.
+dispatchThrottlingOnNonBacklogConsumerEnabled=true
+
+# Max number of concurrent lookup request broker allows to throttle heavy 
incoming lookup traffic
+maxConcurrentLookupRequest=10000
+
+# Max number of concurrent topic loading request broker allows to control 
number of zk-operations
+maxConcurrentTopicLoadRequest=5000
+
+# Max concurrent non-persistent message can be processed per connection
+maxConcurrentNonPersistentMessagePerConnection=1000
+
+# Number of worker threads to serve non-persistent topic
+numWorkerThreadsForNonPersistentTopic=8
+
+# Enable broker to load persistent topics
+enablePersistentTopics=true
+
+# Enable broker to load non-persistent topics
+enableNonPersistentTopics=true
+
+# Max number of producers allowed to connect to topic. Once this limit 
reaches, Broker will reject new producers
+# until the number of connected producers decrease.
+# Using a value of 0, is disabling maxProducersPerTopic-limit check.
+maxProducersPerTopic=0
+
+# Max number of consumers allowed to connect to topic. Once this limit 
reaches, Broker will reject new consumers
+# until the number of connected consumers decrease.
+# Using a value of 0, is disabling maxConsumersPerTopic-limit check.
+maxConsumersPerTopic=0
+
+# Max number of consumers allowed to connect to subscription. Once this limit 
reaches, Broker will reject new consumers
+# until the number of connected consumers decrease.
+# Using a value of 0, is disabling maxConsumersPerSubscription-limit check.
+maxConsumersPerSubscription=0
+
+### --- Authentication --- ###
+# Role names that are treated as "proxy roles". If the broker sees a request 
with
+#role as proxyRoles - it will demand to see a valid original principal.
+proxyRoles=
+
+# If this flag is set then the broker authenticates the original Auth data
+# else it just accepts the originalPrincipal and authorizes it (if required).
+  authenticateOriginalAuthData=false
+
+# Enable authentication
+authenticationEnabled=false
+
+# Authentication provider name list, which is a comma-separated list of class names
+authenticationProviders=
+
+# Enforce authorization
+  authorizationEnabled=false
+
+# Authorization provider fully qualified class-name
+authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
+
+# Allow wildcard matching in authorization
+# (wildcard matching only applicable if wildcard-char:
+# * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
+authorizationAllowWildcardsMatching=false
+
+# Role names that are treated as "super-user", meaning they will be able to do 
all admin
+# operations and publish/consume from all topics
+superUserRoles=
+
+# Authentication settings of the broker itself. Used when the broker connects 
to other brokers,
+# either in same or other clusters
+  brokerClientAuthenticationPlugin=
+brokerClientAuthenticationParameters=
+
+# Supported Athenz provider domain names(comma separated) for authentication
+  athenzDomainNames=
+
+# When this parameter is not empty, unauthenticated users perform as 
anonymousUserRole
+anonymousUserRole=
+
+### --- BookKeeper Client --- ###
+
+# Authentication plugin to use when connecting to bookies
+  bookkeeperClientAuthenticationPlugin=
+
+# BookKeeper auth plugin implementation-specific parameter names and values
+bookkeeperClientAuthenticationParametersName=
+  bookkeeperClientAuthenticationParameters=
+
+# Timeout for BK add / read operations
+bookkeeperClientTimeoutInSeconds=30
+
+# Speculative reads are initiated if a read request doesn't complete within a 
certain time
+# Using a value of 0, is disabling the speculative reads
+bookkeeperClientSpeculativeReadTimeoutInMillis=0
+
+# Enable bookies health check. Bookies that have more than the configured 
number of failure within
+# the interval will be quarantined for some time. During this period, new 
ledgers won't be created
+# on these bookies
+bookkeeperClientHealthCheckEnabled=true
+bookkeeperClientHealthCheckIntervalSeconds=60
+bookkeeperClientHealthCheckErrorThresholdPerInterval=5
+bookkeeperClientHealthCheckQuarantineTimeInSeconds=1800
+
+# Enable rack-aware bookie selection policy. BK will choose bookies from different racks when
+# forming a new bookie ensemble
+bookkeeperClientRackawarePolicyEnabled=true
+
+# Enable bookie isolation by specifying a list of bookie groups to choose 
from. Any bookie
+# outside the specified groups will not be used by the broker
+bookkeeperClientIsolationGroups=
+
+### --- Managed Ledger --- ###
+
+# Number of bookies to use when creating a ledger
+  managedLedgerDefaultEnsembleSize=1
+
+# Number of copies to store for each message
+managedLedgerDefaultWriteQuorum=1
+
+# Number of guaranteed copies (acks to wait before write is complete)
+managedLedgerDefaultAckQuorum=1
+
+# Default type of checksum to use when writing to BookKeeper. Default is 
"CRC32"
+# Other possible options are "CRC32C" (which is faster), "MAC" or "DUMMY" (no 
checksum).
+managedLedgerDigestType=CRC32
+
+# Amount of memory to use for caching data payload in managed ledger. This 
memory
+# is allocated from JVM direct memory and it's shared across all the topics
+# running  in the same broker
+managedLedgerCacheSizeMB=1024
+
+# Threshold to which bring down the cache level when eviction is triggered
+managedLedgerCacheEvictionWatermark=0.9
+
+# Rate limit the amount of writes generated by consumer acking the messages
+managedLedgerDefaultMarkDeleteRateLimit=0.1
+
+# Max number of entries to append to a ledger before triggering a rollover
+# A ledger rollover is triggered on these conditions
+#  * Either the max rollover time has been reached
+#  * or max entries have been written to the ledger and at least min-time
+#    has passed
+managedLedgerMaxEntriesPerLedger=50000
+
+# Minimum time between ledger rollover for a topic
+managedLedgerMinLedgerRolloverTimeMinutes=10
+
+# Maximum time before forcing a ledger rollover for a topic
+managedLedgerMaxLedgerRolloverTimeMinutes=240
+
+# Max number of entries to append to a cursor ledger
+managedLedgerCursorMaxEntriesPerLedger=50000
+
+# Max time before triggering a rollover on a cursor ledger
+managedLedgerCursorRolloverTimeInSeconds=14400
+
+# Max number of "acknowledgment holes" that are going to be persistently 
stored.
+# When acknowledging out of order, a consumer will leave holes that are 
supposed
+# to be quickly filled by acking all the messages. The information of which
+# messages are acknowledged is persisted by compressing in "ranges" of messages
+# that were acknowledged. After the max number of ranges is reached, the 
information
+# will only be tracked in memory and messages will be redelivered in case of
+# crashes.
+managedLedgerMaxUnackedRangesToPersist=10000
+
+# Max number of "acknowledgment holes" that can be stored in Zookeeper. If 
number of unack message range is higher
+# than this limit then broker will persist unacked ranges into bookkeeper to 
avoid additional data overhead into
+# zookeeper.
+managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000
+
+# Skip reading non-recoverable/unreadable data-ledger under managed-ledger's 
list. It helps when data-ledgers gets
+# corrupted at bookkeeper and managed-cursor is stuck at that ledger.
+autoSkipNonRecoverableData=false
+
+### --- Load balancer --- ###
+
+# Enable load balancer
+loadBalancerEnabled=false
+
+# Percentage of change to trigger load report update
+loadBalancerReportUpdateThresholdPercentage=10
+
+# maximum interval to update load report
+loadBalancerReportUpdateMaxIntervalMinutes=15
+
+# Frequency of report to collect
+loadBalancerHostUsageCheckIntervalMinutes=1
+
+# Load shedding interval. Broker periodically checks whether some traffic should be offloaded from
+# some over-loaded broker to other under-loaded brokers
+loadBalancerSheddingIntervalMinutes=1
+
+# Prevent the same topics from being shed and moved to another broker more than once within this timeframe
+loadBalancerSheddingGracePeriodMinutes=30
+
+# Usage threshold to allocate max number of topics to broker
+loadBalancerBrokerMaxTopics=50000
+
+# Interval to flush dynamic resource quota to ZooKeeper
+loadBalancerResourceQuotaUpdateIntervalMinutes=15
+
+# enable/disable namespace bundle auto split
+loadBalancerAutoBundleSplitEnabled=true
+
+# enable/disable automatic unloading of split bundles
+loadBalancerAutoUnloadSplitBundlesEnabled=true
+
+# maximum topics in a bundle, otherwise bundle split will be triggered
+loadBalancerNamespaceBundleMaxTopics=1000
+
+# maximum sessions (producers + consumers) in a bundle, otherwise bundle split 
will be triggered
+loadBalancerNamespaceBundleMaxSessions=1000
+
+# maximum msgRate (in + out) in a bundle, otherwise bundle split will be 
triggered
+loadBalancerNamespaceBundleMaxMsgRate=30000
+
+# maximum bandwidth (in + out) in a bundle, otherwise bundle split will be 
triggered
+loadBalancerNamespaceBundleMaxBandwidthMbytes=100
+
+# maximum number of bundles in a namespace
+loadBalancerNamespaceMaximumBundles=128
+
+### --- Replication --- ###
+
+# Enable replication metrics
+replicationMetricsEnabled=true
+
+# Max number of connections to open for each broker in a remote cluster
+# More connections host-to-host lead to better throughput over high-latency
+# links.
+replicationConnectionsPerBroker=16
+
+# Replicator producer queue size
+replicationProducerQueueSize=1000
+
+# Default message retention time
+defaultRetentionTimeInMinutes=0
+
+# Default retention size
+defaultRetentionSizeInMB=0
+
+# How often to check whether the connections are still alive
+keepAliveIntervalSeconds=30
+
+# How often broker checks for inactive topics to be deleted (topics with no 
subscriptions and no one connected)
+brokerServicePurgeInactiveFrequencyInSeconds=60
+
+### --- WebSocket --- ###
+
+# Enable the WebSocket API service in broker
+webSocketServiceEnabled=true
+
+# Number of IO threads in Pulsar Client used in WebSocket proxy
+webSocketNumIoThreads=8
+
+# Number of connections per Broker in Pulsar Client used in WebSocket proxy
+webSocketConnectionsPerBroker=8
+
+
+### --- Metrics --- ###
+
+# Enable topic level metrics
+exposeTopicLevelMetricsInPrometheus=true
+
+### --- Broker Web Stats --- ###
+
+# Expose publisher stats
+exposePublisherStats=true
+
+### --- Deprecated config variables --- ###
+
+# Deprecated. Use configurationStoreServers
+globalZookeeperServers=
diff --git a/pulsar-broker/src/main/resources/log4j2.xml 
b/pulsar-broker/src/main/resources/log4j2.xml
new file mode 100644
index 0000000000..85a7c1e59c
--- /dev/null
+++ b/pulsar-broker/src/main/resources/log4j2.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<Configuration status="INFO">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t:%C@%L] %-5level 
%logger{36} - %msg%n" />
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Root level="warn">
+            <AppenderRef ref="Console" />
+        </Root>
+        <Logger name="org.apache.pulsar" level="info"/>
+        <Logger name="org.apache.bookkeeper" level="info"/>
+    </Loggers>
+</Configuration>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to