Repository: incubator-eagle Updated Branches: refs/heads/master 2bc25969a -> f108c7f45
[EAGLE-690] integrate topology health check with alert engine integrate topology health check with alert engine Author: yupu <y...@ebay.com> Closes #570 from puyulu/master. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f108c7f4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f108c7f4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f108c7f4 Branch: refs/heads/master Commit: f108c7f456a02d92c70479895f61d3ab724745f8 Parents: 2bc2596 Author: yupu <y...@ebay.com> Authored: Thu Nov 3 14:47:28 2016 +0800 Committer: Hao Chen <h...@apache.org> Committed: Thu Nov 3 14:47:28 2016 +0800 ---------------------------------------------------------------------- .../apache/eagle/topology/TopologyCheckApp.java | 17 +++ .../eagle/topology/TopologyCheckAppConfig.java | 8 +- .../topology/extractor/TopologyCrawler.java | 3 +- .../extractor/TopologyEntityParser.java | 6 +- .../extractor/TopologyEntityParserResult.java | 2 +- .../hdfs/HdfsTopologyEntityParser.java | 14 +- .../topology/extractor/mr/YarnNodeInfo.java | 21 ++- .../extractor/mr/YarnNodeInfoWrapper.java | 2 +- .../topology/extractor/mr/YarnNodeInfos.java | 2 +- .../topology/resolver/TopologyRackResolver.java | 2 +- .../impl/IPMaskTopologyRackResolver.java | 10 +- .../topology/storm/HealthCheckParseBolt.java | 73 ++++++++++ .../topology/storm/TopologyDataPersistBolt.java | 72 +++++++++- .../topology/utils/EntityBuilderHelper.java | 18 +-- .../eagle/topology/utils/JMXQueryHelper.java | 6 +- .../utils/ServiceNotResponseException.java | 11 +- .../eagle/topology/utils/StringUtils.java | 45 ++++++ ....eagle.topology.TopologyCheckAppProvider.xml | 142 ++++++++++++------- .../src/main/resources/application.conf | 33 +++-- .../entity/HBaseServiceTopologyAPIEntity.java | 4 +- .../entity/HdfsServiceTopologyAPIEntity.java | 11 +- .../entity/HealthCheckParseAPIEntity.java | 67 +++++++++ 
.../entity/JournalNodeServiceAPIEntity.java | 4 +- .../entity/MRServiceTopologyAPIEntity.java | 11 +- .../entity/TopologyEntityRepository.java | 2 +- eagle-topology-check/pom.xml | 12 ++ 26 files changed, 474 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java index 95bcb4d..93a06f8 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java @@ -23,16 +23,25 @@ import backtype.storm.topology.TopologyBuilder; import com.typesafe.config.Config; import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; +import org.apache.eagle.app.sink.StormStreamSink; +import org.apache.eagle.topology.storm.HealthCheckParseBolt; import org.apache.eagle.topology.storm.TopologyCheckAppSpout; import org.apache.eagle.topology.storm.TopologyDataPersistBolt; public class TopologyCheckApp extends StormApplication { + + private static final String SINK_TASK_NUM = "topology.numOfSinkTasks"; + private static final String TOPOLOGY_HEALTH_CHECK_STREAM = "topology_health_check_stream"; + @Override public StormTopology execute(Config config, StormEnvironment environment) { TopologyCheckAppConfig topologyCheckAppConfig = TopologyCheckAppConfig.newInstance(config); String spoutName = TopologyCheckAppConfig.TOPOLOGY_DATA_FETCH_SPOUT_NAME; String persistBoltName = 
TopologyCheckAppConfig.TOPOLOGY_ENTITY_PERSIST_BOLT_NAME; + String parseBoltName = TopologyCheckAppConfig.PARSE_BOLT_NAME; + String kafkaSinkBoltName = TopologyCheckAppConfig.SINK_BOLT_NAME; + int numOfSinkTasks = config.getInt(SINK_TASK_NUM); TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout( @@ -47,6 +56,14 @@ public class TopologyCheckApp extends StormApplication { topologyCheckAppConfig.dataExtractorConfig.numEntityPersistBolt ).setNumTasks(topologyCheckAppConfig.dataExtractorConfig.numEntityPersistBolt).shuffleGrouping(spoutName); + topologyBuilder.setBolt( + parseBoltName, + new HealthCheckParseBolt(), + topologyCheckAppConfig.dataExtractorConfig.numEntityPersistBolt).shuffleGrouping(persistBoltName); + + StormStreamSink<?> sinkBolt = environment.getStreamSink(TOPOLOGY_HEALTH_CHECK_STREAM, config); + topologyBuilder.setBolt(kafkaSinkBoltName, sinkBolt, numOfSinkTasks).setNumTasks(numOfSinkTasks).shuffleGrouping(parseBoltName); + return topologyBuilder.createTopology(); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java index 409a87f..0b7cb3d 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java @@ -32,6 +32,8 @@ public class TopologyCheckAppConfig implements Serializable { public static final String TOPOLOGY_DATA_FETCH_SPOUT_NAME = "topologyDataFetcherSpout"; public static final String TOPOLOGY_ENTITY_PERSIST_BOLT_NAME = 
"topologyEntityPersistBolt"; + public static final String PARSE_BOLT_NAME = "parserBolt"; + public static final String SINK_BOLT_NAME = "sinkBolt"; private static final int MAX_NUM_THREADS = 10; private static final String HBASE_ZOOKEEPER_CLIENT_PORT = "2181"; @@ -98,7 +100,7 @@ public class TopologyCheckAppConfig implements Serializable { if (config.hasPath("dataSourceConfig.mr")) { topologyTypes.add(TopologyConstants.TopologyType.MR); mrConfig = new MRConfig(); - mrConfig.rmUrls = config.getString("dataSourceConfig.mr.rmUrl").split(",\\s*"); + mrConfig.rmUrls = config.getString("dataSourceConfig.mr.rmUrl").split(",\\s*"); mrConfig.historyServerUrl = getOptionalConfig("dataSourceConfig.mr.historyServerUrl", null); } @@ -129,12 +131,12 @@ public class TopologyCheckAppConfig implements Serializable { } public static class MRConfig implements Serializable { - public String [] rmUrls; + public String[] rmUrls; public String historyServerUrl; } public static class HdfsConfig implements Serializable { - public String [] namenodeUrls; + public String[] namenodeUrls; } private String getOptionalConfig(String key, String defaultValue) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyCrawler.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyCrawler.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyCrawler.java index 55eba15..2451db6 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyCrawler.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyCrawler.java @@ -21,7 +21,8 @@ package org.apache.eagle.topology.extractor; public interface TopologyCrawler { 
/** - * Fetch raw data and emit the parsed result to the next bolt + * Fetch raw data and emit the parsed result to the next bolt. + * * @return */ void extract(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java index 6a860ad..64c9bd4 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParser.java @@ -24,19 +24,19 @@ import java.io.IOException; public interface TopologyEntityParser { /** - * Parse hadoop topology and return the topology entity results + * Parse hadoop topology and return the topology entity results. * @return the topology entity result */ public TopologyEntityParserResult parse(long timestamp) throws IOException; /** - * Get topology type for the parser + * Get topology type for the parser. * @return topology type */ public TopologyConstants.TopologyType getTopologyType(); /** - * Get hadoop version for the parser + * Get hadoop version for the parser. 
* @return */ public TopologyConstants.HadoopVersion getHadoopVersion(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java index f5746f7..1799054 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/TopologyEntityParserResult.java @@ -21,7 +21,6 @@ package org.apache.eagle.topology.extractor; import org.apache.eagle.log.entity.GenericMetricEntity; import org.apache.eagle.topology.TopologyConstants; import org.apache.eagle.topology.entity.TopologyBaseAPIEntity; - import java.util.ArrayList; import java.util.List; @@ -58,6 +57,7 @@ public class TopologyEntityParserResult { public TopologyConstants.HadoopVersion getVersion() { return version; } + public void setVersion(TopologyConstants.HadoopVersion version) { this.version = version; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java index 1cec474..c35ed18 100644 --- 
a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/hdfs/HdfsTopologyEntityParser.java @@ -47,7 +47,7 @@ import static org.apache.eagle.topology.TopologyConstants.RACK_TAG; public class HdfsTopologyEntityParser implements TopologyEntityParser { private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(HdfsTopologyEntityParser.class); - private String [] namenodeUrls; + private String[] namenodeUrls; private String site; private TopologyRackResolver rackResolver; @@ -78,7 +78,7 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser { private static final String STATUS_PATTERN = "([\\d\\.]+):\\d+\\s+\\([\\D]+(\\d+)\\)"; private static final String QJM_PATTERN = "([\\d\\.]+):\\d+"; - + private static final double TB = 1024 * 1024 * 1024 * 1024; public HdfsTopologyEntityParser(String site, TopologyCheckAppConfig.HdfsConfig hdfsConfig, TopologyRackResolver rackResolver) { @@ -118,9 +118,9 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser { if (bean == null || bean.getPropertyMap() == null) { throw new ServiceNotResponseException("Invalid JMX format, FSNamesystem bean is null!"); } - final String hostname = (String)bean.getPropertyMap().get(HA_NAME); + final String hostname = (String) bean.getPropertyMap().get(HA_NAME); HdfsServiceTopologyAPIEntity result = createHdfsServiceEntity(TopologyConstants.NAME_NODE_ROLE, hostname, updateTime); - final String state = (String)bean.getPropertyMap().get(HA_STATE); + final String state = (String) bean.getPropertyMap().get(HA_STATE); result.setStatus(state); final Double configuredCapacityGB = (Double) bean.getPropertyMap().get(CAPACITY_TOTAL_GB); result.setConfiguredCapacityTB(Double.toString(configuredCapacityGB / 1024)); @@ -155,9 +155,9 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser { 
JSONObject jsonMap = (JSONObject) jsonArray.get(0); Map<String, HdfsServiceTopologyAPIEntity> journalNodesMap = new HashMap<>(); - String QJM = jsonMap.getString("manager"); + String manager = jsonMap.getString("manager"); Pattern qjm = Pattern.compile(QJM_PATTERN); - Matcher jpmMatcher = qjm.matcher(QJM); + Matcher jpmMatcher = qjm.matcher(manager); while (jpmMatcher.find()) { String ip = jpmMatcher.group(1); String hostname = EntityBuilderHelper.resolveHostByIp(ip); @@ -196,7 +196,7 @@ public class HdfsTopologyEntityParser implements TopologyEntityParser { int numDeadDecommNodes = 0; String deadNodesStrings = (String) bean.getPropertyMap().get(DEAD_NODES); - JSONTokener tokener = new JSONTokener(deadNodesStrings); + JSONTokener tokener = new JSONTokener(deadNodesStrings); JSONObject jsonNodesObject = new JSONObject(tokener); final JSONArray deadNodes = jsonNodesObject.names(); for (int i = 0; deadNodes != null && i < deadNodes.length(); ++i) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java index 9315f78..4ca2b06 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfo.java @@ -21,7 +21,7 @@ package org.apache.eagle.topology.extractor.mr; import org.codehaus.jackson.annotate.JsonIgnoreProperties; import org.codehaus.jackson.map.annotate.JsonSerialize; -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonSerialize(include = 
JsonSerialize.Inclusion.NON_NULL) @JsonIgnoreProperties(ignoreUnknown = true) public class YarnNodeInfo { @@ -40,60 +40,79 @@ public class YarnNodeInfo { public String getRack() { return rack; } + public void setRack(String rack) { this.rack = rack; } + public String getState() { return state; } + public void setState(String state) { this.state = state; } + public String getId() { return id; } + public void setId(String id) { this.id = id; } + public String getNodeHostName() { return nodeHostName; } + public void setNodeHostName(String nodeHostName) { this.nodeHostName = nodeHostName; } + public String getNodeHTTPAddress() { return nodeHTTPAddress; } + public void setNodeHTTPAddress(String nodeHTTPAddress) { this.nodeHTTPAddress = nodeHTTPAddress; } + public String getLastHealthUpdate() { return lastHealthUpdate; } + public void setLastHealthUpdate(String lastHealthUpdate) { this.lastHealthUpdate = lastHealthUpdate; } + public String getHealthReport() { return healthReport; } + public void setHealthReport(String healthReport) { this.healthReport = healthReport; } + public String getNumContainers() { return numContainers; } + public void setNumContainers(String numContainers) { this.numContainers = numContainers; } + public String getUsedMemoryMB() { return usedMemoryMB; } + public void setUsedMemoryMB(String usedMemoryMB) { this.usedMemoryMB = usedMemoryMB; } + public String getAvailMemoryMB() { return availMemoryMB; } + public void setAvailMemoryMB(String availMemoryMB) { this.availMemoryMB = availMemoryMB; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java 
b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java index 83d8d7f..8079e49 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfoWrapper.java @@ -21,7 +21,7 @@ package org.apache.eagle.topology.extractor.mr; import org.codehaus.jackson.annotate.JsonIgnoreProperties; import org.codehaus.jackson.map.annotate.JsonSerialize; -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @JsonIgnoreProperties(ignoreUnknown = true) public class YarnNodeInfoWrapper { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java index d715a1e..536edd8 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/extractor/mr/YarnNodeInfos.java @@ -24,7 +24,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; import java.util.List; -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @JsonIgnoreProperties(ignoreUnknown = true) public class YarnNodeInfos { 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java index ab7b3cc..c4b4976 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/TopologyRackResolver.java @@ -21,7 +21,7 @@ package org.apache.eagle.topology.resolver; public interface TopologyRackResolver { /** - *resolve rack by hostname + *resolve rack by hostname. * @return rack name */ String resolve(String hostname); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java index df0f863..821de3c 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/resolver/impl/IPMaskTopologyRackResolver.java @@ -24,19 +24,19 @@ import java.net.InetAddress; import java.net.UnknownHostException; /** - * resolve rack by hostname + * resolve rack by hostname. 
*/ public class IPMaskTopologyRackResolver implements TopologyRackResolver { - private final int DEFAULT_RACK_POS = 2; + private final int pos = 2; private int rackPos; public IPMaskTopologyRackResolver() { - this.rackPos = DEFAULT_RACK_POS; + this.rackPos = pos; } public IPMaskTopologyRackResolver(int rackPos) { - this.rackPos = (rackPos > 3 || rackPos < 0) ? DEFAULT_RACK_POS : rackPos; + this.rackPos = (rackPos > 3 || rackPos < 0) ? pos : rackPos; } @Override @@ -44,7 +44,7 @@ public class IPMaskTopologyRackResolver implements TopologyRackResolver { String result = null; try { InetAddress address = InetAddress.getByName(hostname); - result = "rack" + (int)(address.getAddress()[rackPos] & 0xff); + result = "rack" + (int) (address.getAddress()[rackPos] & 0xff); } catch (UnknownHostException e) { //e.printStackTrace(); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/HealthCheckParseBolt.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/HealthCheckParseBolt.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/HealthCheckParseBolt.java new file mode 100644 index 0000000..a0b282b --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/HealthCheckParseBolt.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.eagle.topology.storm; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import org.apache.eagle.topology.entity.HealthCheckParseAPIEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper; + +import java.util.Arrays; +import java.util.Map; +import java.util.TreeMap; + +public class HealthCheckParseBolt extends BaseRichBolt { + + private static Logger LOG = LoggerFactory.getLogger(HealthCheckParseBolt.class); + private OutputCollector collector; + + @Override + public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { + this.collector = outputCollector; + } + + @Override + public void execute(Tuple tuple) { + HealthCheckParseAPIEntity result = null; + try { + result = (HealthCheckParseAPIEntity) tuple.getValueByField("f1"); + Map<String, Object> map = new TreeMap<>(); + map.put("status", result.getStatus()); + map.put("timestamp", result.getTimeStamp()); + map.put("role", result.getRole()); + map.put("host", result.getHost()); + map.put("site", result.getSite()); + + if (LOG.isDebugEnabled()) { + LOG.debug("emitted " + map); + } + collector.emit(Arrays.asList(result.getHost(), map)); + } catch (Exception ex) { + LOG.error("Failing parse security log message, and ignore this message", ex); + } 
finally { + collector.ack(tuple); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY, FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java index 9b0eb82..e0ff3cd 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/storm/TopologyDataPersistBolt.java @@ -18,11 +18,20 @@ package org.apache.eagle.topology.storm; +import static org.apache.eagle.topology.TopologyConstants.HOSTNAME_TAG; +import static org.apache.eagle.topology.TopologyConstants.RACK_TAG; +import static org.apache.eagle.topology.TopologyConstants.ROLE_TAG; +import static org.apache.eagle.topology.TopologyConstants.SITE_TAG; + import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; + + import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; import org.apache.eagle.service.client.EagleServiceClientException; @@ -32,6 +41,10 @@ import org.apache.eagle.service.client.impl.EagleServiceClientImpl; import 
org.apache.eagle.topology.TopologyCheckAppConfig; import org.apache.eagle.topology.TopologyConstants; import org.apache.eagle.topology.extractor.TopologyEntityParserResult; +import org.apache.eagle.topology.entity.HBaseServiceTopologyAPIEntity; +import org.apache.eagle.topology.entity.HdfsServiceTopologyAPIEntity; +import org.apache.eagle.topology.entity.HealthCheckParseAPIEntity; +import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity; import org.apache.eagle.topology.entity.TopologyBaseAPIEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +66,8 @@ public class TopologyDataPersistBolt extends BaseRichBolt { @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config.getConfig())); + this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config.getConfig().getString("service.host"), this.config.getConfig().getInt("service.port"), + this.config.getConfig().getString("service.username"), this.config.getConfig().getString("service.password"))); this.collector = collector; } @@ -84,9 +98,10 @@ public class TopologyDataPersistBolt extends BaseRichBolt { deleteEntities(entitiesForDeletion, serviceName); writeEntities(entitiesToWrite, serviceName); writeEntities(result.getMetrics(), serviceName); + emitToKafkaBolt(result); this.collector.ack(input); } catch (Exception e) { - e.printStackTrace(); + LOG.error(e.getMessage(), e); this.collector.fail(input); } } @@ -100,7 +115,7 @@ public class TopologyDataPersistBolt extends BaseRichBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - + declarer.declare(new Fields("f1")); } private void deleteEntities(List<TopologyBaseAPIEntity> entities, String serviceName) { @@ -112,9 +127,9 @@ public class TopologyDataPersistBolt extends BaseRichBolt { LOG.info("Successfully delete {} entities for {}", entities.size(), serviceName); } } 
catch (EagleServiceClientException e) { - e.printStackTrace(); + LOG.error(e.getMessage(), e); } catch (IOException e) { - e.printStackTrace(); + LOG.error(e.getMessage(), e); } entities.clear(); } @@ -135,8 +150,51 @@ public class TopologyDataPersistBolt extends BaseRichBolt { private String generateKey(TopologyBaseAPIEntity entity) { return String.format("%s-%s-%s-%s", entity.getTags().get(TopologyConstants.SITE_TAG), - entity.getTags().get(TopologyConstants.RACK_TAG), entity.getTags().get(TopologyConstants.HOSTNAME_TAG), - entity.getTags().get(TopologyConstants.ROLE_TAG)); + entity.getTags().get(TopologyConstants.RACK_TAG), entity.getTags().get(TopologyConstants.HOSTNAME_TAG), + entity.getTags().get(TopologyConstants.ROLE_TAG)); } + private void emitToKafkaBolt(TopologyEntityParserResult result) { + + List<HealthCheckParseAPIEntity> healthCheckParseAPIList = new ArrayList<HealthCheckParseAPIEntity>(); + + setNodeInfo(result.getMasterNodes(), healthCheckParseAPIList); + + setNodeInfo(result.getSlaveNodes(), healthCheckParseAPIList); + + for (HealthCheckParseAPIEntity healthCheckAPIEntity : healthCheckParseAPIList) { + this.collector.emit(new Values(healthCheckAPIEntity)); + } + + } + + private void setNodeInfo(List<TopologyBaseAPIEntity> topologyBaseAPIList, List<HealthCheckParseAPIEntity> healthCheckParseAPIList) { + HealthCheckParseAPIEntity healthCheckAPIEntity = null; + for (Iterator<TopologyBaseAPIEntity> iterator = topologyBaseAPIList.iterator(); iterator.hasNext(); ) { + + healthCheckAPIEntity = new HealthCheckParseAPIEntity(); + TopologyBaseAPIEntity topologyBaseAPIEntity = iterator.next(); + + if (topologyBaseAPIEntity instanceof HBaseServiceTopologyAPIEntity) { + + healthCheckAPIEntity.setStatus(((HBaseServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus()); + + } + if (topologyBaseAPIEntity instanceof HdfsServiceTopologyAPIEntity) { + + healthCheckAPIEntity.setStatus(((HdfsServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus()); + } + + if 
(topologyBaseAPIEntity instanceof MRServiceTopologyAPIEntity) { + + healthCheckAPIEntity.setStatus(((MRServiceTopologyAPIEntity) topologyBaseAPIEntity).getStatus()); + } + + healthCheckAPIEntity.setTimeStamp(topologyBaseAPIEntity.getTimestamp()); + healthCheckAPIEntity.setHost(topologyBaseAPIEntity.getTags().get(HOSTNAME_TAG)); + healthCheckAPIEntity.setRole(topologyBaseAPIEntity.getTags().get(ROLE_TAG)); + healthCheckAPIEntity.setSite(topologyBaseAPIEntity.getTags().get(SITE_TAG)); + healthCheckParseAPIList.add(healthCheckAPIEntity); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java index 9bad73b..f390fa8 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/EntityBuilderHelper.java @@ -21,16 +21,12 @@ package org.apache.eagle.topology.utils; import org.apache.commons.lang.StringUtils; import org.apache.eagle.log.entity.GenericMetricEntity; import org.apache.eagle.topology.TopologyConstants; -import org.apache.eagle.topology.entity.MRServiceTopologyAPIEntity; -import org.apache.eagle.topology.entity.TopologyBaseAPIEntity; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; -import static org.apache.eagle.topology.TopologyConstants.*; - public class EntityBuilderHelper { public static String resolveHostByIp(String ip) { @@ -48,7 +44,7 @@ public class EntityBuilderHelper { 
metricEntity.setTimestamp(timestamp); metricEntity.setTags(tags); metricEntity.setPrefix(metricName); - metricEntity.setValue(new double[]{value}); + metricEntity.setValue(new double[] {value}); return metricEntity; } @@ -59,12 +55,12 @@ public class EntityBuilderHelper { String metricName = String.format(TopologyConstants.METRIC_LIVE_RATIO_NAME_FORMAT, role); return EntityBuilderHelper.metricWrapper(timestamp, metricName, value, tags); } - - public static String getValidHostName(String key){ - if(StringUtils.isBlank(key)){ - throw new IllegalArgumentException("key can not be empty"); - } - return key.indexOf(TopologyConstants.COLON) > 0 ? key.substring(0,key.indexOf(TopologyConstants.COLON)) : key; + + public static String getValidHostName(String key) { + if (StringUtils.isBlank(key)) { + throw new IllegalArgumentException("key can not be empty"); + } + return key.indexOf(TopologyConstants.COLON) > 0 ? key.substring(0, key.indexOf(TopologyConstants.COLON)) : key; } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java index fa4c71f..bca8485 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/JMXQueryHelper.java @@ -34,7 +34,7 @@ import java.util.HashMap; import java.util.Map; /** - * Helper class to query Hadoop JMX servlets + * Helper class to query Hadoop JMX servlets. 
*/ public final class JMXQueryHelper { @@ -66,13 +66,13 @@ public final class JMXQueryHelper { final JSONArray jsonArray = jsonBeansObject.getJSONArray("beans"); int size = jsonArray.length(); for (int i = 0; i < size; ++i) { - final JSONObject obj = (JSONObject)jsonArray.get(i); + final JSONObject obj = (JSONObject) jsonArray.get(i); final JMXBean bean = new JMXBean(); final Map<String, Object> map = new HashMap<String, Object>(); bean.setPropertyMap(map); final JSONArray names = obj.names(); int jsonSize = names.length(); - for (int j = 0 ; j < jsonSize; ++j) { + for (int j = 0; j < jsonSize; ++j) { final String key = names.getString(j); Object value = obj.get(key); map.put(key, value); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java index 48c9133..3204ca3 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/ServiceNotResponseException.java @@ -25,14 +25,14 @@ public class ServiceNotResponseException extends IOException { private static final long serialVersionUID = -2425311876734366496L; /** - * Default constructor of FeederException + * Default constructor of FeederException. */ public ServiceNotResponseException() { super(); } /** - * Constructor of FeederException + * Constructor of FeederException. 
* * @param message error message */ @@ -41,18 +41,17 @@ public class ServiceNotResponseException extends IOException { } /** - * Constructor of FeederException + * Constructor of FeederException. * * @param message error message - * @param cause the cause of the exception - * + * @param cause the cause of the exception */ public ServiceNotResponseException(String message, Throwable cause) { super(message, cause); } /** - * Constructor of FeederException + * Constructor of FeederException. * * @param cause the cause of the exception */ http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/StringUtils.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/StringUtils.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/StringUtils.java new file mode 100644 index 0000000..3b47f84 --- /dev/null +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/utils/StringUtils.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.topology.utils; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; + +public final class StringUtils { + + public static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; + + private StringUtils() { + + } + + public static String convertMapToString(Map<String, String> tags) { + StringBuilder tagBuilder = new StringBuilder(); + Iterator<Entry<String, String>> iter = tags.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry<String, String> entry = (Map.Entry<String, String>) iter.next(); + tagBuilder.append(entry.getKey() + ":" + entry.getValue()); + if (iter.hasNext()) { + tagBuilder.append(","); + } + } + return tagBuilder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml index ed31a8f..8dbf79e 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml +++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml @@ -22,128 +22,166 @@ <name>Topology Health Check</name> <version>0.5.0-incubating</version> <configuration> - <!-- org.apache.eagle.topology.TopologyCheckApp --> + <!-- org.apache.eagle.topology.TopologyCheckApp --> <property> <name>dataExtractorConfig.site</name> <displayName>site</displayName> <description>Site</description> <value>sandbox</value> - </property> + </property> <property> 
<name>dataExtractorConfig.fetchDataIntervalInSecs</name> - <displayName>FetchDataIntervalInSecs</displayName> + <displayName>Fetch Data Interval in Secs</displayName> <description>Fetch Data Interval in Secs</description> <value>300</value> </property> <property> <name>dataExtractorConfig.parseThreadPoolSize</name> - <displayName>parseThreadPoolSize</displayName> + <displayName>Parser Thread Pool Size</displayName> <description>Parser Thread Pool Size</description> <value>5</value> </property> <property> <name>dataExtractorConfig.numDataFetcherSpout</name> - <displayName>numDataFetcherSpout</displayName> + <displayName>Spout Task Number</displayName> <description>Spout Task Number</description> <value>1</value> </property> <property> <name>dataExtractorConfig.numEntityPersistBolt</name> - <displayName>numEntityPersistBolt</displayName> + <displayName>Bolt Task Number</displayName> <description>Bolt Task Number</description> <value>1</value> - </property> + </property> + + <property> + <name>dataExtractorConfig.rackResolverCls</name> + <displayName>Rack Resolver Class</displayName> + <description>rack resolver class</description> + <value>org.apache.eagle.topology.resolver.impl.DefaultTopologyRackResolver</value> + </property> + <property> <name>dataSourceConfig.hbase.zkQuorum</name> - <displayName>zkQuorum</displayName> - <description>Zookeeper Quorum</description> + <displayName>HBase Zookeeper Quorum</displayName> + <description>HBase Zookeeper Quorum</description> <value>sandbox.hortonworks.com:2181</value> + <required>true</required> </property> <property> <name>dataSourceConfig.hbase.zkZnodeParent</name> - <displayName>zkZnodeParent</displayName> + <displayName>Hbase Zookeeper Znode Parent Root</displayName> <description>Hbase Zookeeper Znode Parent Root</description> <value>/hbase-unsecure</value> + <required>true</required> </property> <property> <name>dataSourceConfig.hbase.zkPropertyClientPort</name> - <displayName>zkPropertyClientPort</displayName> + 
<displayName>Hbase Zookeeper Client Port</displayName> <description>Hbase Zookeeper Client Port</description> <value>2181</value> + <required>true</required> </property> <property> <name>dataSourceConfig.hbase.kerberos.master.principal</name> - <displayName>hbaseMasterPrincipal</displayName> + <displayName>Hbase Master Principal</displayName> <description>Hbase Master Principal</description> <value>hadoop/_h...@example.com</value> </property> - <property> - <name>dataSourceConfig.hbase.kerberos.eagle.keytab</name> - <displayName>eagleKeytab</displayName> - <description>Eagle keytab</description> - <value></value> - </property> + <property> <name>dataSourceConfig.hbase.kerberos.master.principal</name> - <displayName>hbaseMasterPrincipal</displayName> + <displayName>Hbase Master Principal</displayName> <description>Hbase Master Principal</description> <value>hadoop/_h...@example.com</value> - </property> - + </property> + <property> <name>dataSourceConfig.hdfs.namenodeUrl</name> - <displayName>hdfsNamenodeUrl</displayName> + <displayName>Hdfs Namenode Web URL</displayName> <description>Hdfs Namenode Web URL</description> <value>http://sandbox.hortonworks.com:50070</value> + <required>true</required> </property> - <property> <name>dataSourceConfig.mr.rmUrl</name> - <displayName>resourceManagerUrl</displayName> + <displayName>Resource Manager URL</displayName> <description>Resource Manager URL</description> <value>http://sandbox.hortonworks.com:8088</value> + <required>true</required> </property> <property> <name>dataSourceConfig.mr.historyServerUrl</name> - <displayName>historyServerUrl</displayName> - <description>History Server URL</description> - <value></value> - </property> - - <property> - <name>eagleProps.eagleService.host</name> - <description>eagleProps.eagleService.host</description> - <value>localhost</value> - </property> - <property> - <name>eagleProps.eagleService.port</name> - <description>eagleProps.eagleService.port</description> - <value>9090</value> 
+ <displayName>History Server URL</displayName> + <description>History Server URL</description> + <value></value> + </property> + + <property> + <name>topology.numOfSinkTasks</name> + <displayName>topology.numOfSinkTasks</displayName> + <value>2</value> + <description>number of sink tasks</description> </property> + + <!-- data sink configurations --> <property> - <name>eagleProps.eagleService.username</name> - <description>eagleProps.eagleService.username</description> - <value>admin</value> + <name>dataSinkConfig.topic</name> + <displayName>Topic For Kafka Data Sink</displayName> + <value>topology_health_check</value> + <description>Topic For Kafka Data Sink</description> </property> <property> - <name>eagleProps.eagleService.password</name> - <description>eagleProps.eagleService.password</description> - <value>secret</value> + <name>dataSinkConfig.brokerList</name> + <displayName>Kafka Broker List</displayName> + <value>server.eagle.apache.org:9092</value> + <description>Kafka Broker List</description> </property> <property> - <name>eagleProps.eagleService.basePath</name> - <description>eagleProps.eagleService.basePath</description> - <value>/rest</value> + <name>dataSinkConfig.serializerClass</name> + <displayName>Serializer Class Kafka Message Value</displayName> + <value>kafka.serializer.StringEncoder</value> + <description>Serializer Class Kafka Message Value</description> </property> <property> - <name>eagleProps.eagleService.readTimeOutSeconds</name> - <displayName>eagleProps.eagleService.readTimeOutSeconds</displayName> - <description>The maximum amount of time (in seconds) the app is trying to read from eagle service</description> - <value>2</value> + <name>dataSinkConfig.keySerializerClass</name> + <displayName>Serializer Class Kafka Message Key</displayName> + <value>kafka.serializer.StringEncoder</value> + <description>Serializer Class Kafka Message Key</description> </property> - + </configuration> + <streams> + <stream> + 
<streamId>topology_health_check_stream</streamId> + <description>topology health check Stream</description> + <validate>true</validate> + <timeseries>true</timeseries> + <columns> + <column> + <name>status</name> + <type>string</type> + </column> + <column> + <name>timestamp</name> + <type>long</type> + </column> + <column> + <name>host</name> + <type>string</type> + </column> + <column> + <name>site</name> + <type>string</type> + </column> + <column> + <name>role</name> + <type>string</type> + </column> + </columns> + </stream> + </streams> <docs> <install> </install> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf index cbc7ac1..f069df5 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf +++ b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf @@ -18,7 +18,11 @@ mode : "LOCAL", workers : 1, - dataExtractorConfig : { + topology : { + "numOfSinkTasks" : 2 + } + + dataExtractorConfig : { "site": "sandbox", "fetchDataIntervalInSecs": 300, "parseThreadPoolSize": 5, @@ -41,20 +45,23 @@ } }, mr: { - rmUrl: "http://sandbox.hortonworks.com:8088", + rmUrl: "http://sandbox.hortonworks.com:50030", historyServerUrl : "http://sandbox.hortonworks.com:19888" #if not need, then empty } } - - eagleProps : { - "mailHost" : "abc.com", - "mailDebug" : "true", - eagleService.host:"localhost", - eagleService.port: 9090, - eagleService.username: "admin", - eagleService.password : "secret", - eagleService.basePath : "/rest", - eagleService.readTimeOutSeconds : 20, - eagleService.maxFlushNum : 500 + + "dataSinkConfig": { + "topic" : "topology_health_check", + "brokerList" : 
"sandbox.hortonworks.com:6667", + "serializerClass" : "kafka.serializer.StringEncoder", + "keySerializerClass" : "kafka.serializer.StringEncoder" + } + + "service": { + "host": "localhost", + "port": 9090, + "username": "admin", + "password": "secret", + "readTimeOutSeconds" : 10, } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HBaseServiceTopologyAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HBaseServiceTopologyAPIEntity.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HBaseServiceTopologyAPIEntity.java index 24761dc..d91ff5c 100644 --- a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HBaseServiceTopologyAPIEntity.java +++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HBaseServiceTopologyAPIEntity.java @@ -22,13 +22,13 @@ import org.apache.eagle.log.entity.meta.*; import org.apache.eagle.topology.TopologyConstants; import org.codehaus.jackson.map.annotate.JsonSerialize; -@JsonSerialize(include= JsonSerialize.Inclusion.NON_NULL) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @Table("hadoop_topology") @ColumnFamily("f") @Prefix("hbaseservicestatus") @Service(TopologyConstants.HBASE_INSTANCE_SERVICE_NAME) @TimeSeries(false) -public class HBaseServiceTopologyAPIEntity extends TopologyBaseAPIEntity { +public class HBaseServiceTopologyAPIEntity extends TopologyBaseAPIEntity { @Column("a") private String status; @Column("b") http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HdfsServiceTopologyAPIEntity.java 
---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HdfsServiceTopologyAPIEntity.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HdfsServiceTopologyAPIEntity.java index 0ef6d0a..baca97b 100644 --- a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HdfsServiceTopologyAPIEntity.java +++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HdfsServiceTopologyAPIEntity.java @@ -22,7 +22,7 @@ import org.apache.eagle.log.entity.meta.*; import org.apache.eagle.topology.TopologyConstants; import org.codehaus.jackson.map.annotate.JsonSerialize; -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @Table("hadoop_topology") @ColumnFamily("f") @Prefix("hdfsservicestatus") @@ -56,34 +56,43 @@ public class HdfsServiceTopologyAPIEntity extends TopologyBaseAPIEntity { public String getNumFailedVolumes() { return numFailedVolumes; } + public void setNumFailedVolumes(String numFailedVolumes) { this.numFailedVolumes = numFailedVolumes; valueChanged("numFailedVolumes"); } + public String getNumBlocks() { return numBlocks; } + public void setNumBlocks(String numBlocks) { this.numBlocks = numBlocks; valueChanged("numBlocks"); } + public String getStatus() { return status; } + public void setStatus(String status) { this.status = status; valueChanged("status"); } + public String getConfiguredCapacityTB() { return configuredCapacityTB; } + public void setConfiguredCapacityTB(String configuredCapacityTB) { this.configuredCapacityTB = configuredCapacityTB; valueChanged("configuredCapacityTB"); } + public String getUsedCapacityTB() { return usedCapacityTB; } + public void setUsedCapacityTB(String usedCapacityTB) { this.usedCapacityTB = usedCapacityTB; valueChanged("usedCapacityTB"); 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HealthCheckParseAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HealthCheckParseAPIEntity.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HealthCheckParseAPIEntity.java new file mode 100644 index 0000000..d78fea6 --- /dev/null +++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/HealthCheckParseAPIEntity.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.eagle.topology.entity; + +public class HealthCheckParseAPIEntity { + + private String status; + private long timeStamp; + private String host; + private String role; + private String site; + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public long getTimeStamp() { + return timeStamp; + } + + public void setTimeStamp(long timeStamp) { + this.timeStamp = timeStamp; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public String getRole() { + return role; + } + + public void setRole(String role) { + this.role = role; + } + + public String getSite() { + return site; + } + + public void setSite(String site) { + this.site = site; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/JournalNodeServiceAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/JournalNodeServiceAPIEntity.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/JournalNodeServiceAPIEntity.java index f21ad7a..392b107 100644 --- a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/JournalNodeServiceAPIEntity.java +++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/JournalNodeServiceAPIEntity.java @@ -22,13 +22,13 @@ import org.apache.eagle.log.entity.meta.*; import org.apache.eagle.topology.TopologyConstants; import org.codehaus.jackson.map.annotate.JsonSerialize; -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @Table("hadoop_topology") @ColumnFamily("f") @Prefix("journalnodestatus") 
@Service(TopologyConstants.JN_INSTANCE_SERVICE_NAME) @TimeSeries(false) -@Tags({TopologyConstants.SITE_TAG, TopologyConstants.HOSTNAME_TAG, TopologyConstants.RACK_TAG, TopologyConstants.ROLE_TAG}) +@Tags( {TopologyConstants.SITE_TAG, TopologyConstants.HOSTNAME_TAG, TopologyConstants.RACK_TAG, TopologyConstants.ROLE_TAG}) public class JournalNodeServiceAPIEntity extends TopologyBaseAPIEntity { @Column("a") private long writtenTxidDiff; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java index 75dfbfb..82e57fd 100644 --- a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java +++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/MRServiceTopologyAPIEntity.java @@ -22,13 +22,13 @@ import org.apache.eagle.log.entity.meta.*; import org.apache.eagle.topology.TopologyConstants; import org.codehaus.jackson.map.annotate.JsonSerialize; -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @Table("hadoop_topology") @ColumnFamily("f") @Prefix("mrservicestatus") @Service(TopologyConstants.MR_INSTANCE_SERVICE_NAME) @TimeSeries(false) -public class MRServiceTopologyAPIEntity extends TopologyBaseAPIEntity { +public class MRServiceTopologyAPIEntity extends TopologyBaseAPIEntity { @Column("a") private String status; @Column("b") @@ -52,27 +52,34 @@ public class MRServiceTopologyAPIEntity extends TopologyBaseAPIEntity { public String getStatus() { 
return status; } + public void setStatus(String status) { this.status = status; valueChanged("status"); } + public String getNumConfiguredMapSlots() { return numConfiguredMapSlots; } + public void setNumConfiguredMapSlots(String numConfiguredMapSlots) { this.numConfiguredMapSlots = numConfiguredMapSlots; valueChanged("numConfiguredMapSlots"); } + public String getNumConfiguredReduceSlots() { return numConfiguredReduceSlots; } + public void setNumConfiguredReduceSlots(String numConfiguredReduceSlots) { this.numConfiguredReduceSlots = numConfiguredReduceSlots; valueChanged("numConfiguredReduceSlots"); } + public String getHealthReport() { return healthReport; } + public void setHealthReport(String healthReport) { this.healthReport = healthReport; valueChanged("healthReport"); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/TopologyEntityRepository.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/TopologyEntityRepository.java b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/TopologyEntityRepository.java index e4de886..f064edf 100644 --- a/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/TopologyEntityRepository.java +++ b/eagle-topology-check/eagle-topology-entity/src/main/java/org/apache/eagle/topology/entity/TopologyEntityRepository.java @@ -26,5 +26,5 @@ public class TopologyEntityRepository extends EntityRepository { entitySet.add(HdfsServiceTopologyAPIEntity.class); entitySet.add(MRServiceTopologyAPIEntity.class); entitySet.add(JournalNodeServiceAPIEntity.class); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f108c7f4/eagle-topology-check/pom.xml ---------------------------------------------------------------------- diff 
--git a/eagle-topology-check/pom.xml b/eagle-topology-check/pom.xml index b26e32b..7d68f73 100644 --- a/eagle-topology-check/pom.xml +++ b/eagle-topology-check/pom.xml @@ -51,4 +51,16 @@ </dependency> </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <configuration> + <failOnViolation>true</failOnViolation> + <failsOnError>true</failsOnError> + </configuration> + </plugin> + </plugins> + </build> </project> \ No newline at end of file