keith-turner commented on code in PR #5708: URL: https://github.com/apache/accumulo/pull/5708#discussion_r2183005955
########## server/base/src/main/java/org/apache/accumulo/server/conf/SystemConfiguration.java: ########## @@ -18,36 +18,144 @@ */ package org.apache.accumulo.server.conf; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.COMPACTOR; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.GARBAGE_COLLECTOR; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.MANAGER; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.MONITOR; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.SCAN_SERVER; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.TABLET_SERVER; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.store.SystemPropKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.MapDifference; +import com.google.common.collect.Maps; + public class SystemConfiguration extends ZooBasedConfiguration { private static final Logger log = LoggerFactory.getLogger(SystemConfiguration.class); + private static class ChangedPropertyMonitor implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(ChangedPropertyMonitor.class); + private static Map<ServerId.Type,List<Property>> SERVER_PROPERTY_PREFIXES = Map.of(COMPACTOR, + List.of(Property.COMPACTOR_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX), + GARBAGE_COLLECTOR, + List.of(Property.GC_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX), MANAGER, + List.of(Property.MANAGER_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX), MONITOR, + List.of(Property.MONITOR_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX), + SCAN_SERVER, List.of(Property.SSERV_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX), + TABLET_SERVER, + List.of(Property.TSERV_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX)); + + private final AtomicReference<Set<String>> propsChanged = new AtomicReference<>(Set.of()); + private final List<Property> applicableProperties; + private volatile Map<String,String> currentProperties; + private volatile long currentVersion; + + public ChangedPropertyMonitor(long initialVersion, Map<String,String> initialProperties, + ServerId.Type serverType) { + this.applicableProperties = SERVER_PROPERTY_PREFIXES.get(serverType); + this.currentProperties = initialProperties; + this.currentVersion = initialVersion; + } + + private void changedProperties(Set<String> props) { + final Set<String> changed = new HashSet<>(); + props.forEach(p -> { + applicableProperties.forEach(ap -> { + if (p.startsWith(ap.getKey())) { + changed.add(p); + } + }); + }); + propsChanged.set(changed); + } + + public void update(long version, Map<String,String> properties) { Review Comment: Multiple threads reading configuration could end up in this code at the same time. There are multiple variables being read and written so it seems like this method and other methods in the class should be synchronized to consistently set all the variables. ```suggestion public synchronized void update(long version, Map<String,String> properties) { ``` ########## server/base/src/main/java/org/apache/accumulo/server/conf/SystemConfiguration.java: ########## @@ -18,36 +18,144 @@ */ package org.apache.accumulo.server.conf; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.COMPACTOR; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.GARBAGE_COLLECTOR; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.MANAGER; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.MONITOR; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.SCAN_SERVER; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.TABLET_SERVER; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.store.SystemPropKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.MapDifference; +import com.google.common.collect.Maps; + public class SystemConfiguration extends ZooBasedConfiguration { private static final Logger log = LoggerFactory.getLogger(SystemConfiguration.class); + private static class ChangedPropertyMonitor implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(ChangedPropertyMonitor.class); + private static Map<ServerId.Type,List<Property>> SERVER_PROPERTY_PREFIXES = Map.of(COMPACTOR, + List.of(Property.COMPACTOR_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX), + GARBAGE_COLLECTOR, + List.of(Property.GC_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX), MANAGER, + List.of(Property.MANAGER_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX), MONITOR, + List.of(Property.MONITOR_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX), + SCAN_SERVER, List.of(Property.SSERV_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX), + TABLET_SERVER, + List.of(Property.TSERV_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX)); + + private final AtomicReference<Set<String>> propsChanged = new AtomicReference<>(Set.of()); + private final List<Property> applicableProperties; + private volatile Map<String,String> currentProperties; + private volatile long currentVersion; + + public ChangedPropertyMonitor(long initialVersion, Map<String,String> initialProperties, + ServerId.Type serverType) { + this.applicableProperties = SERVER_PROPERTY_PREFIXES.get(serverType); + this.currentProperties = initialProperties; + this.currentVersion = initialVersion; + } + + private void changedProperties(Set<String> props) { + final Set<String> changed = new HashSet<>(); + props.forEach(p -> { + applicableProperties.forEach(ap -> { + if (p.startsWith(ap.getKey())) { + changed.add(p); + } + }); + }); + propsChanged.set(changed); + } + + public void update(long version, Map<String,String> properties) { + if (currentVersion == version) { + return; + } + MapDifference<String,String> diff = Maps.difference(currentProperties, properties); + currentProperties = properties; + currentVersion = version; + if (diff.areEqual()) { + return; + } + changedProperties(diff.entriesDiffering().keySet()); + } + + public void propChecked(Property p) { + final Set<String> changed = propsChanged.get(); + if (changed.isEmpty()) { + return; + } + propsChanged.get().remove(p.getKey()); + } + + @Override + public void run() { + final Set<String> changed = propsChanged.get(); + if (!changed.isEmpty()) { + LOG.warn("The following properties have changed, but have not yet been read: {}", changed); + } + } + + } + + private final Map<String,String> initialProperties; private final RuntimeFixedProperties runtimeFixedProps; + private final ChangedPropertyMonitor monitor; public SystemConfiguration(ServerContext context, SystemPropKey propStoreKey, AccumuloConfiguration parent) { + this(context, propStoreKey, parent, Optional.empty()); + } + + public SystemConfiguration(ServerContext context, SystemPropKey propStoreKey, + AccumuloConfiguration parent, Optional<ServerId.Type> serverType) { super(log, context, propStoreKey, parent); - runtimeFixedProps = new RuntimeFixedProperties(getSnapshot(), context.getSiteConfiguration()); + initialProperties = getSnapshot(); + if (serverType.isPresent()) { + monitor = + new ChangedPropertyMonitor(getDataVersion(), initialProperties, serverType.orElseThrow()); + // start the monitor as a scheduled task at some interval. Review Comment: is this a todo? ########## server/base/src/main/java/org/apache/accumulo/server/conf/SystemConfiguration.java: ########## @@ -18,36 +18,144 @@ */ package org.apache.accumulo.server.conf; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.COMPACTOR; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.GARBAGE_COLLECTOR; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.MANAGER; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.MONITOR; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.SCAN_SERVER; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.TABLET_SERVER; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.store.SystemPropKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.MapDifference; +import com.google.common.collect.Maps; + public class SystemConfiguration extends ZooBasedConfiguration { private static final Logger log = LoggerFactory.getLogger(SystemConfiguration.class); + private static class ChangedPropertyMonitor implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(ChangedPropertyMonitor.class); + private static Map<ServerId.Type,List<Property>> SERVER_PROPERTY_PREFIXES = Map.of(COMPACTOR, + List.of(Property.COMPACTOR_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX), + GARBAGE_COLLECTOR, + List.of(Property.GC_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX), MANAGER, + List.of(Property.MANAGER_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX), MONITOR, + List.of(Property.MONITOR_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX), + SCAN_SERVER, List.of(Property.SSERV_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX), + TABLET_SERVER, + List.of(Property.TSERV_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX)); + + private final AtomicReference<Set<String>> propsChanged = new AtomicReference<>(Set.of()); + private final List<Property> applicableProperties; + private volatile Map<String,String> currentProperties; + private volatile long currentVersion; + + public ChangedPropertyMonitor(long initialVersion, Map<String,String> initialProperties, + ServerId.Type serverType) { + this.applicableProperties = SERVER_PROPERTY_PREFIXES.get(serverType); + this.currentProperties = initialProperties; + this.currentVersion = initialVersion; + } + + private void changedProperties(Set<String> props) { + final Set<String> changed = new HashSet<>(); + props.forEach(p -> { + applicableProperties.forEach(ap -> { + if (p.startsWith(ap.getKey())) { + changed.add(p); + } + }); + }); + propsChanged.set(changed); + } + + public void update(long version, Map<String,String> properties) { + if (currentVersion == version) { + return; + } + MapDifference<String,String> diff = Maps.difference(currentProperties, properties); + currentProperties = properties; + currentVersion = version; + if (diff.areEqual()) { + return; + } + changedProperties(diff.entriesDiffering().keySet()); + } + + public void propChecked(Property p) { + final Set<String> changed = propsChanged.get(); + if (changed.isEmpty()) { + return; + } + propsChanged.get().remove(p.getKey()); + } + + @Override + public void run() { Review Comment: Not seeing the code that starts a thread or timer task to call this. ########## server/base/src/main/java/org/apache/accumulo/server/conf/SystemConfiguration.java: ########## @@ -18,36 +18,144 @@ */ package org.apache.accumulo.server.conf; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.COMPACTOR; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.GARBAGE_COLLECTOR; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.MANAGER; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.MONITOR; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.SCAN_SERVER; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.TABLET_SERVER; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.store.SystemPropKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.MapDifference; +import com.google.common.collect.Maps; + public class SystemConfiguration extends ZooBasedConfiguration { private static final Logger log = LoggerFactory.getLogger(SystemConfiguration.class); + private static class ChangedPropertyMonitor implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(ChangedPropertyMonitor.class); + private static Map<ServerId.Type,List<Property>> SERVER_PROPERTY_PREFIXES = Map.of(COMPACTOR, + List.of(Property.COMPACTOR_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX), + GARBAGE_COLLECTOR, + List.of(Property.GC_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX), MANAGER, + List.of(Property.MANAGER_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX), MONITOR, + List.of(Property.MONITOR_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX), + SCAN_SERVER, List.of(Property.SSERV_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX), + TABLET_SERVER, + List.of(Property.TSERV_PREFIX, Property.RPC_PREFIX, Property.INSTANCE_PREFIX)); + + private final AtomicReference<Set<String>> propsChanged = new AtomicReference<>(Set.of()); + private final List<Property> applicableProperties; + private volatile Map<String,String> currentProperties; + private volatile long currentVersion; + + public ChangedPropertyMonitor(long initialVersion, Map<String,String> initialProperties, + ServerId.Type serverType) { + this.applicableProperties = SERVER_PROPERTY_PREFIXES.get(serverType); + this.currentProperties = initialProperties; + this.currentVersion = initialVersion; + } + + private void changedProperties(Set<String> props) { + final Set<String> changed = new HashSet<>(); + props.forEach(p -> { + applicableProperties.forEach(ap -> { + if (p.startsWith(ap.getKey())) { + changed.add(p); + } + }); + }); + propsChanged.set(changed); + } + + public void update(long version, Map<String,String> properties) { + if (currentVersion == version) { + return; + } + MapDifference<String,String> diff = Maps.difference(currentProperties, properties); + currentProperties = properties; + currentVersion = version; + if (diff.areEqual()) { + return; + } + changedProperties(diff.entriesDiffering().keySet()); Review Comment: javadoc for entriesDiffering is : ``` Returns an unmodifiable map describing keys that appear in both maps, but with different values ``` So maybe if there is a new prop that was not present in the old map it would not be seen. Should this be something like the following? ```java Sets.union(diff.entriesDiffering().keySet(), diff.entriesOnlyOnRight().keySet()) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org