METRON-1442: Split rest end points for indexing topology into random access indexing and batch indexing this closes apache/incubator-metron#923
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/fcff0596 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/fcff0596 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/fcff0596 Branch: refs/heads/feature/METRON-1344-test-infrastructure Commit: fcff0596c7d2b2546d89283fb90fbc8c10b31f1f Parents: 0630505 Author: MohanDV <mohan...@gmail.com> Authored: Mon Feb 5 09:48:47 2018 -0500 Committer: cstella <ceste...@gmail.com> Committed: Mon Feb 5 09:48:47 2018 -0500 ---------------------------------------------------------------------- .../src/main/config/rest_application.yml | 3 +- .../apache/metron/rest/MetronRestConstants.java | 6 +- .../metron/rest/controller/StormController.java | 81 +++++++++++---- .../metron/rest/service/StormAdminService.java | 4 +- .../service/impl/StormAdminServiceImpl.java | 8 +- .../rest/service/impl/StormCLIWrapper.java | 16 +-- .../src/main/resources/application-test.yml | 3 +- .../src/main/resources/application-vagrant.yml | 4 +- .../StormControllerIntegrationTest.java | 102 ++++++++++--------- .../rest/mock/MockStormCLIClientWrapper.java | 93 ++++++++++++----- .../metron/rest/mock/MockStormRestTemplate.java | 22 ++-- .../service/impl/StormAdminServiceImplTest.java | 8 +- .../rest/service/impl/StormCLIWrapperTest.java | 17 ++-- 13 files changed, 233 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/config/rest_application.yml ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/config/rest_application.yml b/metron-interface/metron-rest/src/main/config/rest_application.yml index 6e4fb66..4cc51ff 100644 --- a/metron-interface/metron-rest/src/main/config/rest_application.yml +++ b/metron-interface/metron-rest/src/main/config/rest_application.yml @@ -46,7 +46,8 @@ storm: enrichment: script.path: ${METRON_HOME}/bin/start_enrichment_topology.sh indexing: - script.path: ${METRON_HOME}/bin/start_elasticsearch_topology.sh + randomaccess.script.path: ${METRON_HOME}/bin/start_elasticsearch_topology.sh + batch.script.path: ${METRON_HOME}/bin/start_hdfs_topology.sh kerberos: enabled: ${SECURITY_ENABLED} http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java index 4e8d7f2..f18d4cf 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java @@ -35,10 +35,12 @@ public class MetronRestConstants { public static final String TOPOLOGY_SUMMARY_URL = "/api/v1/topology/summary"; public static final String TOPOLOGY_URL = "/api/v1/topology"; public static final String ENRICHMENT_TOPOLOGY_NAME = "enrichment"; - public static final String INDEXING_TOPOLOGY_NAME = "indexing"; + public static final String BATCH_INDEXING_TOPOLOGY_NAME = "batch_indexing"; + public static final String RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME = "random_access_indexing"; public static final String PARSER_SCRIPT_PATH_SPRING_PROPERTY = "storm.parser.script.path"; public static final String ENRICHMENT_SCRIPT_PATH_SPRING_PROPERTY = "storm.enrichment.script.path"; - public static final String INDEXING_SCRIPT_PATH_SPRING_PROPERTY = "storm.indexing.script.path"; + public static final String BATCH_INDEXING_SCRIPT_PATH_SPRING_PROPERTY = "storm.indexing.batch.script.path"; + public static final String RANDOM_ACCESS_INDEXING_SCRIPT_PATH_SPRING_PROPERTY = "storm.indexing.randomaccess.script.path"; public static final String PARSER_TOPOLOGY_OPTIONS_SPRING_PROPERTY = "storm.parser.topology.options"; public static final String KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY = "kafka.security.protocol"; http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/StormController.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/StormController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/StormController.java index 292c668..d1af1c5 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/StormController.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/StormController.java @@ -147,12 +147,12 @@ public class StormController { return new ResponseEntity<>(stormStatusService.deactivateTopology(MetronRestConstants.ENRICHMENT_TOPOLOGY_NAME), HttpStatus.OK); } - @ApiOperation(value = "Retrieves the status of the Storm indexing topology") + @ApiOperation(value = "Retrieves the status of the Storm random access indexing topology") @ApiResponses(value = { @ApiResponse(message = "Returns topology status information", code = 200), @ApiResponse(message = "Topology is missing", code = 404) }) - @RequestMapping(value = "/indexing", method = RequestMethod.GET) - ResponseEntity<TopologyStatus> getIndexing() throws RestException { - TopologyStatus topologyStatus = stormStatusService.getTopologyStatus(MetronRestConstants.INDEXING_TOPOLOGY_NAME); + @RequestMapping(value = "/indexing/randomaccess", method = RequestMethod.GET) + ResponseEntity<TopologyStatus> getRandomAccessIndexing() throws RestException { + TopologyStatus topologyStatus = stormStatusService.getTopologyStatus(MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME); if (topologyStatus != null) { return new ResponseEntity<>(topologyStatus, HttpStatus.OK); } else { @@ -160,32 +160,32 @@ public class StormController { } } - @ApiOperation(value = "Starts a Storm indexing topology") + @ApiOperation(value = "Starts a Storm random access indexing topology") @ApiResponse(message = "Returns start response message", code = 200) - @RequestMapping(value = "/indexing/start", method = RequestMethod.GET) - ResponseEntity<TopologyResponse> startIndexing() throws RestException { - return new ResponseEntity<>(stormAdminService.startIndexingTopology(), HttpStatus.OK); + @RequestMapping(value = "/indexing/randomaccess/start", method = RequestMethod.GET) + ResponseEntity<TopologyResponse> startRandomAccessIndexing() throws RestException { + return new ResponseEntity<>(stormAdminService.startIndexingTopology(MetronRestConstants.RANDOM_ACCESS_INDEXING_SCRIPT_PATH_SPRING_PROPERTY), HttpStatus.OK); } - @ApiOperation(value = "Stops a Storm enrichment topology") + @ApiOperation(value = "Stops a Storm random access indexing topology") @ApiResponse(message = "Returns stop response message", code = 200) - @RequestMapping(value = "/indexing/stop", method = RequestMethod.GET) - ResponseEntity<TopologyResponse> stopIndexing(@ApiParam(name="stopNow", value="Stop the topology immediately")@RequestParam(required = false, defaultValue = "false") boolean stopNow) throws RestException { - return new ResponseEntity<>(stormAdminService.stopIndexingTopology(stopNow), HttpStatus.OK); + @RequestMapping(value = "/indexing/randomaccess/stop", method = RequestMethod.GET) + ResponseEntity<TopologyResponse> stopRandomAccessIndexing(@ApiParam(name="stopNow", value="Stop the topology immediately")@RequestParam(required = false, defaultValue = "false") boolean stopNow) throws RestException { + return new ResponseEntity<>(stormAdminService.stopIndexingTopology(MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME ,stopNow), HttpStatus.OK); } - @ApiOperation(value = "Activates a Storm indexing topology") + @ApiOperation(value = "Activates a Storm random access indexing topology") @ApiResponse(message = "Returns activate response message", code = 200) - @RequestMapping(value = "/indexing/activate", method = RequestMethod.GET) - ResponseEntity<TopologyResponse> activateIndexing() throws RestException { - return new ResponseEntity<>(stormStatusService.activateTopology(MetronRestConstants.INDEXING_TOPOLOGY_NAME), HttpStatus.OK); + @RequestMapping(value = "/indexing/randomaccess/activate", method = RequestMethod.GET) + ResponseEntity<TopologyResponse> activateRandomAccessIndexing() throws RestException { + return new ResponseEntity<>(stormStatusService.activateTopology(MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME), HttpStatus.OK); } - @ApiOperation(value = "Deactivates a Storm indexing topology") + @ApiOperation(value = "Deactivates a Storm random access indexing topology") @ApiResponse(message = "Returns deactivate response message", code = 200) - @RequestMapping(value = "/indexing/deactivate", method = RequestMethod.GET) - ResponseEntity<TopologyResponse> deactivateIndexing() throws RestException { - return new ResponseEntity<>(stormStatusService.deactivateTopology(MetronRestConstants.INDEXING_TOPOLOGY_NAME), HttpStatus.OK); + @RequestMapping(value = "/indexing/randomaccess/deactivate", method = RequestMethod.GET) + ResponseEntity<TopologyResponse> deactivateRandomAccessIndexing() throws RestException { + return new ResponseEntity<>(stormStatusService.deactivateTopology(MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME), HttpStatus.OK); } @ApiOperation(value = "Retrieves information about the Storm command line client") @@ -195,4 +195,45 @@ public class StormController { return new ResponseEntity<>(stormAdminService.getStormClientStatus(), HttpStatus.OK); } + @ApiOperation(value = "Retrieves the status of the Storm batch indexing topology") + @ApiResponses(value = { @ApiResponse(message = "Returns topology status information", code = 200), + @ApiResponse(message = "Topology is missing", code = 404) }) + @RequestMapping(value = "/indexing/batch", method = RequestMethod.GET) + ResponseEntity<TopologyStatus> getBatchIndexing() throws RestException { + TopologyStatus topologyStatus = stormStatusService.getTopologyStatus(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME); + if (topologyStatus != null) { + return new ResponseEntity<>(topologyStatus, HttpStatus.OK); + } else { + return new ResponseEntity<>(HttpStatus.NOT_FOUND); + } + } + + @ApiOperation(value = "Starts a Storm batch indexing topology") + @ApiResponse(message = "Returns start response message", code = 200) + @RequestMapping(value = "/indexing/batch/start", method = RequestMethod.GET) + ResponseEntity<TopologyResponse> startBatchIndexing() throws RestException { + return new ResponseEntity<>(stormAdminService.startIndexingTopology(MetronRestConstants.BATCH_INDEXING_SCRIPT_PATH_SPRING_PROPERTY), HttpStatus.OK); + } + + @ApiOperation(value = "Stops a Storm batch indexing topology") + @ApiResponse(message = "Returns stop response message", code = 200) + @RequestMapping(value = "/indexing/batch/stop", method = RequestMethod.GET) + ResponseEntity<TopologyResponse> stopBatchIndexing(@ApiParam(name="stopNow", value="Stop the topology immediately")@RequestParam(required = false, defaultValue = "false") boolean stopNow) throws RestException { + return new ResponseEntity<>(stormAdminService.stopIndexingTopology(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME, stopNow), HttpStatus.OK); + } + + @ApiOperation(value = "Activates a Storm batch indexing topology") + @ApiResponse(message = "Returns activate response message", code = 200) + @RequestMapping(value = "/indexing/batch/activate", method = RequestMethod.GET) + ResponseEntity<TopologyResponse> activateBatchIndexing() throws RestException { + return new ResponseEntity<>(stormStatusService.activateTopology(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME), HttpStatus.OK); + } + + @ApiOperation(value = "Deactivates a Storm batch indexing topology") + @ApiResponse(message = "Returns deactivate response message", code = 200) + @RequestMapping(value = "/indexing/batch/deactivate", method = RequestMethod.GET) + ResponseEntity<TopologyResponse> deactivateBatchIndexing() throws RestException { + return new ResponseEntity<>(stormStatusService.deactivateTopology(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME), HttpStatus.OK); + } + } http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java index 8c1e228..3f6f8ff 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/StormAdminService.java @@ -32,9 +32,9 @@ public interface StormAdminService { TopologyResponse stopEnrichmentTopology(boolean stopNow) throws RestException; - TopologyResponse startIndexingTopology() throws RestException; + TopologyResponse startIndexingTopology(String scriptPath) throws RestException; - TopologyResponse stopIndexingTopology(boolean stopNow) throws RestException; + TopologyResponse stopIndexingTopology(String name, boolean stopNow) throws RestException; Map<String, String> getStormClientStatus() throws RestException; } http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java index 9bd368f..40b01f1 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java @@ -74,13 +74,13 @@ public class StormAdminServiceImpl implements StormAdminService { } @Override - public TopologyResponse startIndexingTopology() throws RestException { - return createResponse(stormCLIClientWrapper.startIndexingTopology(), TopologyStatusCode.STARTED, TopologyStatusCode.START_ERROR); + public TopologyResponse startIndexingTopology(String scriptPath) throws RestException { + return createResponse(stormCLIClientWrapper.startIndexingTopology(scriptPath), TopologyStatusCode.STARTED, TopologyStatusCode.START_ERROR); } @Override - public TopologyResponse stopIndexingTopology(boolean stopNow) throws RestException { - return createResponse(stormCLIClientWrapper.stopIndexingTopology(stopNow), TopologyStatusCode.STOPPED, TopologyStatusCode.STOP_ERROR); + public TopologyResponse stopIndexingTopology(String name, boolean stopNow) throws RestException { + return createResponse(stormCLIClientWrapper.stopIndexingTopology(name, stopNow), TopologyStatusCode.STOPPED, TopologyStatusCode.STOP_ERROR); } private TopologyResponse createResponse(int responseCode, TopologyStatusCode successMessage, TopologyStatusCode errorMessage) { http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java index fff7390..26049dd 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java @@ -37,7 +37,6 @@ import java.util.Map; import static java.util.stream.Collectors.toList; import static org.apache.metron.rest.MetronRestConstants.ENRICHMENT_TOPOLOGY_NAME; -import static org.apache.metron.rest.MetronRestConstants.INDEXING_TOPOLOGY_NAME; public class StormCLIWrapper { @@ -70,14 +69,14 @@ public class StormCLIWrapper { return runCommand(getStopCommand(ENRICHMENT_TOPOLOGY_NAME, stopNow)); } - public int startIndexingTopology() throws RestException { + public int startIndexingTopology(String scriptPath) throws RestException { kinit(); - return runCommand(getIndexingStartCommand()); + return runCommand(getIndexingStartCommand(scriptPath)); } - public int stopIndexingTopology(boolean stopNow) throws RestException { + public int stopIndexingTopology(String name, boolean stopNow) throws RestException { kinit(); - return runCommand(getStopCommand(INDEXING_TOPOLOGY_NAME, stopNow)); + return runCommand(getStopCommand(name, stopNow)); } protected int runCommand(String[] command) throws RestException { @@ -137,9 +136,9 @@ public class StormCLIWrapper { return command; } - protected String[] getIndexingStartCommand() { + protected String[] getIndexingStartCommand(String scriptPath) { String[] command = new String[1]; - command[0] = environment.getProperty(MetronRestConstants.INDEXING_SCRIPT_PATH_SPRING_PROPERTY); + command[0] = environment.getProperty(scriptPath); return command; } @@ -166,7 +165,8 @@ public class StormCLIWrapper { Map<String, String> status = new HashMap<>(); status.put("parserScriptPath", environment.getProperty(MetronRestConstants.PARSER_SCRIPT_PATH_SPRING_PROPERTY)); status.put("enrichmentScriptPath", environment.getProperty(MetronRestConstants.ENRICHMENT_SCRIPT_PATH_SPRING_PROPERTY)); - status.put("indexingScriptPath", environment.getProperty(MetronRestConstants.INDEXING_SCRIPT_PATH_SPRING_PROPERTY)); + status.put("randomAccessIndexingScriptPath", environment.getProperty(MetronRestConstants.RANDOM_ACCESS_INDEXING_SCRIPT_PATH_SPRING_PROPERTY)); + status.put("batchIndexingScriptPath", environment.getProperty(MetronRestConstants.BATCH_INDEXING_SCRIPT_PATH_SPRING_PROPERTY)); status.put("stormClientVersionInstalled", stormClientVersionInstalled()); return status; } http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/resources/application-test.yml ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/resources/application-test.yml b/metron-interface/metron-rest/src/main/resources/application-test.yml index 749dec4..3cca5e0 100644 --- a/metron-interface/metron-rest/src/main/resources/application-test.yml +++ b/metron-interface/metron-rest/src/main/resources/application-test.yml @@ -38,7 +38,8 @@ storm: enrichment: script.path: /usr/metron/${metron.version}/bin/start_enrichment_topology.sh indexing: - script.path: /usr/metron/${metron.version}/bin/start_elasticsearch_topology.sh + randomaccess.script.path: /usr/metron/${metron.version}/bin/start_elasticsearch_topology.sh + batch.script.path: /usr/metron/${metron.version}/bin/start_hdfs_topology.sh search: max: http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/main/resources/application-vagrant.yml ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/resources/application-vagrant.yml b/metron-interface/metron-rest/src/main/resources/application-vagrant.yml index cf2c170..3eea24a 100644 --- a/metron-interface/metron-rest/src/main/resources/application-vagrant.yml +++ b/metron-interface/metron-rest/src/main/resources/application-vagrant.yml @@ -49,7 +49,9 @@ storm: enrichment: script.path: /usr/metron/${metron.version}/bin/start_enrichment_topology.sh indexing: - script.path: /usr/metron/${metron.version}/bin/start_elasticsearch_topology.sh + randomaccess.script.path: /usr/metron/${metron.version}/bin/start_elasticsearch_topology.sh + batch.script.path: /usr/metron/${metron.version}/bin/start_hdfs_topology.sh + kerberos: enabled: false http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java index 9a6022c..3986413 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java @@ -17,8 +17,10 @@ */ package org.apache.metron.rest.controller; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList; import org.apache.metron.common.configuration.SensorParserConfig; import org.apache.metron.integration.utils.TestUtils; +import org.apache.metron.rest.MetronRestConstants; import org.apache.metron.rest.model.TopologyStatusCode; import org.apache.metron.rest.service.GlobalConfigService; import org.apache.metron.rest.service.SensorParserConfigService; @@ -33,6 +35,7 @@ import org.springframework.http.MediaType; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.web.servlet.MockMvc; +import org.springframework.test.web.servlet.ResultActions; import org.springframework.test.web.servlet.setup.MockMvcBuilders; import org.springframework.web.context.WebApplicationContext; @@ -287,66 +290,69 @@ public class StormControllerIntegrationTest { .andExpect(jsonPath("$.status").value("SUCCESS")) .andExpect(jsonPath("$.message").value(TopologyStatusCode.STOPPED.name())); - this.mockMvc.perform(get(stormUrl + "/indexing").with(httpBasic(user,password))) - .andExpect(status().isNotFound()); - - this.mockMvc.perform(get(stormUrl + "/indexing/activate").with(httpBasic(user,password))) - .andExpect(status().isOk()) - .andExpect(jsonPath("$.status").value("ERROR")) - .andExpect(jsonPath("$.message").value(TopologyStatusCode.TOPOLOGY_NOT_FOUND.name())); - - this.mockMvc.perform(get(stormUrl + "/indexing/deactivate").with(httpBasic(user,password))) - .andExpect(status().isOk()) - .andExpect(jsonPath("$.status").value("ERROR")) - .andExpect(jsonPath("$.message").value(TopologyStatusCode.TOPOLOGY_NOT_FOUND.name())); - - this.mockMvc.perform(get(stormUrl + "/indexing/stop?stopNow=true").with(httpBasic(user,password))) - .andExpect(status().isOk()) - .andExpect(jsonPath("$.status").value("ERROR")) - .andExpect(jsonPath("$.message").value(TopologyStatusCode.STOP_ERROR.toString())); - - this.mockMvc.perform(get(stormUrl + "/indexing/start").with(httpBasic(user,password))) - .andExpect(status().isOk()) - .andExpect(jsonPath("$.status").value("SUCCESS")) - .andExpect(jsonPath("$.message").value(TopologyStatusCode.STARTED.toString())); - - this.mockMvc.perform(get(stormUrl + "/indexing/deactivate").with(httpBasic(user,password))) - .andExpect(status().isOk()) - .andExpect(jsonPath("$.status").value("SUCCESS")) - .andExpect(jsonPath("$.message").value(TopologyStatusCode.INACTIVE.name())); - - this.mockMvc.perform(get(stormUrl + "/indexing/activate").with(httpBasic(user,password))) - .andExpect(status().isOk()) - .andExpect(jsonPath("$.status").value("SUCCESS")) - .andExpect(jsonPath("$.message").value(TopologyStatusCode.ACTIVE.name())); - - this.mockMvc.perform(get(stormUrl + "/indexing").with(httpBasic(user,password))) + for(String type : ImmutableList.of("randomaccess", "batch")) { + this.mockMvc.perform(get(stormUrl + "/indexing/" + type).with(httpBasic(user,password))) + .andExpect(status().isNotFound()); + this.mockMvc.perform(get(stormUrl + "/indexing/" + type + "/activate").with(httpBasic(user, password))) + .andExpect(status().isOk()) + .andExpect(jsonPath("$.status").value("ERROR")) + .andExpect(jsonPath("$.message").value(TopologyStatusCode.TOPOLOGY_NOT_FOUND.name())); + + this.mockMvc.perform(get(stormUrl + "/indexing/" + type + "/deactivate").with(httpBasic(user, password))) + .andExpect(status().isOk()) + .andExpect(jsonPath("$.status").value("ERROR")) + .andExpect(jsonPath("$.message").value(TopologyStatusCode.TOPOLOGY_NOT_FOUND.name())); + + this.mockMvc.perform(get(stormUrl + "/indexing/" + type + "/stop?stopNow=true").with(httpBasic(user, password))) + .andExpect(status().isOk()) + .andExpect(jsonPath("$.status").value("ERROR")) + .andExpect(jsonPath("$.message").value(TopologyStatusCode.STOP_ERROR.toString())); + + this.mockMvc.perform(get(stormUrl + "/indexing/" + type + "/start").with(httpBasic(user, password))) + .andExpect(status().isOk()) + .andExpect(jsonPath("$.status").value("SUCCESS")) + .andExpect(jsonPath("$.message").value(TopologyStatusCode.STARTED.toString())); + + ResultActions actions = this.mockMvc.perform(get(stormUrl + "/indexing/" + type + "/deactivate").with(httpBasic(user, password))); + actions.andExpect(status().isOk()) + .andExpect(jsonPath("$.status").value("SUCCESS")) + .andExpect(jsonPath("$.message").value(TopologyStatusCode.INACTIVE.name())); + + this.mockMvc.perform(get(stormUrl + "/indexing/" + type + "/activate").with(httpBasic(user, password))) + .andExpect(status().isOk()) + .andExpect(jsonPath("$.status").value("SUCCESS")) + .andExpect(jsonPath("$.message").value(TopologyStatusCode.ACTIVE.name())); + String topologyName = type.equals("randomaccess")? MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME:MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME; + this.mockMvc.perform(get(stormUrl + "/indexing/" + type).with(httpBasic(user, password))) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.name").value(topologyName)) + .andExpect(jsonPath("$.id", containsString("indexing"))) + .andExpect(jsonPath("$.status").value("ACTIVE")) + .andExpect(jsonPath("$.latency").exists()) + .andExpect(jsonPath("$.throughput").exists()) + .andExpect(jsonPath("$.emitted").exists()) + .andExpect(jsonPath("$.acked").exists()); + this.mockMvc.perform(get(stormUrl).with(httpBasic(user,password))) .andExpect(status().isOk()) .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) - .andExpect(jsonPath("$.name").value("indexing")) - .andExpect(jsonPath("$.id", containsString("indexing"))) - .andExpect(jsonPath("$.status").value("ACTIVE")) - .andExpect(jsonPath("$.latency").exists()) - .andExpect(jsonPath("$.throughput").exists()) - .andExpect(jsonPath("$.emitted").exists()) - .andExpect(jsonPath("$.acked").exists()); + .andExpect(jsonPath("$[?(@.name == '" + topologyName + "' && @.status == 'ACTIVE')]").exists()); - this.mockMvc.perform(get(stormUrl).with(httpBasic(user,password))) - .andExpect(status().isOk()) - .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) - .andExpect(jsonPath("$[?(@.name == 'indexing' && @.status == 'ACTIVE')]").exists()); - - this.mockMvc.perform(get(stormUrl + "/indexing/stop").with(httpBasic(user,password))) + this.mockMvc.perform(get(stormUrl + "/indexing/" + type + "/stop").with(httpBasic(user,password))) .andExpect(status().isOk()) .andExpect(jsonPath("$.status").value("SUCCESS")) .andExpect(jsonPath("$.message").value(TopologyStatusCode.STOPPED.name())); + } + + this.mockMvc.perform(get(stormUrl + "/client/status").with(httpBasic(user,password))) .andExpect(status().isOk()) .andExpect(jsonPath("$.stormClientVersionInstalled").value("1.0.1")) .andExpect(jsonPath("$.parserScriptPath").value("/usr/metron/" + metronVersion + "/bin/start_parser_topology.sh")) .andExpect(jsonPath("$.enrichmentScriptPath").value("/usr/metron/" + metronVersion + "/bin/start_enrichment_topology.sh")) - .andExpect(jsonPath("$.indexingScriptPath").value("/usr/metron/" + metronVersion + "/bin/start_elasticsearch_topology.sh")); + .andExpect(jsonPath("$.randomAccessIndexingScriptPath").value("/usr/metron/" + metronVersion + "/bin/start_elasticsearch_topology.sh")) + .andExpect(jsonPath("$.batchIndexingScriptPath").value("/usr/metron/" + metronVersion + "/bin/start_hdfs_topology.sh")); globalConfigService.delete(); sensorParserConfigService.delete("broTest"); http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormCLIClientWrapper.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormCLIClientWrapper.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormCLIClientWrapper.java index dd21095..9018935 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormCLIClientWrapper.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormCLIClientWrapper.java @@ -17,6 +17,7 @@ */ package org.apache.metron.rest.mock; +import org.apache.metron.rest.MetronRestConstants; import org.apache.metron.rest.RestException; import org.apache.metron.rest.model.TopologyStatusCode; import org.apache.metron.rest.service.impl.StormCLIWrapper; @@ -29,7 +30,8 @@ public class MockStormCLIClientWrapper extends StormCLIWrapper { private final Map<String, TopologyStatusCode> parsersStatus = new HashMap<>(); private TopologyStatusCode enrichmentStatus = TopologyStatusCode.TOPOLOGY_NOT_FOUND; - private TopologyStatusCode indexingStatus = TopologyStatusCode.TOPOLOGY_NOT_FOUND; + private TopologyStatusCode randomAccessIndexingStatus = TopologyStatusCode.TOPOLOGY_NOT_FOUND; + private TopologyStatusCode batchIndexingStatus = TopologyStatusCode.TOPOLOGY_NOT_FOUND; public Set<String> getParserTopologyNames() { return parsersStatus.keySet(); @@ -128,45 +130,84 @@ public class MockStormCLIClientWrapper extends StormCLIWrapper { } } - public TopologyStatusCode getIndexingStatus() { - return indexingStatus; + public TopologyStatusCode getIndexingStatus(String name) { + return name.equals(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME)?batchIndexingStatus:randomAccessIndexingStatus; } @Override - public int startIndexingTopology() throws RestException { - if (indexingStatus == TopologyStatusCode.TOPOLOGY_NOT_FOUND) { - indexingStatus = TopologyStatusCode.ACTIVE; - return 0; - } else { - return 1; + public int startIndexingTopology(String scriptPath) throws RestException { + if(scriptPath.equals(MetronRestConstants.BATCH_INDEXING_SCRIPT_PATH_SPRING_PROPERTY)) { + if (batchIndexingStatus == TopologyStatusCode.TOPOLOGY_NOT_FOUND) { + batchIndexingStatus = TopologyStatusCode.ACTIVE; + return 0; + } else { + return 1; + } + } + else { + if (randomAccessIndexingStatus == TopologyStatusCode.TOPOLOGY_NOT_FOUND) { + randomAccessIndexingStatus = TopologyStatusCode.ACTIVE; + return 0; + } else { + return 1; + } } } @Override - public int stopIndexingTopology(boolean stopNow) throws RestException { - if (indexingStatus == TopologyStatusCode.ACTIVE) { - indexingStatus = TopologyStatusCode.TOPOLOGY_NOT_FOUND; - return 0; - } else { - return 1; + public int stopIndexingTopology(String name, boolean stopNow) throws RestException { + if(name.equals(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME)) { + if (batchIndexingStatus == TopologyStatusCode.ACTIVE) { + batchIndexingStatus = TopologyStatusCode.TOPOLOGY_NOT_FOUND; + return 0; + } else { + return 1; + } + } + else { + if (randomAccessIndexingStatus == TopologyStatusCode.ACTIVE) { + randomAccessIndexingStatus = TopologyStatusCode.TOPOLOGY_NOT_FOUND; + return 0; + } else { + return 1; + } } } - public int activateIndexingTopology() { - if (indexingStatus == TopologyStatusCode.INACTIVE || indexingStatus == TopologyStatusCode.ACTIVE) { - indexingStatus = TopologyStatusCode.ACTIVE; - return 0; - } else { - return 1; + public int activateIndexingTopology(String name) { + if(name.equals(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME)) { + if (batchIndexingStatus == TopologyStatusCode.INACTIVE || batchIndexingStatus == TopologyStatusCode.ACTIVE) { + batchIndexingStatus = TopologyStatusCode.ACTIVE; + return 0; + } else { + return 1; + } + } + else { + if (randomAccessIndexingStatus == TopologyStatusCode.INACTIVE || randomAccessIndexingStatus == TopologyStatusCode.ACTIVE) { + randomAccessIndexingStatus = TopologyStatusCode.ACTIVE; + return 0; + } else { + return 1; + } } } - public int deactivateIndexingTopology() { - if (indexingStatus == TopologyStatusCode.INACTIVE || indexingStatus == TopologyStatusCode.ACTIVE) { - indexingStatus = TopologyStatusCode.INACTIVE; - return 0; + public int deactivateIndexingTopology(String name) { + if (name.equals(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME)) { + if (batchIndexingStatus == TopologyStatusCode.INACTIVE || batchIndexingStatus == TopologyStatusCode.ACTIVE) { + batchIndexingStatus = TopologyStatusCode.INACTIVE; + return 0; + } else { + return 1; + } } else { - return 1; + if (randomAccessIndexingStatus == TopologyStatusCode.INACTIVE || randomAccessIndexingStatus == TopologyStatusCode.ACTIVE) { + randomAccessIndexingStatus = TopologyStatusCode.INACTIVE; + return 0; + } else { + return 1; + } } } http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormRestTemplate.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormRestTemplate.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormRestTemplate.java index ccf993d..ef47ac9 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormRestTemplate.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockStormRestTemplate.java @@ -53,9 +53,13 @@ public class MockStormRestTemplate extends RestTemplate { if (enrichmentStatus != TopologyStatusCode.TOPOLOGY_NOT_FOUND) { topologyStatusList.add(getTopologyStatus("enrichment")); } - TopologyStatusCode indexingStatus = mockStormCLIClientWrapper.getIndexingStatus(); - if (indexingStatus != TopologyStatusCode.TOPOLOGY_NOT_FOUND) { - topologyStatusList.add(getTopologyStatus("indexing")); + TopologyStatusCode batchIndexingStatus = mockStormCLIClientWrapper.getIndexingStatus(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME); + if (batchIndexingStatus != TopologyStatusCode.TOPOLOGY_NOT_FOUND) { + topologyStatusList.add(getTopologyStatus(MetronRestConstants.BATCH_INDEXING_TOPOLOGY_NAME)); + } + TopologyStatusCode randomIndexingStatus = mockStormCLIClientWrapper.getIndexingStatus(MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME); + if (randomIndexingStatus != TopologyStatusCode.TOPOLOGY_NOT_FOUND) { + topologyStatusList.add(getTopologyStatus(MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME)); } topologySummary.setTopologies(topologyStatusList.toArray(new TopologyStatus[topologyStatusList.size()])); response = topologySummary; @@ -79,8 +83,8 @@ public class MockStormRestTemplate extends RestTemplate { topologyStatus.setId(name + "-id"); if ("enrichment".equals(name)) { topologyStatus.setStatus(mockStormCLIClientWrapper.getEnrichmentStatus()); - } else if ("indexing".equals(name)) { - topologyStatus.setStatus(mockStormCLIClientWrapper.getIndexingStatus()); + } else if (name.contains("indexing")) { + topologyStatus.setStatus(mockStormCLIClientWrapper.getIndexingStatus(name)); } else { topologyStatus.setStatus(mockStormCLIClientWrapper.getParserStatus(name)); } @@ -97,16 +101,16 @@ public class MockStormRestTemplate extends RestTemplate { if (action.equals("activate")) { if (name.equals("enrichment")) { returnCode = mockStormCLIClientWrapper.activateEnrichmentTopology(); - } else if (name.equals("indexing")) { - returnCode = mockStormCLIClientWrapper.activateIndexingTopology(); + } else if (name.contains("indexing")) { + returnCode = mockStormCLIClientWrapper.activateIndexingTopology(name); } else { returnCode = mockStormCLIClientWrapper.activateParserTopology(name); } } else if (action.equals("deactivate")){ if (name.equals("enrichment")) { returnCode = mockStormCLIClientWrapper.deactivateEnrichmentTopology(); - } else if (name.equals("indexing")) { - returnCode = mockStormCLIClientWrapper.deactivateIndexingTopology(); + } else if (name.contains("indexing")) { + returnCode = mockStormCLIClientWrapper.deactivateIndexingTopology(name); } else { returnCode = mockStormCLIClientWrapper.deactivateParserTopology(name); } http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java index d83a74c..65a1bda 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java @@ -122,22 +122,22 @@ public class StormAdminServiceImplTest { @Test public void startIndexingTopologyShouldProperlyReturnSuccessTopologyResponse() throws Exception { - when(stormCLIClientWrapper.startIndexingTopology()).thenReturn(0); + when(stormCLIClientWrapper.startIndexingTopology("random_access_indexing_script_path")).thenReturn(0); TopologyResponse expected = new TopologyResponse(); expected.setSuccessMessage(TopologyStatusCode.STARTED.toString()); - assertEquals(expected, stormAdminService.startIndexingTopology()); + assertEquals(expected, stormAdminService.startIndexingTopology("random_access_indexing_script_path")); } @Test public void stopIndexingTopologyShouldProperlyReturnSuccessTopologyResponse() throws Exception { - when(stormCLIClientWrapper.stopIndexingTopology(false)).thenReturn(0); + when(stormCLIClientWrapper.stopIndexingTopology("random_access_indexing", false)).thenReturn(0); TopologyResponse expected = new TopologyResponse(); expected.setSuccessMessage(TopologyStatusCode.STOPPED.toString()); - assertEquals(expected, stormAdminService.stopIndexingTopology(false)); + assertEquals(expected, stormAdminService.stopIndexingTopology("random_access_indexing",false)); } @Test http://git-wip-us.apache.org/repos/asf/metron/blob/fcff0596/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java index 73d54d8..60a9790 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormCLIWrapperTest.java @@ -174,11 +174,11 @@ public class StormCLIWrapperTest { whenNew(ProcessBuilder.class).withParameterTypes(String[].class).withArguments(anyVararg()).thenReturn(processBuilder); when(processBuilder.start()).thenReturn(process); - when(environment.getProperty(MetronRestConstants.INDEXING_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_indexing"); + when(environment.getProperty(MetronRestConstants.RANDOM_ACCESS_INDEXING_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_indexing"); when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)).thenReturn(false); when(process.exitValue()).thenReturn(0); - assertEquals(0, stormCLIWrapper.startIndexingTopology()); + assertEquals(0, stormCLIWrapper.startIndexingTopology(MetronRestConstants.RANDOM_ACCESS_INDEXING_SCRIPT_PATH_SPRING_PROPERTY)); verify(process).waitFor(); verifyNew(ProcessBuilder.class).withArguments("/start_indexing"); @@ -192,9 +192,9 @@ public class StormCLIWrapperTest { when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)).thenReturn(false); when(process.exitValue()).thenReturn(0); - assertEquals(0, stormCLIWrapper.stopIndexingTopology(false)); + assertEquals(0, stormCLIWrapper.stopIndexingTopology("random_access_indexing", false)); verify(process).waitFor(); - verifyNew(ProcessBuilder.class).withArguments("storm", "kill", MetronRestConstants.INDEXING_TOPOLOGY_NAME); + verifyNew(ProcessBuilder.class).withArguments("storm", "kill", MetronRestConstants.RANDOM_ACCESS_INDEXING_TOPOLOGY_NAME); } @Test @@ -209,15 +209,16 @@ public class StormCLIWrapperTest { when(process.getInputStream()).thenReturn(inputStream); when(environment.getProperty(MetronRestConstants.PARSER_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_parser"); when(environment.getProperty(MetronRestConstants.ENRICHMENT_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_enrichment"); - when(environment.getProperty(MetronRestConstants.INDEXING_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_indexing"); - + when(environment.getProperty(MetronRestConstants.RANDOM_ACCESS_INDEXING_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_elasticsearch"); + when(environment.getProperty(MetronRestConstants.BATCH_INDEXING_SCRIPT_PATH_SPRING_PROPERTY)).thenReturn("/start_hdfs"); Map<String, String> actual = stormCLIWrapper.getStormClientStatus(); assertEquals(new HashMap<String, String>() {{ - put("parserScriptPath", "/start_parser"); + put("randomAccessIndexingScriptPath", "/start_elasticsearch"); put("enrichmentScriptPath", "/start_enrichment"); - put("indexingScriptPath", "/start_indexing"); put("stormClientVersionInstalled", "1.1"); + put("parserScriptPath", "/start_parser"); + put("batchIndexingScriptPath", "/start_hdfs"); }}, actual); verifyNew(ProcessBuilder.class).withArguments("storm", "version");