Repository: incubator-eagle Updated Branches: refs/heads/master dcfeae759 -> 2ac5985f9
EAGLE-140 Add eagle pipeline, single kafka monitor and hadoop metric monitor scripts https://issues.apache.org/jira/browse/EAGLE-140 Author: @hao <h...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/2ac5985f Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/2ac5985f Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/2ac5985f Branch: refs/heads/master Commit: 2ac5985f963ed06624bdfc679432f5e86e8efb25 Parents: dcfeae7 Author: Hao Chen <h...@apache.org> Authored: Fri Jan 29 11:30:22 2016 +0800 Committer: Hao Chen <h...@apache.org> Committed: Fri Jan 29 11:30:22 2016 +0800 ---------------------------------------------------------------------- eagle-assembly/src/main/bin/eagle-ambari.sh | 0 eagle-assembly/src/main/bin/eagle-env.sh | 5 + eagle-assembly/src/main/bin/eagle-policy.sh | 0 .../src/main/bin/eagle-service-init.sh | 0 .../src/main/bin/eagle-topology-init.sh | 29 +++++ .../src/main/bin/hadoop-metric-monitor.sh | 48 ++++++++ .../main/bin/hbase-securitylog-schema-create.sh | 0 .../bin/hdfs-securitylog-metadata-create.sh | 0 .../src/main/bin/kafka-stream-monitor.sh | 61 ++++++++++ eagle-assembly/src/main/bin/pipeline-runner.sh | 52 +++++++++ eagle-assembly/src/main/conf/pipeline.conf | 40 +++++++ .../main/conf/sandbox-hadoopjmx-pipeline.conf | 49 +++++++++ .../main/conf/sandbox-hadoopjmx-topology.conf | 68 ++++++++++++ .../eagle-alert/eagle-alert-process/pom.xml | 6 +- .../eagle-stream-pipeline/pom.xml | 53 +++++---- .../pipeline/extension/ModuleManager.scala | 20 ++-- .../src/test/resources/application.conf | 2 +- .../src/test/resources/pipeline_4.conf | 2 +- .../src/test/resources/pipeline_5.conf | 110 +++++++++++++++++++ .../eagle/stream/pipeline/PipelineSpec.scala | 4 + .../storm/StormTopologyExecutorImpl.scala | 31 +++++- .../eagle-embed/eagle-embed-hbase/pom.xml | 8 +- .../eagle-embed/eagle-embed-server/pom.xml | 8 +- eagle-core/eagle-query/pom.xml | 4 - eagle-external/eagle-log4jkafka/pom.xml | 8 +- eagle-external/hadoop_jmx_collector/config.json | 2 +- .../hadoop_jmx_collector/util_func.py | 2 +- .../src/assembly/eagle-topology-assembly.xml | 2 + pom.xml | 23 ++-- 29 files changed, 565 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/bin/eagle-ambari.sh ---------------------------------------------------------------------- diff --git a/eagle-assembly/src/main/bin/eagle-ambari.sh b/eagle-assembly/src/main/bin/eagle-ambari.sh old mode 100644 new mode 100755 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/bin/eagle-env.sh ---------------------------------------------------------------------- diff --git a/eagle-assembly/src/main/bin/eagle-env.sh b/eagle-assembly/src/main/bin/eagle-env.sh index 30aff25..79ff5fa 100755 --- a/eagle-assembly/src/main/bin/eagle-env.sh +++ b/eagle-assembly/src/main/bin/eagle-env.sh @@ -37,3 +37,8 @@ export EAGLE_SERVICE_USER=admin # EAGLE_SERVICE_PASSWORD export EAGLE_SERVICE_PASSWD=secret +export EAGLE_CLASSPATH=$EAGLE_HOME/conf +# Add eagle shared library jars +for file in $EAGLE_HOME/lib/share/*;do + EAGLE_CLASSPATH=$EAGLE_CLASSPATH:$file +done http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/bin/eagle-policy.sh ---------------------------------------------------------------------- diff --git a/eagle-assembly/src/main/bin/eagle-policy.sh b/eagle-assembly/src/main/bin/eagle-policy.sh old mode 100644 new mode 100755 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/bin/eagle-service-init.sh ---------------------------------------------------------------------- diff --git a/eagle-assembly/src/main/bin/eagle-service-init.sh b/eagle-assembly/src/main/bin/eagle-service-init.sh old mode 100644 new mode 100755 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/bin/eagle-topology-init.sh ---------------------------------------------------------------------- diff --git a/eagle-assembly/src/main/bin/eagle-topology-init.sh b/eagle-assembly/src/main/bin/eagle-topology-init.sh old mode 100644 new mode 100755 index 88b756a..f30749f --- a/eagle-assembly/src/main/bin/eagle-topology-init.sh +++ b/eagle-assembly/src/main/bin/eagle-topology-init.sh @@ -90,6 +90,35 @@ echo "Importing AlertStreamService for USERPROFILE" curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H "Content-Type: application/json" "http://$EAGLE_SERVICE_HOST:$EAGLE_SERVICE_PORT/eagle-service/rest/entities?serviceName=AlertStreamService" \ -d '[ { "prefix": "alertStream", "tags": { "streamName": "userActivity", "site":"sandbox", "dataSource":"userProfile" }, "alertExecutorIdList": [ "userProfileAnomalyDetectionExecutor" ] } ]' +##################################################################### +# Import stream metadata for HADOOP METRIC +##################################################################### + +## AlertDataSource: data sources bound to sites +echo "Importing AlertDataSourceService for HADOOP METRIC ... " + +curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDataSourceService" -d '[{"prefix":"alertDataSource","tags":{"site" : "sandbox", "dataSource":"hadoop"}, "enabled": "true", "config" : "", "desc":"HADOOP"}]' + + +## AlertStreamService: alert streams generated from data source +echo "" +echo "Importing AlertStreamService for HADOOP METRIC ... " +curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST \ +-H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \ +-d '[{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopJmxMetric"},"desc":"hadoop jmx metric stream"},{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopSysMetric"},"desc":"hadoop system metric stream"}]' + +## AlertExecutorService: what alert streams are consumed by alert executor +echo "" +echo "Importing AlertExecutorService for HADOOP METRIC ... " +curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopJmxMetricExecutor","streamName":"hadoopJmxMetric"},"desc":"alert executor for hadoop jmx stream"}]' +curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopSysMetricExecutor","streamName":"hadoopSysMetric"},"desc":"alert executor for hadoop system stream"}]' + +## AlertStreamSchemaService: schema for event from alert stream +echo "" +echo "Importing AlertStreamSchemaService for HADOOP METRIC ... " +curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopJmxMetric"},"attrType":"long","attrDescription":"metric timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopJmxMetric"},"attrType":"double","attrDescription":"metric value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":" service component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric name","attrDisplayName":"metric"}]' +curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopSysMetric"},"attrType":"long","attrDescription":"metric timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopSysMetric"},"attrType":"double","attrDescription":"metric value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":" service component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric name","attrDisplayName":"metric"}]' + ## Finished echo "" echo "Finished initialization for eagle topology" http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/bin/hadoop-metric-monitor.sh ---------------------------------------------------------------------- diff --git a/eagle-assembly/src/main/bin/hadoop-metric-monitor.sh b/eagle-assembly/src/main/bin/hadoop-metric-monitor.sh new file mode 100644 index 0000000..706154a --- /dev/null +++ b/eagle-assembly/src/main/bin/hadoop-metric-monitor.sh @@ -0,0 +1,48 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +source $(dirname $0)/eagle-env.sh + +##################################################################### +# Import stream metadata for HADOOP METRIC +##################################################################### +## AlertDataSource: data sources bound to sites +echo "Importing AlertDataSourceService for HADOOP METRIC ... " + +curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDataSourceService" -d '[{"prefix":"alertDataSource","tags":{"site" : "sandbox", "dataSource":"hadoop"}, "enabled": "true", "config" : "", "desc":"HADOOP"}]' + + +## AlertStreamService: alert streams generated from data source +echo "" +echo "Importing AlertStreamService for HADOOP METRIC ... " +curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST \ +-H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \ +-d '[{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopJmxMetric"},"desc":"hadoop jmx metric stream"},{"prefix":"alertStream","tags":{"dataSource":"hadoop","streamName":"hadoopSysMetric"},"desc":"hadoop system metric stream"}]' + +## AlertExecutorService: what alert streams are consumed by alert executor +echo "" +echo "Importing AlertExecutorService for HADOOP METRIC ... " +curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopJmxMetricExecutor","streamName":"hadoopJmxMetric"},"desc":"alert executor for hadoop jmx stream"}]' +curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" -d '[{"prefix":"alertExecutor","tags":{"dataSource":"hadoop","alertExecutorId":"hadoopSysMetricExecutor","streamName":"hadoopSysMetric"},"desc":"alert executor for hadoop system stream"}]' + +## AlertStreamSchemaService: schema for event from alert stream +echo "" +echo "Importing AlertStreamSchemaService for HADOOP METRIC ... " +curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopJmxMetric"},"attrType":"long","attrDescription":"metric timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopJmxMetric"},"attrType":"double","attrDescription":"metric value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":" service component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopJmxMetric"},"attrType":"string","attrDescription":"metric name","attrDisplayName":"metric"}]' +curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"tags":{"dataSource":"hadoop","attrName":"host","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric host","attrDisplayName":"host"},{"tags":{"dataSource":"hadoop","attrName":"site","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric site","attrDisplayName":"site"},{"tags":{"dataSource":"hadoop","attrName":"timestamp","streamName":"hadoopSysMetric"},"attrType":"long","attrDescription":"metric timestamp","attrDisplayName":"timestamp"},{"tags":{"dataSource":"hadoop","attrName":"value","streamName":"hadoopSysMetric"},"attrType":"double","attrDescription":"metric value","attrDisplayName":"value"},{"tags":{"dataSource":"hadoop","attrName":"component","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":" service component","attrDisplayName":"component"},{"tags":{"dataSource":"hadoop","attrName":"metric","streamName":"hadoopSysMetric"},"attrType":"string","attrDescription":"metric name","attrDisplayName":"metric"}]' + +$EAGLE_HOME/kafka-stream-monitor.sh hadoopJmxMetric hadoopJmxMetricExecutor $EAGLE_HOME/conf/sandbox-hadoopjmx-topology.conf \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/bin/hbase-securitylog-schema-create.sh ---------------------------------------------------------------------- diff --git a/eagle-assembly/src/main/bin/hbase-securitylog-schema-create.sh b/eagle-assembly/src/main/bin/hbase-securitylog-schema-create.sh old mode 100644 new mode 100755 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/bin/hdfs-securitylog-metadata-create.sh ---------------------------------------------------------------------- diff --git a/eagle-assembly/src/main/bin/hdfs-securitylog-metadata-create.sh b/eagle-assembly/src/main/bin/hdfs-securitylog-metadata-create.sh old mode 100644 new mode 100755 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/bin/kafka-stream-monitor.sh ---------------------------------------------------------------------- diff --git a/eagle-assembly/src/main/bin/kafka-stream-monitor.sh b/eagle-assembly/src/main/bin/kafka-stream-monitor.sh new file mode 100644 index 0000000..bf9181b --- /dev/null +++ b/eagle-assembly/src/main/bin/kafka-stream-monitor.sh @@ -0,0 +1,61 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +source $(dirname $0)/eagle-env.sh + +if [ -e ${EAGLE_HOME}/lib/topology/eagle-topology-*-assembly.jar ];then + export jar_name=$(ls ${EAGLE_HOME}/lib/topology/eagle-topology-*-assembly.jar) +else + echo "ERROR: ${EAGLE_HOME}/lib/topology/eagle-topology-*-assembly.jar not found" + exit 1 +fi + +export main_class=org.apache.eagle.datastream.storm.KafkaStreamMonitor + +export alert_stream=$1 +export alert_executor=$2 +export config_file=$3 + +if [ "$#" -lt "2" ];then + echo "ERROR: parameter required" + echo "Usage: kafka-stream-monitor.sh [alert_stream alert_executor config_file] or [alert_stream config_file]" + echo "" + exit 1 +fi +if [ "$#" == "2" ];then + config_file=$2 + alert_executor="${alert_stream}Executor" +fi + +which storm >/dev/null 2>&1 +if [ $? != 0 ];then + echo "ERROR: storm not found" + exit 1 +else + export EAGLE_CLASSPATH=$EAGLE_CLASSPATH:$jar_name:`storm classpath` +fi + +cmd="java -cp $EAGLE_CLASSPATH $main_class -D eagle.stream.name=$alert_stream -D eagle.stream.executor=$alert_executor -D config.file=$config_file -D envContextConfig.jarFile=$jar_name" + +echo "==========" +echo "Alert Stream: $alert_stream" +echo "Alert Executor: $alert_executor" +echo "Alert Config: $config_file" +echo "==========" +echo "Run Cmd: $cmd" +echo $cmd +$cmd \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/bin/pipeline-runner.sh ---------------------------------------------------------------------- diff --git a/eagle-assembly/src/main/bin/pipeline-runner.sh b/eagle-assembly/src/main/bin/pipeline-runner.sh new file mode 100755 index 0000000..65bf3f3 --- /dev/null +++ b/eagle-assembly/src/main/bin/pipeline-runner.sh @@ -0,0 +1,52 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +source $(dirname $0)/eagle-env.sh + +if [ -e ${EAGLE_HOME}/lib/topology/eagle-topology-*-assembly.jar ];then + export jar_name=$(ls ${EAGLE_HOME}/lib/topology/eagle-topology-*-assembly.jar) +else + echo "ERROR: ${EAGLE_HOME}/lib/topology/eagle-topology-*-assembly.jar not found" + exit 1 +fi +if [ -e ${EAGLE_HOME}/conf/pipeline.conf ];then + export common_conf=$(ls ${EAGLE_HOME}/conf/pipeline.conf) +else + echo "WARN: ${EAGLE_HOME}/conf/pipeline.conf not found" +fi + +main_class=org.apache.eagle.stream.pipeline.Pipeline + +if [ "$#" == "0" ];then + echo "ERROR: parameter required" + echo "Usage: $0 [pipeline-config]" + + echo "" + exit 1 +fi + +which storm >/dev/null 2>&1 +if [ $? != 0 ];then + echo "ERROR: storm not found" + exit 1 +else + export EAGLE_CLASSPATH=$EAGLE_CLASSPATH:$jar_name:`storm classpath` +fi + +cmd="java -cp $EAGLE_CLASSPATH $main_class --conf $common_conf --pipeline $1 -D envContextConfig.jarFile=$jar_name" +echo $cmd +$cmd \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/conf/pipeline.conf ---------------------------------------------------------------------- diff --git a/eagle-assembly/src/main/conf/pipeline.conf b/eagle-assembly/src/main/conf/pipeline.conf new file mode 100644 index 0000000..c1c7758 --- /dev/null +++ b/eagle-assembly/src/main/conf/pipeline.conf @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing software +# distributed under the License is distributed on an "AS IS" BASIS +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +{ + "envContextConfig": { + "env" : "storm" + "mode" : "cluster" + "nimbusHost":"sandbox.hortonworks.com", + "nimbusThriftPort":6627 + } + "eagleProps" : { + "dataJoinPollIntervalSec" : 30 + "mailHost" : "smtp.server.host" + "mailSmtpPort":"25" + "mailDebug" : "true" + "eagleService": { + "host": "localhost" + "port": 9099 + "username": "admin" + "password": "secret" + } + } + "dynamicConfigSource" : { + "enabled" : true + "initDelayMillis" : 0 + "delayMillis" : 30000 + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/conf/sandbox-hadoopjmx-pipeline.conf ---------------------------------------------------------------------- diff --git a/eagle-assembly/src/main/conf/sandbox-hadoopjmx-pipeline.conf b/eagle-assembly/src/main/conf/sandbox-hadoopjmx-pipeline.conf new file mode 100644 index 0000000..2a75c59 --- /dev/null +++ b/eagle-assembly/src/main/conf/sandbox-hadoopjmx-pipeline.conf @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing software +# distributed under the License is distributed on an "AS IS" BASIS +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +{ + config { + envContextConfig { + "topologyName" : "sandbox-hadoopjmx-pipeline" + } + eagleProps { + "site" : "sandbox" + "dataSource": "HADOOP" + } + } + + dataflow { + KafkaSource.hadoopNNJmxStream { + parallism = 1000 + topic = "nn_jmx_metric_sandbox" + zkConnection = "sandbox.hortonworks.com:2181" + zkConnectionTimeoutMS = 15000 + consumerGroupId = "Consumer" + fetchSize = 1048586 + transactionZKServers = "sandbox.hortonworks.com" + transactionZKPort = 2181 + transactionZKRoot = "/consumers" + transactionStateUpdateMS = 2000 + deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" + } + + Alert.hadoopNNJmxStreamAlertExecutor { + upStreamNames = [hadoopNNJmxStream] + alertExecutorId = hadoopNNJmxStreamAlertExecutor + } + + hadoopNNJmxStream -> hadoopNNJmxStreamAlertExecutor{} + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-assembly/src/main/conf/sandbox-hadoopjmx-topology.conf ---------------------------------------------------------------------- diff --git a/eagle-assembly/src/main/conf/sandbox-hadoopjmx-topology.conf b/eagle-assembly/src/main/conf/sandbox-hadoopjmx-topology.conf new file mode 100644 index 0000000..4787de2 --- /dev/null +++ b/eagle-assembly/src/main/conf/sandbox-hadoopjmx-topology.conf @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +{ + "envContextConfig" : { + "env" : "storm", + "mode" : "cluster", + "topologyName" : "sandbox-hadoopjmx-topology", + "stormConfigFile" : "security-auditlog-storm.yaml", + "parallelismConfig" : { + "kafkaMsgConsumer" : 1, + "hadoopJmxMetricExecutor*" : 1 + }, + "nimbusHost":"sandbox.hortonworks.com", + "nimbusThriftPort":6627 + }, + "dataSourceConfig": { + "topic" : "nn_jmx_metric_sandbox", + "zkConnection" : "sandbox.hortonworks.com:2181", + "zkConnectionTimeoutMS" : 15000, + "consumerGroupId" : "EagleConsumer", + "fetchSize" : 1048586, + "deserializerClass" : "org.apache.eagle.security.hbase.parse.HbaseAuditLogKafkaDeserializer", + "transactionZKServers" : "sandbox.hortonworks.com", + "transactionZKPort" : 2181, + "transactionZKRoot" : "/consumers", + "consumerGroupId" : "eagle.hadoop.consumer", + "transactionStateUpdateMS" : 2000 + }, + "alertExecutorConfigs" : { + "hadoopJmxMetricExecutor" : { + "parallelism" : 1, + "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner" + "needValidation" : "true" + } + }, + "eagleProps" : { + "site" : "sandbox", + "dataSource": "hadoop", + "dataJoinPollIntervalSec" : 30, + "mailHost" : "mailHost.com", + "mailSmtpPort":"25", + "mailDebug" : "true", + "eagleService": { + "host": "localhost", + "port": 9099 + "username": "admin", + "password": "secret" + } + }, + "dynamicConfigSource" : { + "enabled" : true, + "initDelayMillis" : 0, + "delayMillis" : 30000 + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-alert/eagle-alert-process/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/pom.xml b/eagle-core/eagle-alert/eagle-alert-process/pom.xml index 4161904..012cc64 100644 --- a/eagle-core/eagle-alert/eagle-alert-process/pom.xml +++ b/eagle-core/eagle-alert/eagle-alert-process/pom.xml @@ -95,7 +95,7 @@ <artifactId>jdk.tools</artifactId> </exclusion> </exclusions> - </dependency> + </dependency> <dependency> <groupId>org.wso2.siddhi</groupId> <artifactId>siddhi-extension-string</artifactId> @@ -103,10 +103,6 @@ <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> </dependency> <dependency> <groupId>org.slf4j</groupId> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml b/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml index e8965f5..0c2732b 100644 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml +++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml @@ -26,10 +26,10 @@ <modelVersion>4.0.0</modelVersion> <artifactId>eagle-stream-pipeline</artifactId> <dependencies> - <dependency> - <groupId>org.reflections</groupId> - <artifactId>reflections</artifactId> - </dependency> + <!--<dependency>--> + <!--<groupId>org.reflections</groupId>--> + <!--<artifactId>reflections</artifactId>--> + <!--</dependency>--> <dependency> <groupId>eagle</groupId> <artifactId>eagle-service-base</artifactId> @@ -59,6 +59,10 @@ <artifactId>servlet-api</artifactId> <groupId>javax.servlet</groupId> </exclusion> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> </exclusions> </dependency> <dependency> @@ -67,14 +71,19 @@ <version>${project.version}</version> </dependency> <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-reflect</artifactId> - </dependency> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-compiler</artifactId> - <version>${scala.version}.0</version> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> </dependency> + <!--<dependency>--> + <!--<groupId>org.scala-lang</groupId>--> + <!--<artifactId>scala-reflect</artifactId>--> + <!--</dependency>--> + <!--<dependency>--> + <!--<groupId>org.scala-lang</groupId>--> + <!--<artifactId>scala-compiler</artifactId>--> + <!--<version>${scala.version}.0</version>--> + <!--</dependency>--> <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.version}</artifactId> @@ -85,22 +94,12 @@ <artifactId>eagle-stream-process-api</artifactId> <version>${project.version}</version> </dependency> - <dependency> - <groupId>com.typesafe.akka</groupId> - <artifactId>akka-actor_${scala.version}</artifactId> - <version>${akka.actor.version}</version> - </dependency> - <dependency> - <groupId>com.typesafe.akka</groupId> - <artifactId>akka-testkit_${scala.version}</artifactId> - <version>${akka.actor.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <scope>compile</scope> - </dependency> + <!--<dependency>--> + <!--<groupId>com.typesafe.akka</groupId>--> + <!--<artifactId>akka-testkit_${scala.version}</artifactId>--> + <!--<version>${akka.actor.version}</version>--> + <!--<scope>test</scope>--> + <!--</dependency>--> </dependencies> <build> <plugins> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala index 9c3e9d5..2174560 100644 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala +++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala @@ -9,7 +9,7 @@ import org.apache.eagle.datastream.core._ import org.apache.eagle.partition.PartitionStrategy import org.apache.eagle.stream.pipeline.parser.Processor import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata} -import org.slf4j.LoggerFactory +//import org.slf4j.LoggerFactory import scala.collection.JavaConverters._ @@ -106,11 +106,16 @@ object AggregatorProducer extends ModuleMapper{ } } +object KafkaSinkExecutor{ +// val LOG = LoggerFactory.getLogger(classOf[KafkaSinkExecutor]) +} + /** * @todo currently support single topic now, should support topic selector * @param config */ case class KafkaSinkExecutor(config:Map[String,AnyRef]) extends ((AnyRef) => Unit) with Serializable{ + val TOPIC_KEY = "topic" def getDefaultProps = { val props = new Properties() @@ -132,19 +137,17 @@ case class KafkaSinkExecutor(config:Map[String,AnyRef]) extends ((AnyRef) => Uni @transient var topic:String = null @transient var timeoutMs:Long = 3000 - val LOG = LoggerFactory.getLogger(classOf[KafkaSinkExecutor]) - private def init():Unit = { if(this.initialized != null && this.initialized.get()){ - LOG.info("Already initialized, skip") +// LOG.info("Already initialized, skip") return } this.initialized = new AtomicBoolean(false) if (producer != null) { - LOG.info(s"Closing $producer") +// LOG.info(s"Closing $producer") producer.close() } - LOG.info("Initializing and creating Kafka Producer") +// LOG.info("Initializing and creating Kafka Producer") if (config.contains(TOPIC_KEY)) { this.topic = config.get(TOPIC_KEY).get.asInstanceOf[String] } else { @@ -153,7 +156,7 @@ case class KafkaSinkExecutor(config:Map[String,AnyRef]) extends ((AnyRef) => Uni val props = getDefaultProps props.putAll((config - TOPIC_KEY).asJava) producer = new KafkaProducer[String, AnyRef](props) - LOG.info(s"Created new KafkaProducer: $producer") +// LOG.info(s"Created new KafkaProducer: $producer") initialized.set(true) } @@ -174,7 +177,8 @@ case class KafkaSinkExecutor(config:Map[String,AnyRef]) extends ((AnyRef) => Uni producer.send(record,new Callback(){ override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = { if(exception!=null){ - LOG.error(s"Failed to send record $value to topic: $topic",exception) +// LOG.error(s"Failed to send record $value to topic: $topic",exception) + throw new IllegalStateException(s"Failed to send record $value to topic: $topic",exception) } } }) http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf index d285a6f..3e8f69c 100644 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf +++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf @@ -21,7 +21,7 @@ "mailDebug" : "true" "eagleService": { "host": "localhost" - "port": 38080 + "port": 9099 "username": "admin" "password": "secret" } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf index 609e905..9054f8f 100644 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf +++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf @@ -21,7 +21,7 @@ "topologyName" : "dsl-topology" "parallelismConfig" : { "kafkaMsgConsumer" : 1 - } + }, } alertExecutorConfigs { defaultAlertExecutor { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf new file mode 100644 index 0000000..d3e5f1d --- /dev/null +++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf @@ -0,0 +1,110 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing software +# distributed under the License is distributed on an "AS IS" BASIS +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +{ + config { + envContextConfig { + "env" : "storm" + "mode" : "cluster" + "topologyName" : "dynamical-topology-5" + "parallelismConfig" : { + "kafkaMsgConsumer" : 1 + }, + "nimbusHost":"sandbox.hortonworks.com", + "nimbusThriftPort":6627 + } + alertExecutorConfigs { + defaultAlertExecutor { + "parallelism" : 1 + "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner" + "needValidation" : "true" + } + } + eagleProps { + "site" : "sandbox" + "dataSource": "HADOOP" + } + } + + dataflow { + KafkaSource.JmxStreamOne { + parallism = 1000 + topic = "metric_event_1" + zkConnection = "sandbox.hortonworks.com:2181" + zkConnectionTimeoutMS = 15000 + consumerGroupId = "Consumer" + fetchSize = 1048586 + transactionZKServers = "sandbox.hortonworks.com" + transactionZKPort = 2181 + transactionZKRoot = "/consumers" + transactionStateUpdateMS = 2000 + deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" + } + + KafkaSource.JmxStreamTwo { + parallism = 1000 + topic = "metric_event_2" + zkConnection = "sandbox.hortonworks.com:2181" + zkConnectionTimeoutMS = 15000 + consumerGroupId = "Consumer" + fetchSize = 1048586 + transactionZKServers = "sandbox.hortonworks.com" + transactionZKPort = 2181 + transactionZKRoot = "/consumers" + transactionStateUpdateMS = 2000 + deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" + } + + KafkaSource.JmxStreamThree{ + parallism = 1000 + topic = "metric_event_3" + zkConnection = "sandbox.hortonworks.com:2181" + zkConnectionTimeoutMS = 15000 + consumerGroupId = "Consumer" + fetchSize = 1048586 + transactionZKServers = "sandbox.hortonworks.com" + transactionZKPort = 2181 + transactionZKRoot = "/consumers" + transactionStateUpdateMS = 2000 + deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" + } + + Console.printer { + format = "%s" + } + + KafkaSink.metricStore { + "topic" = "metric_event_persist" + "bootstrap.servers" = "sandbox.hortonworks.com:6667" + } + + Alert.defaultAlertExecutor { + // upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree] + // alertExecutorId = defaultAlertExecutor + } + + JmxStreamOne|JmxStreamTwo|JmxStreamThree -> defaultAlertExecutor { + grouping = shuffle + } + + JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore { + grouping = shuffle + } + + JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer { + grouping = shuffle + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala index c87475b..5e2007d 100644 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala +++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala @@ -52,6 +52,10 @@ object PipelineSpec_4 extends App { Pipeline(args).submit[storm]("pipeline_4.conf") } +object PipelineSpec_5 extends App { + Pipeline(args).submit[storm]("pipeline_5.conf") +} + object PipelineCLISpec extends App{ Pipeline.main(args) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala index 1f753d1..5d64c4c 100644 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala +++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala @@ -23,6 +23,7 @@ import backtype.storm.generated.StormTopology import backtype.storm.utils.Utils import backtype.storm.{Config, LocalCluster, StormSubmitter} import org.apache.eagle.datastream.core.AbstractTopologyExecutor +import org.apache.thrift7.transport.TTransportException import org.slf4j.LoggerFactory import org.yaml.snakeyaml.Yaml @@ -37,6 +38,7 @@ case class StormTopologyExecutorImpl(topology: StormTopology, config: com.typesa conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32)) conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384)) conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384)) + conf.put(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000)) if(config.hasPath("envContextConfig.stormConfigFile")) { val file = new File(config.getString("envContextConfig.stormConfigFile")) @@ -59,8 +61,32 @@ case class StormTopologyExecutorImpl(topology: StormTopology, config: com.typesa val topologyName = config.getString("envContextConfig.topologyName") if (!localMode) { + if(config.hasPath("envContextConfig.nimbusHost")) { + LOG.info(s"Setting ${backtype.storm.Config.NIMBUS_HOST} as ${config.getString("envContextConfig.nimbusHost")}") + conf.put(backtype.storm.Config.NIMBUS_HOST, config.getString("envContextConfig.nimbusHost")) + } + + if(config.hasPath("envContextConfig.nimbusThriftPort")) { + LOG.info(s"Setting ${backtype.storm.Config.NIMBUS_THRIFT_PORT} as ${config.getString("envContextConfig.nimbusThriftPort")}") + conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, config.getNumber("envContextConfig.nimbusThriftPort")) + } + + if(config.hasPath("envContextConfig.jarFile")){ + LOG.info(s"Setting storm.jar as ${config.getString("envContextConfig.jarFile")}") + System.setProperty("storm.jar",config.getString("envContextConfig.jarFile")) + } + LOG.info("Submitting as cluster mode") - StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology) + try { + StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology) + } catch { + case e:TTransportException => + LOG.error(s"Got thrift exception, type: ${e.getType}") + throw e + } + finally { + System.clearProperty("storm.jar") + } } else { LOG.info("Submitting as local mode") val cluster: LocalCluster = new LocalCluster @@ -68,8 +94,7 @@ case class StormTopologyExecutorImpl(topology: StormTopology, config: com.typesa while(true) { try { Utils.sleep(Integer.MAX_VALUE) - } - catch { + } catch { case _: Throwable => () // Do nothing } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-embed/eagle-embed-hbase/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/pom.xml b/eagle-core/eagle-embed/eagle-embed-hbase/pom.xml index 5325fe6..ab9a253 100644 --- a/eagle-core/eagle-embed/eagle-embed-hbase/pom.xml +++ b/eagle-core/eagle-embed/eagle-embed-hbase/pom.xml @@ -43,10 +43,10 @@ <groupId>log4j</groupId> <artifactId>log4j</artifactId> </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </dependency> + <!--<dependency>--> + <!--<groupId>org.slf4j</groupId>--> + <!--<artifactId>slf4j-log4j12</artifactId>--> + <!--</dependency>--> <dependency> <groupId>org.slf4j</groupId> <artifactId>log4j-over-slf4j</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-embed/eagle-embed-server/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-embed/eagle-embed-server/pom.xml b/eagle-core/eagle-embed/eagle-embed-server/pom.xml index e16f852..d5b5e50 100644 --- a/eagle-core/eagle-embed/eagle-embed-server/pom.xml +++ b/eagle-core/eagle-embed/eagle-embed-server/pom.xml @@ -57,10 +57,10 @@ <groupId>log4j</groupId> <artifactId>log4j</artifactId> </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </dependency> + <!--<dependency>--> + <!--<groupId>org.slf4j</groupId>--> + <!--<artifactId>slf4j-log4j12</artifactId>--> + <!--</dependency>--> <dependency> <groupId>org.slf4j</groupId> <artifactId>log4j-over-slf4j</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-core/eagle-query/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/pom.xml b/eagle-core/eagle-query/pom.xml index af9a26e..b6068ee 100644 --- a/eagle-core/eagle-query/pom.xml +++ b/eagle-core/eagle-query/pom.xml @@ -54,10 +54,6 @@ </dependency> <dependency> <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-external/eagle-log4jkafka/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-external/eagle-log4jkafka/pom.xml b/eagle-external/eagle-log4jkafka/pom.xml index 30567ac..2c4871b 100644 --- a/eagle-external/eagle-log4jkafka/pom.xml +++ b/eagle-external/eagle-log4jkafka/pom.xml @@ -34,10 +34,10 @@ <groupId>log4j</groupId> <artifactId>log4j</artifactId> </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </dependency> + <!--<dependency>--> + <!--<groupId>org.slf4j</groupId>--> + <!--<artifactId>slf4j-log4j12</artifactId>--> + <!--</dependency>--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-external/hadoop_jmx_collector/config.json ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/config.json b/eagle-external/hadoop_jmx_collector/config.json index 7ca926e..1f5cb7e 100644 --- a/eagle-external/hadoop_jmx_collector/config.json +++ b/eagle-external/hadoop_jmx_collector/config.json @@ -13,7 +13,7 @@ "output": { "kafka": { "topic": "nn_jmx_metric_sandbox", - "brokerList": ["localhost:9092"] + "brokerList": ["sandbox.hortonworks.com:6667"] } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-external/hadoop_jmx_collector/util_func.py ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/util_func.py b/eagle-external/hadoop_jmx_collector/util_func.py index bebc2ed..c02abbd 100644 --- a/eagle-external/hadoop_jmx_collector/util_func.py +++ b/eagle-external/hadoop_jmx_collector/util_func.py @@ -41,7 +41,7 @@ def kafka_close(kafka, producer): def send_output_message(producer, topic, kafka_dict, metric, value): kafka_dict["timestamp"] = int(round(time.time() * 1000)) kafka_dict["metric"] = metric.lower() - kafka_dict["value"] = str(value) + kafka_dict["value"] = float(str(value)) kafka_json = json.dumps(kafka_dict) if producer != None: http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/eagle-topology-assembly/src/assembly/eagle-topology-assembly.xml ---------------------------------------------------------------------- diff --git a/eagle-topology-assembly/src/assembly/eagle-topology-assembly.xml b/eagle-topology-assembly/src/assembly/eagle-topology-assembly.xml index e15e77d..e397efa 100644 --- a/eagle-topology-assembly/src/assembly/eagle-topology-assembly.xml +++ b/eagle-topology-assembly/src/assembly/eagle-topology-assembly.xml @@ -46,6 +46,8 @@ <exclude>log4j:log4j</exclude> <exclude>asm:asm</exclude> <exclude>org.apache.log4j.wso2:log4j</exclude> + <exclude>log4j:apache-log4j-extras</exclude> + <exclude>org.apache.zookeeper:zookeeper</exclude> </excludes> </dependencySet> </dependencySets> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2ac5985f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 9d13f74..65b852a 100755 --- a/pom.xml +++ b/pom.xml @@ -193,9 +193,10 @@ <!-- Utility --> <joda-time.version>2.7</joda-time.version> <joda-convert.version>1.7</joda-convert.version> + <!--<log4j.version>1.2.17</log4j.version>--> <log4j.version>1.2.17</log4j.version> - <slf4j.version>1.7.5</slf4j.version> - <log4j-over-slf4j.version>1.7.2</log4j-over-slf4j.version> + <slf4j.version>1.6.5</slf4j.version> + <log4j-over-slf4j.version>1.6.6</log4j-over-slf4j.version> <quartz.version>2.2.1</quartz.version> <scopt.version>3.3.0</scopt.version> <akka.actor.version>2.3.14</akka.actor.version> @@ -335,17 +336,22 @@ <version>${slf4j.version}</version> <scope>compile</scope> </dependency> + <!--<dependency>--> + <!--<groupId>org.slf4j</groupId>--> + <!--<artifactId>slf4j-log4j12</artifactId>--> + <!--<version>${slf4j.version}</version>--> + <!--<scope>compile</scope>--> + <!--</dependency>--> <dependency> <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <version>${slf4j.version}</version> - <scope>compile</scope> + <artifactId>log4j-over-slf4j</artifactId> + <version>${log4j-over-slf4j.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> - <artifactId>log4j-over-slf4j</artifactId> + <artifactId>slf4j-simple</artifactId> <version>${slf4j.version}</version> - <scope>compile</scope> + <scope>test</scope> </dependency> <dependency> <groupId>log4j</groupId> @@ -910,8 +916,7 @@ <exclude>**/dev-supports/**/*.json</exclude> <exclude>**/dev-supports/**/useractivity-agg-json.txt</exclude> <exclude>**/conf/sandbox-userprofile-topology.conf</exclude> - <exclude>**/kafka-python/**</exclude> - <exclude>**/six/**</exclude> + <exclude>**/hadoop_jmx_collector/**</exclude> <!-- Fonts and Images --> <exclude>**/fonts/**</exclude> <exclude>**/images/**</exclude>