This is an automated email from the ASF dual-hosted git repository. smolnar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/knox.git
The following commit(s) were added to refs/heads/master by this push: new bb6719f3c KNOX-2994 - PollingConfigurationAnalyzer starts after the Knox GW is up and running (#831) bb6719f3c is described below commit bb6719f3cad33cc89c990a2ab5bc61756c497d4f Author: Sandor Molnar <smol...@apache.org> AuthorDate: Thu Jan 4 08:30:58 2024 +0100 KNOX-2994 - PollingConfigurationAnalyzer starts after the Knox GW is up and running (#831) --- .../ClouderaManagerServiceDiscoveryMessages.java | 4 + .../cm/monitor/PollingConfigurationAnalyzer.java | 105 +++++++++++++-------- .../monitor/PollingConfigurationAnalyzerTest.java | 52 +++++++++- 3 files changed, 120 insertions(+), 41 deletions(-) diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java index ca331241d..c86fe2912 100644 --- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java +++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java @@ -133,6 +133,10 @@ public interface ClouderaManagerServiceDiscoveryMessages { text = "Started ClouderaManager cluster configuration monitor (checking every {0} seconds)") void startedClouderaManagerConfigMonitor(long pollingInterval); + @Message(level = MessageLevel.INFO, + text = "The Knox Gateway is not yet ready to monitor ClouderaManager cluster configuration changes.") + void gatewayIsNotYetReadyToMonitorClouderaManagerConfigs(); + @Message(level = MessageLevel.INFO, text = "Stopping ClouderaManager cluster configuration monitor") void stoppingClouderaManagerConfigMonitor(); diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java index c4e511d36..251ce6d24 100644 --- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java +++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java @@ -42,6 +42,7 @@ import org.apache.knox.gateway.services.security.AliasService; import org.apache.knox.gateway.services.security.KeystoreService; import org.apache.knox.gateway.services.security.KeystoreServiceException; import org.apache.knox.gateway.services.topology.TopologyService; +import org.apache.knox.gateway.services.topology.impl.GatewayStatusService; import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService; import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig; import org.apache.knox.gateway.topology.discovery.cm.ClouderaManagerServiceDiscoveryMessages; @@ -155,6 +156,8 @@ public class PollingConfigurationAnalyzer implements Runnable { private final GatewayConfig gatewayConfig; + private GatewayStatusService gatewayStatusService; + PollingConfigurationAnalyzer(final GatewayConfig gatewayConfig, final ClusterConfigurationCache configCache, final AliasService aliasService, @@ -207,56 +210,68 @@ public class PollingConfigurationAnalyzer implements Runnable { log.startedClouderaManagerConfigMonitor(interval); isActive = true; + boolean gatewayStatusOk = false; while (isActive) { - try { - final List<String> clustersToStopMonitoring = new ArrayList<>(); - - for (Map.Entry<String, List<String>> entry : configCache.getClusterNames().entrySet()) { - String address = entry.getKey(); - for (String clusterName : entry.getValue()) { - if (configCache.getDiscoveryConfig(address, clusterName) == null) { - log.noClusterConfiguration(clusterName, address); - continue; - } - log.checkingClusterConfiguration(clusterName, address); + if (!gatewayStatusOk) { + gatewayStatusOk = getGatewayStatusService() != null && getGatewayStatusService().status(); + } + if (gatewayStatusOk) { + monitorClusterConfigurationChanges(); + } else { + log.gatewayIsNotYetReadyToMonitorClouderaManagerConfigs(); + } + waitFor(interval); + } - // Check here for existing descriptor references, and add to the removal list if there are not any - if (!clusterReferencesExist(address, clusterName)) { - clustersToStopMonitoring.add(address + FQCN_DELIM + clusterName); - continue; - } + log.stoppedClouderaManagerConfigMonitor(); + } - // Configuration changes don't mean anything without corresponding service start/restarts. Therefore, monitor - // start events, and check the configuration only of the restarted service(s) to identify changes - // that should trigger re-discovery. - final List<RelevantEvent> relevantEvents = getRelevantEvents(address, clusterName); + private void monitorClusterConfigurationChanges() { + try { + final List<String> clustersToStopMonitoring = new ArrayList<>(); + + for (Map.Entry<String, List<String>> entry : configCache.getClusterNames().entrySet()) { + String address = entry.getKey(); + for (String clusterName : entry.getValue()) { + if (configCache.getDiscoveryConfig(address, clusterName) == null) { + log.noClusterConfiguration(clusterName, address); + continue; + } + log.checkingClusterConfiguration(clusterName, address); - // If there are no recent start events, then nothing to do now - if (!relevantEvents.isEmpty()) { - // If a change has occurred, notify the listeners - if (hasConfigChanged(address, clusterName, relevantEvents) || hasScaleEvent(relevantEvents)) { - notifyChangeListener(address, clusterName); - } - // these events should not be processed again even if the next CM query result contains them - relevantEvents.forEach(re -> processedEvents.put(re.auditEvent.getId(), 1L)); - } + // Check here for existing descriptor references, and add to the removal list if there are not any + if (!clusterReferencesExist(address, clusterName)) { + clustersToStopMonitoring.add(address + FQCN_DELIM + clusterName); + continue; } - } - // Remove outdated entries from the cache - for (String fqcn : clustersToStopMonitoring) { - String[] parts = fqcn.split(FQCN_DELIM); - stopMonitoring(parts[0], parts[1]); + // Configuration changes don't mean anything without corresponding service start/restarts. Therefore, monitor + // start events, and check the configuration only of the restarted service(s) to identify changes + // that should trigger re-discovery. + final List<RelevantEvent> relevantEvents = getRelevantEvents(address, clusterName); + + // If there are no recent start events, then nothing to do now + if (!relevantEvents.isEmpty()) { + // If a change has occurred, notify the listeners + if (hasConfigChanged(address, clusterName, relevantEvents) || hasScaleEvent(relevantEvents)) { + notifyChangeListener(address, clusterName); + } + // these events should not be processed again even if the next CM query result contains them + relevantEvents.forEach(re -> processedEvents.put(re.auditEvent.getId(), 1L)); + } } - clustersToStopMonitoring.clear(); // reset the removal list + } - } catch (Exception e) { - log.clouderaManagerConfigurationChangesMonitoringError(e); + // Remove outdated entries from the cache + for (String fqcn : clustersToStopMonitoring) { + String[] parts = fqcn.split(FQCN_DELIM); + stopMonitoring(parts[0], parts[1]); } - waitFor(interval); - } + clustersToStopMonitoring.clear(); // reset the removal list - log.stoppedClouderaManagerConfigMonitor(); + } catch (Exception e) { + log.clouderaManagerConfigurationChangesMonitoringError(e); + } } private boolean hasScaleEvent(List<RelevantEvent> relevantEvents) { @@ -372,6 +387,16 @@ public class PollingConfigurationAnalyzer implements Runnable { return ccms; } + private GatewayStatusService getGatewayStatusService() { + if (gatewayStatusService == null) { + final GatewayServices gatewayServices = GatewayServer.getGatewayServices(); + if (gatewayServices != null) { + gatewayStatusService = gatewayServices.getService(ServiceType.GATEWAY_STATUS_SERVICE); + } + } + return gatewayStatusService; + } + /** * Determine if any descriptors reference the specified discovery source and cluster. * diff --git a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java index d6dc186f4..50df250f0 100644 --- a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java +++ b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java @@ -27,11 +27,13 @@ import org.apache.knox.gateway.config.GatewayConfig; import org.apache.knox.gateway.services.GatewayServices; import org.apache.knox.gateway.services.ServiceType; import org.apache.knox.gateway.services.topology.TopologyService; +import org.apache.knox.gateway.services.topology.impl.GatewayStatusService; import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService; import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig; import org.apache.knox.gateway.topology.discovery.cm.model.hdfs.NameNodeServiceModelGenerator; import org.apache.knox.gateway.topology.discovery.cm.model.hive.HiveOnTezServiceModelGenerator; import org.easymock.EasyMock; +import org.junit.After; import org.junit.Test; import java.io.File; @@ -58,6 +60,11 @@ import static org.junit.Assert.assertTrue; public class PollingConfigurationAnalyzerTest { + @After + public void tearDown() { + setGatewayServices(null); + } + @Test(expected = IllegalArgumentException.class) public void testRestartEventWithWrongApiEventCategory() { doTestStartEvent(ApiEventCategory.LOG_EVENT); @@ -341,11 +348,16 @@ public class PollingConfigurationAnalyzerTest { return null; }).once(); + //GatewayStatusService mock + final GatewayStatusService gatewayStatusService = EasyMock.createNiceMock(GatewayStatusService.class); + EasyMock.expect(gatewayStatusService.status()).andReturn(Boolean.TRUE).anyTimes(); + // GatewayServices mock GatewayServices gws = EasyMock.createNiceMock(GatewayServices.class); EasyMock.expect(gws.getService(ServiceType.TOPOLOGY_SERVICE)).andReturn(ts).anyTimes(); EasyMock.expect(gws.getService(ServiceType.CLUSTER_CONFIGURATION_MONITOR_SERVICE)).andReturn(ccms).anyTimes(); - EasyMock.replay(ts, ccms, gws); + EasyMock.expect(gws.getService(ServiceType.GATEWAY_STATUS_SERVICE)).andReturn(gatewayStatusService).anyTimes(); + EasyMock.replay(ts, ccms, gatewayStatusService, gws); try { setGatewayServices(gws); @@ -413,6 +425,26 @@ public class PollingConfigurationAnalyzerTest { doTestEventWithConfigChange(revisionEvent, clusterName); } + @Test + public void shouldNotPerformClusterConfigurationChangeMonitoringIfKnoxGatewayIsNotYetReady() { + final String address = "http://host1:1234"; + final String clusterName = "Cluster 10"; + + // Simulate a successful restart waiting for staleness event with id = 123 + final ApiEvent rollingRestartEvent = createApiEvent(clusterName, HiveOnTezServiceModelGenerator.SERVICE_TYPE, HiveOnTezServiceModelGenerator.SERVICE, + PollingConfigurationAnalyzer.RESTART_WAITING_FOR_STALENESS_SUCCESS_COMMAND, PollingConfigurationAnalyzer.SUCCEEDED_STATUS, "EV_CLUSTER_RESTARTED", + "123"); + + final ChangeListener listener = new ChangeListener(); + final TestablePollingConfigAnalyzer pca = buildPollingConfigAnalyzer(address, clusterName, Collections.emptyMap(), listener, false); + + // this should NOT trigger a notification because the Knox Gateway is not yet + // ready (by GatewayStatusService.status()) + listener.clearNotification(); + doTestEvent(rollingRestartEvent, address, clusterName, Collections.emptyMap(), Collections.emptyMap(), pca); + assertFalse("Unexpected change notification", listener.wasNotified(address, clusterName)); + } + private void doTestStartEvent(final ApiEventCategory category) { final String clusterName = "My Cluster"; final String serviceType = NameNodeServiceModelGenerator.SERVICE_TYPE; @@ -472,6 +504,11 @@ public class PollingConfigurationAnalyzerTest { private TestablePollingConfigAnalyzer buildPollingConfigAnalyzer(final String address, final String clusterName, final Map<String, ServiceConfigurationModel> serviceConfigurationModels, ChangeListener listener) { + return buildPollingConfigAnalyzer(address, clusterName, serviceConfigurationModels, listener, true); + } + + private TestablePollingConfigAnalyzer buildPollingConfigAnalyzer(final String address, final String clusterName, + final Map<String, ServiceConfigurationModel> serviceConfigurationModels, ChangeListener listener, boolean isKnoxGatewayReady) { final GatewayConfig gatewayConfig = EasyMock.createNiceMock(GatewayConfig.class); EasyMock.expect(gatewayConfig.getIncludedSSLCiphers()).andReturn(Collections.emptyList()).anyTimes(); EasyMock.expect(gatewayConfig.getIncludedSSLProtocols()).andReturn(Collections.emptySet()).anyTimes(); @@ -495,6 +532,19 @@ public class PollingConfigurationAnalyzerTest { EasyMock.expect(configCache.getClusterServiceConfigurations(address, clusterName)).andReturn(serviceConfigurationModels).anyTimes(); EasyMock.replay(configCache); + if (isKnoxGatewayReady) { + // GatewayStatusService mock + final GatewayStatusService gatewayStatusService = EasyMock.createNiceMock(GatewayStatusService.class); + EasyMock.expect(gatewayStatusService.status()).andReturn(Boolean.TRUE).anyTimes(); + + // GatewayServices mock + GatewayServices gws = EasyMock.createNiceMock(GatewayServices.class); + EasyMock.expect(gws.getService(ServiceType.GATEWAY_STATUS_SERVICE)).andReturn(gatewayStatusService).anyTimes(); + EasyMock.replay(gatewayStatusService, gws); + + setGatewayServices(gws); + } + return new TestablePollingConfigAnalyzer(gatewayConfig, configCache, listener); }