This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6154e3c Allow to override the auto-detected NIC speed limit (#1157) 6154e3c is described below commit 6154e3cb911154197c8850db8f2db5b99c17cc78 Author: Matteo Merli <mme...@apache.org> AuthorDate: Wed Jan 31 18:18:06 2018 -0800 Allow to override the auto-detected NIC speed limit (#1157) --- conf/broker.conf | 8 +++ .../apache/pulsar/broker/ServiceConfiguration.java | 14 ++++- .../loadbalance/impl/LinuxBrokerHostUsageImpl.java | 32 ++++++++---- .../broker/loadbalance/LoadReportNetworkLimit.java | 61 ++++++++++++++++++++++ .../common/naming/ServiceConfigurationTest.java | 23 ++++++-- .../org/apache/pulsar/common/util/FieldParser.java | 11 ++-- 6 files changed, 130 insertions(+), 19 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 7f9dd33..979f4e2 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -355,6 +355,14 @@ loadBalancerNamespaceBundleMaxBandwidthMbytes=100 # maximum number of bundles in a namespace loadBalancerNamespaceMaximumBundles=128 +# Override the auto-detection of the network interfaces max speed. +# This option is useful in some environments (eg: EC2 VMs) where the max speed +# reported by Linux is not reflecting the real bandwidth available to the broker. +# Since the network usage is employed by the load manager to decide when a broker +# is overloaded, it is important to make sure the info is correct or override it +# with the right value here. +loadBalancerOverrideBrokerNicSpeedGbps= + # Name of load manager to use loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 88117e0..0cc4001 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -18,10 +18,9 @@ */ package org.apache.pulsar.broker; -import java.lang.reflect.Field; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.Properties; import java.util.Set; @@ -349,6 +348,9 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext(dynamic = true) private String loadManagerClassName = "org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl"; + // Option to override the auto-detected network interfaces max speed + private Integer loadBalancerOverrideBrokerNicSpeedGbps; + /**** --- Replication --- ****/ // Enable replication metrics private boolean replicationMetricsEnabled = false; @@ -1218,6 +1220,14 @@ public class ServiceConfiguration implements PulsarConfiguration { return this.loadBalancerNamespaceMaximumBundles; } + public Optional<Integer> getLoadBalancerOverrideBrokerNicSpeedGbps() { + return Optional.ofNullable(loadBalancerOverrideBrokerNicSpeedGbps); + } + + public void setLoadBalancerOverrideBrokerNicSpeedGbps(int loadBalancerOverrideBrokerNicSpeedGbps) { + this.loadBalancerOverrideBrokerNicSpeedGbps = loadBalancerOverrideBrokerNicSpeedGbps; + } + public boolean isReplicationMetricsEnabled() { return replicationMetricsEnabled; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java index 4b62f84..7cdf84c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java @@ -18,15 +18,6 @@ */ package org.apache.pulsar.broker.loadbalance.impl; -import com.sun.management.OperatingSystemMXBean; - -import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.loadbalance.BrokerHostUsage; -import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; -import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.lang.management.ManagementFactory; import java.nio.file.Files; @@ -35,10 +26,20 @@ import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.loadbalance.BrokerHostUsage; +import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; +import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.sun.management.OperatingSystemMXBean; + /** * Class that will return the broker host usage. * @@ -54,6 +55,8 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage { private OperatingSystemMXBean systemBean; private SystemResourceUsage usage; + private final Optional<Integer> overrideBrokerNicSpeedGbps; + private static final Logger LOG = LoggerFactory.getLogger(LinuxBrokerHostUsageImpl.class); public LinuxBrokerHostUsageImpl(PulsarService pulsar) { @@ -61,6 +64,7 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage { this.systemBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); this.lastCollection = 0L; this.usage = new SystemResourceUsage(); + this.overrideBrokerNicSpeedGbps = pulsar.getConfiguration().getLoadBalancerOverrideBrokerNicSpeedGbps(); pulsar.getLoadManagerExecutor().scheduleAtFixedRate(this::calculateBrokerHostUsage, 0, hostUsageCheckIntervalMin, TimeUnit.MINUTES); } @@ -117,12 +121,12 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage { /** * Reads first line of /proc/stat to get total cpu usage. - * + * * <pre> * cpu user nice system idle iowait irq softirq steal guest guest_nice * cpu 317808 128 58637 2503692 7634 0 13472 0 0 0 * </pre> - * + * * Line is split in "words", filtering the first. The sum of all numbers give the amount of cpu cycles used this * far. Real CPU usage should equal the sum substracting the idle cycles, this would include iowait, irq and steal. */ @@ -175,6 +179,12 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage { } private double getTotalNicLimitKbps(List<String> nics) { + if (overrideBrokerNicSpeedGbps.isPresent()) { + // Use the override value as configured. Return the total max speed across all available NICs, converted + // from Gbps into Kbps + return ((double) overrideBrokerNicSpeedGbps.get()) * nics.size() * 1024 * 1024; + } + // Nic speed is in Mbits/s, return kbits/s return nics.stream().mapToDouble(s -> { try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimit.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimit.java new file mode 100644 index 0000000..7f83247 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimit.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance; + +import org.apache.commons.lang3.SystemUtils; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import static org.testng.Assert.*; + +public class LoadReportNetworkLimit extends MockedPulsarServiceBaseTest { + + @BeforeClass + @Override + public void setup() throws Exception { + conf.setLoadBalancerEnabled(true); + conf.setLoadBalancerOverrideBrokerNicSpeedGbps(5); + super.internalSetup(); + } + + @AfterClass + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void checkLoadReportNicSpeed() throws Exception { + // Since we have overridden the NIC speed in the configuration, the load report for the broker should always + + LoadManagerReport report = admin.brokerStats().getLoadReport(); + + if (SystemUtils.IS_OS_LINUX) { + assertEquals(report.getBandwidthIn().limit, 5.0 * 1024 * 1024); + assertEquals(report.getBandwidthOut().limit, 5.0 * 1024 * 1024); + } else { + // On non-Linux system we don't report the network usage + assertEquals(report.getBandwidthIn().limit, -1.0); + assertEquals(report.getBandwidthOut().limit, -1.0); + } + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index f19803c..bad061c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -29,6 +29,7 @@ import java.io.InputStream; import java.io.PrintWriter; import java.io.StringWriter; import java.nio.charset.StandardCharsets; +import java.util.Optional; import java.util.Properties; import org.apache.pulsar.broker.ServiceConfiguration; @@ -36,7 +37,7 @@ import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.testng.annotations.Test; /** - * + * * */ public class ServiceConfigurationTest { @@ -45,7 +46,7 @@ public class ServiceConfigurationTest { /** * test {@link ServiceConfiguration} initialization - * + * * @throws Exception */ @Test @@ -59,9 +60,25 @@ public class ServiceConfigurationTest { assertEquals(config.getBootstrapNamespaces().get(1), "ns2"); } + @Test + public void testOptionalSettingEmpty() throws Exception { + String confFile = "loadBalancerOverrideBrokerNicSpeedGbps=\n"; + InputStream stream = new ByteArrayInputStream(confFile.getBytes()); + final ServiceConfiguration config = PulsarConfigurationLoader.create(stream, ServiceConfiguration.class); + assertEquals(config.getLoadBalancerOverrideBrokerNicSpeedGbps(), Optional.empty()); + } + + @Test + public void testOptionalSettingPresent() throws Exception { + String confFile = "loadBalancerOverrideBrokerNicSpeedGbps=5\n"; + InputStream stream = new ByteArrayInputStream(confFile.getBytes()); + final ServiceConfiguration config = PulsarConfigurationLoader.create(stream, ServiceConfiguration.class); + assertEquals(config.getLoadBalancerOverrideBrokerNicSpeedGbps(), Optional.of(5)); + } + /** * test {@link ServiceConfiguration} with incorrect values. - * + * * @throws Exception */ @Test(expectedExceptions = IllegalArgumentException.class) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java index c7bab5f..ebac111 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.common.util; -import org.apache.pulsar.common.policies.data.AuthAction; - import static com.google.common.base.Preconditions.checkNotNull; import static java.lang.String.format; @@ -33,6 +31,8 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import io.netty.util.internal.StringUtil; + /** * * Generic value converter. @@ -192,7 +192,12 @@ public final class FieldParser { * @return The converted Integer value. */ public static Integer stringToInteger(String val) { - return Integer.valueOf(trim(val)); + String v = trim(val); + if (StringUtil.isNullOrEmpty(v)) { + return null; + } else { + return Integer.valueOf(v); + } } /** -- To stop receiving notification emails like this one, please contact mme...@apache.org.