dlmarion commented on code in PR #2778:
URL: https://github.com/apache/accumulo/pull/2778#discussion_r934549503
##########
server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java:
##########
@@ -51,9 +65,18 @@ public class ServerConfigurationFactory extends ServerConfiguration {
   private final SiteConfiguration siteConfig;
   private final DeleteWatcher deleteWatcher = new DeleteWatcher();
+  private static final int REFRESH_PERIOD_MINUTES = 15;
+
+  private final ConfigRefreshRunner refresher;
+  private static final AtomicBoolean isConfigRefreshRunning = new AtomicBoolean(false);
+  private static final Lock refreshLock = new ReentrantLock();
+
   public ServerConfigurationFactory(ServerContext context, SiteConfiguration siteConfig) {
     this.context = context;
     this.siteConfig = siteConfig;
+
+    refresher = new ConfigRefreshRunner();
+    Runtime.getRuntime().addShutdownHook(new Thread(refresher::shutdown));
Review Comment:
Please use one of the `Threads.createThread` variants. They ensure that the
thread's name and UncaughtExceptionHandler are set, so that it is visible in
stack traces.
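
To illustrate why this matters, here is a minimal plain-Java sketch of what a
helper of this kind does. `namedThread` and the thread name are hypothetical
stand-ins, not the actual `Threads.createThread` signature; check the real
variants before using them.

```java
import java.util.concurrent.atomic.AtomicReference;

public class NamedShutdownHook {

  // Hypothetical stand-in for a createThread-style helper: it names the
  // thread and installs an UncaughtExceptionHandler before returning it.
  static Thread namedThread(String name, Runnable task,
      Thread.UncaughtExceptionHandler handler) {
    Thread t = new Thread(task, name);
    t.setUncaughtExceptionHandler(handler);
    return t;
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicReference<String> seen = new AtomicReference<>();

    // The task fails on purpose so the handler has something to report.
    Thread t = namedThread("config-refresh-shutdown", () -> {
      throw new IllegalStateException("boom");
    }, (thread, ex) -> seen.set(thread.getName() + ": " + ex.getMessage()));

    t.start();
    t.join();
    // The failure is attributed to a recognizable thread name.
    System.out.println(seen.get());
  }
}
```

With a bare `new Thread(...)`, the hook would show up under a generic name
such as `Thread-3` and any exception would only go to the default handler.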
##########
server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java:
##########
@@ -140,4 +163,119 @@ public void connectionEvent() {
       // no-op. changes handled by prop store impl
     }
   }
+
+  private class ConfigRefreshRunner {
+    private static final long MIN_JITTER_DELAY = 1;
+    private static final long MAX_JITTER_DELAY = 23;
+    private final ScheduledFuture<?> refreshTaskFuture;
+
+    ConfigRefreshRunner() {
+
+      Runnable refreshTask = this::verifySnapshotVersions;
+
+      ScheduledThreadPoolExecutor executor = ThreadPools.getServerThreadPools()
+          .createScheduledExecutorService(1, "config-refresh", false);
+
+      // staggering the initial delay prevents synchronization of Accumulo servers communicating
+      // with ZooKeeper for the sync process. (Value is 25% -> 100% of the refresh period.)
+      long randDelay = jitter(REFRESH_PERIOD_MINUTES / 4, REFRESH_PERIOD_MINUTES);
+      refreshTaskFuture =
+          executor.scheduleWithFixedDelay(refreshTask, randDelay, REFRESH_PERIOD_MINUTES, MINUTES);
+    }
+
+    /**
+     * Check that the stored version in ZooKeeper matches the version held in the local snapshot.
+     * When a mismatch is detected, a change event is sent to the prop store which will cause a
+     * re-load. If the Zookeeper node has been deleted, the local cache entries are removed.
+     * <p>
+     * This method is designed to be called as a scheduled task, so it does not propagate exceptions
+     * other than interrupted Exceptions so the scheduled tasks will continue to run.
+     */
+    private void verifySnapshotVersions() {
+
+      // short circuit if refresh in progress
+      if (isConfigRefreshRunning.get()) {
+        return;
+      }
+
+      // allow only one thread if missed short circuit check.
+      refreshLock.lock();
Review Comment:
I don't think you need `isConfigRefreshRunning` or `refreshLock`. You are
executing this with `scheduleWithFixedDelay`, which starts the next run only
after the current run has terminated and the specified delay has elapsed.
That means only one run should be executing at a time.
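
A small sketch of that behaviour, using a plain JDK executor rather than the
Accumulo thread pools (the class name, pool size, and timings are arbitrary
choices for illustration). Even with spare threads and a task that runs far
longer than the delay, the runs never overlap:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedDelayDemo {

  // Runs a slow task with scheduleWithFixedDelay and returns the highest
  // number of task instances observed running at the same moment.
  static int maxConcurrentRuns(int runs) {
    AtomicInteger active = new AtomicInteger();
    AtomicInteger maxActive = new AtomicInteger();
    AtomicInteger completed = new AtomicInteger();
    // Several threads are available, so any overlap would be possible
    // if the scheduler allowed it.
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);

    Runnable task = () -> {
      int now = active.incrementAndGet();
      maxActive.accumulateAndGet(now, Math::max);
      try {
        Thread.sleep(50); // the task takes much longer than the 5 ms delay
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        active.decrementAndGet();
        completed.incrementAndGet();
      }
    };

    executor.scheduleWithFixedDelay(task, 0, 5, TimeUnit.MILLISECONDS);
    try {
      while (completed.get() < runs) {
        Thread.sleep(10);
      }
      executor.shutdownNow();
      executor.awaitTermination(1, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return maxActive.get();
  }

  public static void main(String[] args) {
    System.out.println("max concurrent runs: " + maxConcurrentRuns(3));
  }
}
```

The guard and the lock would only matter if `verifySnapshotVersions` could
also be called from somewhere other than this scheduled task.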
##########
server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java:
##########
@@ -140,4 +163,119 @@ public void connectionEvent() {
       // no-op. changes handled by prop store impl
     }
   }
+
+  private class ConfigRefreshRunner {
+    private static final long MIN_JITTER_DELAY = 1;
+    private static final long MAX_JITTER_DELAY = 23;
+    private final ScheduledFuture<?> refreshTaskFuture;
+
+    ConfigRefreshRunner() {
+
+      Runnable refreshTask = this::verifySnapshotVersions;
+
+      ScheduledThreadPoolExecutor executor = ThreadPools.getServerThreadPools()
+          .createScheduledExecutorService(1, "config-refresh", false);
+
+      // staggering the initial delay prevents synchronization of Accumulo servers communicating
+      // with ZooKeeper for the sync process. (Value is 25% -> 100% of the refresh period.)
+      long randDelay = jitter(REFRESH_PERIOD_MINUTES / 4, REFRESH_PERIOD_MINUTES);
+      refreshTaskFuture =
+          executor.scheduleWithFixedDelay(refreshTask, randDelay, REFRESH_PERIOD_MINUTES, MINUTES);
+    }
+
+    /**
+     * Check that the stored version in ZooKeeper matches the version held in the local snapshot.
+     * When a mismatch is detected, a change event is sent to the prop store which will cause a
+     * re-load. If the Zookeeper node has been deleted, the local cache entries are removed.
+     * <p>
+     * This method is designed to be called as a scheduled task, so it does not propagate exceptions
+     * other than interrupted Exceptions so the scheduled tasks will continue to run.
+     */
+    private void verifySnapshotVersions() {
+
+      // short circuit if refresh in progress
+      if (isConfigRefreshRunning.get()) {
+        return;
+      }
+
+      // allow only one thread if missed short circuit check.
+      refreshLock.lock();
+      try {
+        isConfigRefreshRunning.set(true);
+        long refreshStart = System.nanoTime();
+        int keyCount = 0;
+        int keyChangedCount = 0;
+
+        PropStore propStore = context.getPropStore();
+        keyCount++;
+
+        // rely on store to propagate change event if different
+        propStore.validateDataVersion(SystemPropKey.of(context),
+            ((ZooBasedConfiguration) getSystemConfiguration()).getDataVersion());
+        // small yield - spread out ZooKeeper calls
+        jitterDelay();
+
+        for (Map.Entry<NamespaceId,NamespaceConfiguration> entry : namespaceConfigs.entrySet()) {
+          keyCount++;
+          PropStoreKey<?> propKey = NamespacePropKey.of(context, entry.getKey());
+          if (!propStore.validateDataVersion(propKey, entry.getValue().getDataVersion())) {
+            keyChangedCount++;
+            namespaceConfigs.remove(entry.getKey());
+          }
+          // small yield - spread out ZooKeeper calls between namespace config checks
+          jitterDelay();
+        }
+
+        for (Map.Entry<TableId,TableConfiguration> entry : tableConfigs.entrySet()) {
+          keyCount++;
+          TableId tid = entry.getKey();
+          PropStoreKey<?> propKey = TablePropKey.of(context, tid);
+          if (!propStore.validateDataVersion(propKey, entry.getValue().getDataVersion())) {
+            keyChangedCount++;
+            tableConfigs.remove(tid);
+            tableParentConfigs.remove(tid);
+            log.debug("data version sync: difference found. forcing configuration update for {}}",
+                propKey);
+          }
+          // small yield - spread out ZooKeeper calls between table config checks
+          jitterDelay();
Review Comment:
Same comment as on the namespace loop about `jitterDelay()`: I'm not sure the
per-key delay is necessary.
##########
server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java:
##########
@@ -51,9 +65,18 @@ public class ServerConfigurationFactory extends ServerConfiguration {
   private final SiteConfiguration siteConfig;
   private final DeleteWatcher deleteWatcher = new DeleteWatcher();
+  private static final int REFRESH_PERIOD_MINUTES = 15;
Review Comment:
Something else will also cause the configuration to be refreshed, right? A
watcher or something similar? If not, does this mean that server processes
could take up to 15 minutes to become consistent after a configuration
change? If so, I think that is too long; maybe it should be configurable?
##########
server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java:
##########
@@ -140,4 +163,119 @@ public void connectionEvent() {
       // no-op. changes handled by prop store impl
     }
   }
+
+  private class ConfigRefreshRunner {
+    private static final long MIN_JITTER_DELAY = 1;
+    private static final long MAX_JITTER_DELAY = 23;
+    private final ScheduledFuture<?> refreshTaskFuture;
+
+    ConfigRefreshRunner() {
+
+      Runnable refreshTask = this::verifySnapshotVersions;
+
+      ScheduledThreadPoolExecutor executor = ThreadPools.getServerThreadPools()
+          .createScheduledExecutorService(1, "config-refresh", false);
+
+      // staggering the initial delay prevents synchronization of Accumulo servers communicating
+      // with ZooKeeper for the sync process. (Value is 25% -> 100% of the refresh period.)
+      long randDelay = jitter(REFRESH_PERIOD_MINUTES / 4, REFRESH_PERIOD_MINUTES);
+      refreshTaskFuture =
+          executor.scheduleWithFixedDelay(refreshTask, randDelay, REFRESH_PERIOD_MINUTES, MINUTES);
+    }
+
+    /**
+     * Check that the stored version in ZooKeeper matches the version held in the local snapshot.
+     * When a mismatch is detected, a change event is sent to the prop store which will cause a
+     * re-load. If the Zookeeper node has been deleted, the local cache entries are removed.
+     * <p>
+     * This method is designed to be called as a scheduled task, so it does not propagate exceptions
+     * other than interrupted Exceptions so the scheduled tasks will continue to run.
+     */
+    private void verifySnapshotVersions() {
+
+      // short circuit if refresh in progress
+      if (isConfigRefreshRunning.get()) {
+        return;
+      }
+
+      // allow only one thread if missed short circuit check.
+      refreshLock.lock();
+      try {
+        isConfigRefreshRunning.set(true);
+        long refreshStart = System.nanoTime();
+        int keyCount = 0;
+        int keyChangedCount = 0;
+
+        PropStore propStore = context.getPropStore();
+        keyCount++;
+
+        // rely on store to propagate change event if different
+        propStore.validateDataVersion(SystemPropKey.of(context),
+            ((ZooBasedConfiguration) getSystemConfiguration()).getDataVersion());
+        // small yield - spread out ZooKeeper calls
+        jitterDelay();
+
+        for (Map.Entry<NamespaceId,NamespaceConfiguration> entry : namespaceConfigs.entrySet()) {
+          keyCount++;
+          PropStoreKey<?> propKey = NamespacePropKey.of(context, entry.getKey());
+          if (!propStore.validateDataVersion(propKey, entry.getValue().getDataVersion())) {
+            keyChangedCount++;
+            namespaceConfigs.remove(entry.getKey());
+          }
+          // small yield - spread out ZooKeeper calls between namespace config checks
+          jitterDelay();
Review Comment:
I'm not sure this is necessary because:
1. There is already jitter at the beginning
2. Not all server processes start at the exact same time
3. Each run of the background task is not going to take exactly the same time
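
For point 1, a rough sketch of the initial-delay jitter. The body of `jitter`
is not shown in this hunk, so the uniform-random implementation below is an
assumption, and the class name is made up for the example:

```java
import java.util.concurrent.ThreadLocalRandom;

public class InitialJitter {
  static final int REFRESH_PERIOD_MINUTES = 15;

  // Assumed shape of jitter(min, max): a uniform value in [min, max].
  static long jitter(long min, long max) {
    return ThreadLocalRandom.current().nextLong(min, max + 1);
  }

  public static void main(String[] args) {
    // Integer division: 15 / 4 evaluates to 3, so the lower bound is
    // 3 minutes (20% of the period) rather than exactly 25%.
    long min = REFRESH_PERIOD_MINUTES / 4;
    long initialDelay = jitter(min, REFRESH_PERIOD_MINUTES);
    System.out.println("first run after " + initialDelay + " minutes");
  }
}
```

With the first run already spread over a window of several minutes per
server, the additional per-key delays inside the loops add little.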
##########
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java:
##########
@@ -391,4 +399,26 @@ public PropCache getCache() {
   public @Nullable VersionedProperties getWithoutCaching(PropStoreKey<?> propStoreKey) {
     return cache.getWithoutCaching(propStoreKey);
   }
+
+  @Override
+  public boolean validateDataVersion(PropStoreKey<?> storeKey, long expectedVersion) {
+    try {
+      Stat stat = zrw.getStatus(storeKey.getPath());
+      log.trace("data version sync: stat returned: {} for {}", stat, storeKey);
+      if (stat == null || expectedVersion != stat.getVersion()) {
+        propStoreWatcher.signalZkChangeEvent(storeKey);
+        return false;
+      }
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(ex);
+    } catch (KeeperException.NoNodeException ex) {
+      propStoreWatcher.signalZkChangeEvent(storeKey);
Review Comment:
Are you returning `true` here?
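
The rest of the method is not visible in this hunk, so this is only a sketch
of what I would expect, assuming the missing-node branch currently falls
through to a `return true`. The map-backed lookup and `NoNodeException` class
below are hypothetical stand-ins for the ZooKeeper call, not the real API:

```java
import java.util.HashMap;
import java.util.Map;

public class VersionCheckSketch {

  // Hypothetical stand-in for KeeperException.NoNodeException.
  static class NoNodeException extends Exception {}

  // Hypothetical stand-in for the ZooKeeper stat lookup: a missing key
  // models a node that has been deleted.
  static long lookupVersion(Map<String,Long> store, String path) throws NoNodeException {
    Long version = store.get(path);
    if (version == null) {
      throw new NoNodeException();
    }
    return version;
  }

  // Returns true only when the node exists and its version matches.
  static boolean validateDataVersion(Map<String,Long> store, String path, long expected) {
    try {
      return lookupVersion(store, path) == expected;
    } catch (NoNodeException ex) {
      // A deleted node cannot match the expected version, so the caller
      // should see the same "invalid" result as for a version mismatch.
      return false;
    }
  }

  public static void main(String[] args) {
    Map<String,Long> store = new HashMap<>();
    store.put("/config/a", 7L);
    System.out.println(validateDataVersion(store, "/config/a", 7L));
    System.out.println(validateDataVersion(store, "/config/a", 8L));
    System.out.println(validateDataVersion(store, "/config/gone", 7L));
  }
}
```

Returning `false` there would let the caller count the key as changed and
drop its cached entry, the same as for a version mismatch.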