http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/provider/JDBCDataSourceProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/provider/JDBCDataSourceProvider.java b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/provider/JDBCDataSourceProvider.java index 7495ef5..40f08c1 100644 --- a/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/provider/JDBCDataSourceProvider.java +++ b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/provider/JDBCDataSourceProvider.java @@ -45,7 +45,7 @@ public class JDBCDataSourceProvider implements Provider<DataSource> { @Override public void run() { try { - LOGGER.info("Shutting down data source"); + LOGGER.info("Shutting down data fromStream"); datasource.close(); } catch (SQLException e) { LOGGER.error("SQLException: {}", e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/service/ApplicationEntityServiceJDBCImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/service/ApplicationEntityServiceJDBCImpl.java b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/service/ApplicationEntityServiceJDBCImpl.java index 6ddf3c6..02bf1cc 100644 --- a/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/service/ApplicationEntityServiceJDBCImpl.java +++ b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/service/ApplicationEntityServiceJDBCImpl.java @@ -210,10 +210,10 @@ public class ApplicationEntityServiceJDBCImpl implements ApplicationEntityServic ExecutionRuntime runtime = ExecutionRuntimeManager.getInstance().getRuntime( applicationProviderService.getApplicationProviderByType(entity.getDescriptor().getType()).getApplication().getEnvironmentType(), config); StreamSinkConfig streamSinkConfig = runtime.environment() - .streamSink().getSinkConfig(StreamIdConversions.parseStreamTypeId(copied.getSiteId(), copied.getStreamId()), effectiveConfig); + .stream().getSinkConfig(StreamIdConversions.parseStreamTypeId(copied.getSiteId(), copied.getStreamId()), effectiveConfig); StreamDesc streamDesc = new StreamDesc(); streamDesc.setSchema(copied); - streamDesc.setSink(streamSinkConfig); + streamDesc.setSinkConfig(streamSinkConfig); streamDesc.setStreamId(copied.getStreamId()); streamDesc.getSchema().setDataSource(entity.getAppId()); return streamDesc; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java index d06a3e4..fa59e8a 100644 --- a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java +++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java @@ -25,7 +25,8 @@ import java.io.Serializable; /** * Some common codes to enable DAO through eagle service including service host/post, credential population etc. */ -public class EagleServiceConnector implements Serializable{ +@Deprecated +public class EagleServiceConnector implements Serializable { private final String eagleServiceHost; private final Integer eagleServicePort; private String username; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java index 912f1f7..f0b6283 100644 --- a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java +++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceClientImpl.java @@ -16,6 +16,7 @@ */ package org.apache.eagle.service.client.impl; +import com.typesafe.config.Config; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; import org.apache.eagle.service.client.EagleServiceClientException; @@ -33,14 +34,24 @@ import java.util.Map; public class EagleServiceClientImpl extends EagleServiceBaseClient { private final static Logger LOG = LoggerFactory.getLogger(EagleServiceClientImpl.class); - public EagleServiceClientImpl(String host, int port){ + public EagleServiceClientImpl(String host, int port) { super(host, port); } - public EagleServiceClientImpl(EagleServiceConnector connector){ + @Deprecated + public EagleServiceClientImpl(EagleServiceConnector connector) { this(connector.getEagleServiceHost(), connector.getEagleServicePort(), connector.getUsername(), connector.getPassword()); } + public EagleServiceClientImpl (Config config) { + super( + config.hasPath("service.host") ? config.getString("service.host") : "localhost", + config.hasPath("service.port") ? config.getInt("service.port") : 9090, + config.hasPath("service.username") ? config.getString("service.username") : null, + config.hasPath("service.password") ? config.getString("service.password") : null + ); + } + public EagleServiceClientImpl(String host, int port, String username, String password){ super(host, port, username, password); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java ---------------------------------------------------------------------- diff --git a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java index c78cd3c..f1f4e58 100644 --- a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java +++ b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java @@ -57,7 +57,7 @@ public class ExampleStormApplication extends StormApplication{ @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { - outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value")); + outputFieldsDeclarer.declare(new Fields("metric","timestamp","fromStream","value")); } } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-examples/eagle-app-example/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-examples/eagle-app-example/src/test/resources/application.conf b/eagle-examples/eagle-app-example/src/test/resources/application.conf index de9be89..0c9ba51 100644 --- a/eagle-examples/eagle-app-example/src/test/resources/application.conf +++ b/eagle-examples/eagle-app-example/src/test/resources/application.conf @@ -44,7 +44,7 @@ }, "application": { "sink": { - "type": "org.apache.eagle.app.sink.KafkaStreamSink", + "type": "org.apache.eagle.app.messaging.KafkaStreamSink", "config": { "kafkaBrokerHost": "", "kafkaZkConnection": "" http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java ---------------------------------------------------------------------- diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java index a3052d8..1453c3e 100644 --- a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java +++ b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java @@ -29,7 +29,7 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; -import org.apache.eagle.app.sink.StormStreamSink; +import org.apache.eagle.app.messaging.StormStreamSink; import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider; import org.apache.eagle.gc.executor.GCLogAnalyzerBolt; import org.apache.eagle.gc.executor.GCMetricGeneratorBolt; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/pom.xml b/eagle-hadoop-metric/pom.xml index ad1521f..a3d1cb0 100644 --- a/eagle-hadoop-metric/pom.xml +++ b/eagle-hadoop-metric/pom.xml @@ -31,6 +31,20 @@ <groupId>org.apache.eagle</groupId> <artifactId>eagle-app-base</artifactId> <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.ow2.asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + </exclusions> </dependency> </dependencies> -</project> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java new file mode 100644 index 0000000..7f5e21b --- /dev/null +++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.metric; + +import backtype.storm.generated.StormTopology; +import com.typesafe.config.Config; +import org.apache.eagle.app.StormApplication; +import org.apache.eagle.app.environment.builder.CEPFunction; +import org.apache.eagle.app.environment.impl.StormEnvironment; +import org.apache.eagle.app.environment.builder.MetricDefinition; + +public class HadoopMetricMonitorApp extends StormApplication { + @Override + public StormTopology execute(Config config, StormEnvironment environment) { + return environment.newApp(config) + .fromStream("HADOOP_JMX_METRIC_STREAM") + .saveAsMetric(MetricDefinition + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host","component","site") + .valueField("value")) + .toTopology(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiver.java ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiver.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiver.java index e6ebde1..dc7ea97 100644 --- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiver.java +++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiver.java @@ -16,8 +16,14 @@ */ package org.apache.eagle.metric; -import org.apache.eagle.app.StaticApplicationProvider; +import org.apache.eagle.app.spi.AbstractApplicationProvider; -public class HadoopMetricMonitorAppProdiver extends StaticApplicationProvider { - // Metadata: META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml +/** + * Metadata: META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml. + */ +public class HadoopMetricMonitorAppProdiver extends AbstractApplicationProvider<HadoopMetricMonitorApp> { + @Override + public HadoopMetricMonitorApp getApplication() { + return new HadoopMetricMonitorApp(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml b/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml index 752c0cb..07270a5 100644 --- a/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml +++ b/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml @@ -21,7 +21,7 @@ <name>Hadoop JMX Metric Monitor</name> <version>0.5.0-incubating</version> <configuration> - <!-- data source configurations --> + <!-- data fromStream configurations --> <property> <name>dataSinkConfig.HADOOP_JMX_METRIC_STREAM.topic</name> <displayName>JMX Metric Kafka Topic</displayName> @@ -41,8 +41,6 @@ <stream> <streamId>HADOOP_JMX_METRIC_STREAM</streamId> <description>Hadoop JMX Metric Stream including name node, resource manager, etc.</description> - <validate>true</validate> - <timeseries>true</timeseries> <columns> <column> <name>host</name> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/main/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiverTest.java ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/main/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiverTest.java b/eagle-hadoop-metric/src/main/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiverTest.java deleted file mode 100644 index ee0b3c0..0000000 --- a/eagle-hadoop-metric/src/main/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProdiverTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.metric; - - -import com.google.inject.Inject; -import org.apache.eagle.app.resource.ApplicationResource; -import org.apache.eagle.app.service.ApplicationOperations; -import org.apache.eagle.app.test.ApplicationSimulator; -import org.apache.eagle.app.test.ApplicationTestBase; -import org.apache.eagle.metadata.model.ApplicationEntity; -import org.apache.eagle.metadata.model.SiteEntity; -import org.apache.eagle.metadata.resource.SiteResource; -import org.apache.eagle.metadata.service.ApplicationStatusUpdateService; -import org.junit.Assert; -import org.junit.Test; - -import java.util.HashMap; -import java.util.Map; - -public class HadoopMetricMonitorAppProdiverTest extends ApplicationTestBase { - - @Inject - private SiteResource siteResource; - @Inject - private ApplicationResource applicationResource; - @Inject - private ApplicationSimulator simulator; - @Inject - ApplicationStatusUpdateService statusUpdateService; - - @Test - public void testApplicationLifecycle() throws InterruptedException { - // Create local site - SiteEntity siteEntity = new SiteEntity(); - siteEntity.setSiteId("test_site"); - siteEntity.setSiteName("Test Site"); - siteEntity.setDescription("Test Site for HADOOP_JMX_METRIC_MONITOR"); - siteResource.createSite(siteEntity); - Assert.assertNotNull(siteEntity.getUuid()); - - ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation("test_site", "HADOOP_JMX_METRIC_MONITOR", ApplicationEntity.Mode.LOCAL); - installOperation.setConfiguration(getConf()); - // Install application - ApplicationEntity applicationEntity = applicationResource.installApplication(installOperation).getData(); - // Uninstall application - applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid())); - try { - applicationResource.getApplicationEntityByUUID(applicationEntity.getUuid()); - Assert.fail("Application instance (UUID: " + applicationEntity.getUuid() + ") should have been uninstalled"); - } catch (Exception ex) { - // Expected exception - } - } - - private Map<String, Object> getConf() { - Map<String, Object> conf = new HashMap<>(); - conf.put("dataSinkConfig.topic", "testTopic"); - conf.put("dataSinkConfig.brokerList", "broker"); - conf.put("dataSinkConfig.serializerClass", "serializerClass"); - conf.put("dataSinkConfig.keySerializerClass", "keySerializerClass"); - conf.put("dataSinkConfig.producerType", "async"); - conf.put("dataSinkConfig.numBatchMessages", 4096); - conf.put("dataSinkConfig.maxQueueBufferMs", 5000); - conf.put("dataSinkConfig.requestRequiredAcks", 0); - conf.put("spoutNum", 2); - conf.put("mode", "LOCAL"); - return conf; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java b/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java new file mode 100644 index 0000000..03ba4ee --- /dev/null +++ b/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.metric; + +public class HadoopMetricMonitorAppDebug { + public static void main(String[] args) { + new HadoopMetricMonitorApp().run(args); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProviderTest.java ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProviderTest.java b/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProviderTest.java new file mode 100644 index 0000000..eb343d9 --- /dev/null +++ b/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppProviderTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.metric; + + +import com.google.inject.Inject; +import org.apache.eagle.app.resource.ApplicationResource; +import org.apache.eagle.app.service.ApplicationOperations; +import org.apache.eagle.app.test.ApplicationSimulator; +import org.apache.eagle.app.test.ApplicationTestBase; +import org.apache.eagle.metadata.model.ApplicationEntity; +import org.apache.eagle.metadata.model.SiteEntity; +import org.apache.eagle.metadata.resource.SiteResource; +import org.apache.eagle.metadata.service.ApplicationStatusUpdateService; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class HadoopMetricMonitorAppProviderTest extends ApplicationTestBase { + + @Inject + private SiteResource siteResource; + @Inject + private ApplicationResource applicationResource; + @Inject + private ApplicationSimulator simulator; + @Inject + ApplicationStatusUpdateService statusUpdateService; + + @Test + public void testApplicationLifecycle() throws InterruptedException { + // Create local site + SiteEntity siteEntity = new SiteEntity(); + siteEntity.setSiteId("test_site"); + siteEntity.setSiteName("Test Site"); + siteEntity.setDescription("Test Site for HADOOP_JMX_METRIC_MONITOR"); + siteResource.createSite(siteEntity); + Assert.assertNotNull(siteEntity.getUuid()); + + ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation("test_site", "HADOOP_JMX_METRIC_MONITOR", ApplicationEntity.Mode.LOCAL); + installOperation.setConfiguration(getConf()); + // Install application + ApplicationEntity applicationEntity = applicationResource.installApplication(installOperation).getData(); + // Uninstall application + applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid())); + try { + applicationResource.getApplicationEntityByUUID(applicationEntity.getUuid()); + Assert.fail("Application instance (UUID: " + applicationEntity.getUuid() + ") should have been uninstalled"); + } catch (Exception ex) { + // Expected exception + } + } + + private Map<String, Object> getConf() { + Map<String, Object> conf = new HashMap<>(); + conf.put("dataSinkConfig.topic", "testTopic"); + conf.put("dataSinkConfig.brokerList", "broker"); + conf.put("dataSinkConfig.serializerClass", "serializerClass"); + conf.put("dataSinkConfig.keySerializerClass", "keySerializerClass"); + conf.put("dataSinkConfig.producerType", "async"); + conf.put("dataSinkConfig.numBatchMessages", 4096); + conf.put("dataSinkConfig.maxQueueBufferMs", 5000); + conf.put("dataSinkConfig.requestRequiredAcks", 0); + conf.put("spoutNum", 2); + conf.put("mode", "LOCAL"); + return conf; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/SendSampleDataToKafka.java ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/SendSampleDataToKafka.java b/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/SendSampleDataToKafka.java new file mode 100644 index 0000000..67b94f8 --- /dev/null +++ b/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/SendSampleDataToKafka.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.metric; + +import com.google.common.base.Preconditions; +import com.typesafe.config.ConfigFactory; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; +import org.apache.commons.io.IOUtils; +import org.apache.eagle.app.messaging.KafkaStreamProvider; +import org.apache.eagle.app.messaging.KafkaStreamSinkConfig; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.util.Properties; + +public class SendSampleDataToKafka { + public static void main(String[] args) throws URISyntaxException, IOException { + KafkaStreamSinkConfig config = new KafkaStreamProvider().getSinkConfig("HADOOP_JMX_METRIC_STREAM",ConfigFactory.load()); + Properties properties = new Properties(); + properties.put("metadata.broker.list", config.getBrokerList()); + properties.put("serializer.class", config.getSerializerClass()); + properties.put("key.serializer.class", config.getKeySerializerClass()); + // new added properties for async producer + properties.put("producer.type", config.getProducerType()); + properties.put("batch.num.messages", config.getNumBatchMessages()); + properties.put("request.required.acks", config.getRequestRequiredAcks()); + properties.put("queue.buffering.max.ms", config.getMaxQueueBufferMs()); + ProducerConfig producerConfig = new ProducerConfig(properties); + kafka.javaapi.producer.Producer producer = new kafka.javaapi.producer.Producer(producerConfig); + try { + InputStream is = SendSampleDataToKafka.class.getResourceAsStream("hadoop_jmx_metric_sample.json"); + Preconditions.checkNotNull(is, "hadoop_jmx_metric_sample.json"); + String value = IOUtils.toString(is); + producer.send(new KeyedMessage(config.getTopicId(), value)); + } finally { + producer.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/test/resources/application.conf b/eagle-hadoop-metric/src/test/resources/application.conf new file mode 100644 index 0000000..8ff6016 --- /dev/null +++ b/eagle-hadoop-metric/src/test/resources/application.conf @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + + + +{ + service { + env = "testing" + host = "localhost" + port = 9090 + username = "admin" + password = "secret" + readTimeOutSeconds = 60 + context = "/rest" + timezone = "UTC" + } + + "appId" : "HadoopJmxAppForTest", + "mode" : "LOCAL", + "siteId" : "testsite", + "dataSourceConfig": { + "topic" : "hadoop_jmx_metric", + "zkConnection" : "localhost:2181", + "txZkServers" : "localhost:2181" + } + "dataSinkConfig": { + "topic" : "hadoop_jmx_metric", + "brokerList" : "localhost:6667", + "serializerClass" : "kafka.serializer.StringEncoder", + "keySerializerClass" : "kafka.serializer.StringEncoder" + "producerType" : "async", + "numBatchMessages" : "4096", + "maxQueueBufferMs" : "5000", + "requestRequiredAcks" : "0" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-hadoop-metric/src/test/resources/hadoop_jmx_metric_sample.json ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/test/resources/hadoop_jmx_metric_sample.json b/eagle-hadoop-metric/src/test/resources/hadoop_jmx_metric_sample.json new file mode 100644 index 0000000..f0f62f2 --- /dev/null +++ b/eagle-hadoop-metric/src/test/resources/hadoop_jmx_metric_sample.json @@ -0,0 +1,8 @@ +{ + "host":"localhost", + "timestamp": 1480319107, + "metric": "hadoop.cpu.usage", + "component": "namenode", + "site": "test", + "value": 0.96 +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java index 6a7535c..6471dfc 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java @@ -1,102 +1,102 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.hadoop.queue; - -import com.typesafe.config.Config; -import org.apache.commons.lang.exception.ExceptionUtils; -import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase; -import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; -import org.apache.eagle.jpm.util.Constants; -import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; -import org.apache.eagle.metadata.model.ApplicationEntity; -import org.apache.eagle.service.client.IEagleServiceClient; -import org.apache.eagle.service.client.impl.EagleServiceClientImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Map; - -public class HadoopQueueRunningApplicationHealthCheck extends ApplicationHealthCheckBase { - private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueRunningApplicationHealthCheck.class); - private static final long DEFAULT_MAX_DELAY_TIME = 10 * 60 * 1000L; - - private HadoopQueueRunningAppConfig hadoopQueueRunningAppConfig; - - public HadoopQueueRunningApplicationHealthCheck(Config config) { - super(config); - this.hadoopQueueRunningAppConfig = new HadoopQueueRunningAppConfig(config); - } - - @Override - public Result check() { - HadoopQueueRunningAppConfig.EagleProps eagleServiceConfig = this.hadoopQueueRunningAppConfig.eagleProps; - IEagleServiceClient client = new EagleServiceClientImpl( - eagleServiceConfig.eagleService.host, - eagleServiceConfig.eagleService.port, - eagleServiceConfig.eagleService.username, - eagleServiceConfig.eagleService.password); - - client.getJerseyClient().setReadTimeout(60000); - - String message = ""; - try { - ApplicationEntity.Status status = getApplicationStatus(); - if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) { - message += String.format("Application is not RUNNING, status is %s. ", status.toString()); - } - - - String query = String.format("%s[@site=\"%s\"]<@site>{max(timestamp)}", - Constants.GENERIC_METRIC_SERVICE, - hadoopQueueRunningAppConfig.eagleProps.site); - - GenericServiceAPIResponseEntity response = client - .search(query) - .metricName(HadoopClusterConstants.MetricName.HADOOP_CLUSTER_ALLOCATED_MEMORY) - .startTime(System.currentTimeMillis() - 24 * 60 * 60000L) - .endTime(System.currentTimeMillis()) - .pageSize(10) - .send(); - List<Map<List<String>, List<Double>>> results = response.getObj(); - long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue(); - long currentTimeStamp = System.currentTimeMillis(); - long maxDelayTime = DEFAULT_MAX_DELAY_TIME; - if (hadoopQueueRunningAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) { - maxDelayTime = hadoopQueueRunningAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY); - } - - if (!message.isEmpty() || currentTimeStamp - currentProcessTimeStamp > maxDelayTime) { - message += String.format("Current process time is %sms, delay %s minutes.", - currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L); - return Result.unhealthy(message); - } else { - return Result.healthy(); - } - } catch (Exception e) { - return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e))); - } finally { - client.getJerseyClient().destroy(); - try { - client.close(); - } catch (Exception e) { - LOG.warn("{}", e); - } - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.hadoop.queue; + +import com.typesafe.config.Config; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; +import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; +import org.apache.eagle.metadata.model.ApplicationEntity; +import org.apache.eagle.service.client.IEagleServiceClient; +import org.apache.eagle.service.client.impl.EagleServiceClientImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +public class HadoopQueueRunningApplicationHealthCheck extends ApplicationHealthCheckBase { + private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueRunningApplicationHealthCheck.class); + private static final long DEFAULT_MAX_DELAY_TIME = 10 * 60 * 1000L; + + private HadoopQueueRunningAppConfig hadoopQueueRunningAppConfig; + + public HadoopQueueRunningApplicationHealthCheck(Config config) { + super(config); + this.hadoopQueueRunningAppConfig = new HadoopQueueRunningAppConfig(config); + } + + @Override + public Result check() { + HadoopQueueRunningAppConfig.EagleProps eagleServiceConfig = this.hadoopQueueRunningAppConfig.eagleProps; + IEagleServiceClient client = new EagleServiceClientImpl( + eagleServiceConfig.eagleService.host, + eagleServiceConfig.eagleService.port, + eagleServiceConfig.eagleService.username, + eagleServiceConfig.eagleService.password); + + client.getJerseyClient().setReadTimeout(60000); + + String message = ""; + try { + ApplicationEntity.Status status = getApplicationStatus(); + if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) { + message += String.format("Application is not RUNNING, status is %s. ", status.toString()); + } + + + String query = String.format("%s[@site=\"%s\"]<@site>{max(timestamp)}", + Constants.GENERIC_METRIC_SERVICE, + hadoopQueueRunningAppConfig.eagleProps.site); + + GenericServiceAPIResponseEntity response = client + .search(query) + .metricName(HadoopClusterConstants.MetricName.HADOOP_CLUSTER_ALLOCATED_MEMORY) + .startTime(System.currentTimeMillis() - 24 * 60 * 60000L) + .endTime(System.currentTimeMillis()) + .pageSize(10) + .send(); + List<Map<List<String>, List<Double>>> results = response.getObj(); + long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue(); + long currentTimeStamp = System.currentTimeMillis(); + long maxDelayTime = DEFAULT_MAX_DELAY_TIME; + if (hadoopQueueRunningAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) { + maxDelayTime = hadoopQueueRunningAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY); + } + + if (!message.isEmpty() || currentTimeStamp - currentProcessTimeStamp > maxDelayTime) { + message += String.format("Current process time is %sms, delay %s minutes.", + currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L); + return Result.unhealthy(message); + } else { + return Result.healthy(); + } + } catch (Exception e) { + return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e))); + } finally { + client.getJerseyClient().destroy(); + try { + client.close(); + } catch (Exception e) { + LOG.warn("{}", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java index f19c366..e145cf3 100644 --- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java +++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/storm/AggregationSpout.java @@ -76,7 +76,7 @@ public class AggregationSpout extends BaseRichSpout { //1, get last updateTime; lastUpdateTime = AggregationTimeManager.instance().readLastFinishTime(); if (lastUpdateTime == 0L) { - //init state, just set to currentTime - 18 hours + //prepare state, just set to currentTime - 18 hours lastUpdateTime = (currentJobTime - (MAX_SAFE_TIME + MAX_WAIT_TIME)) / 3600000 * 3600000; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java index 907ccdb..66906f0 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java @@ -17,10 +17,9 @@ package org.apache.eagle.jpm.mr.history; import backtype.storm.topology.BoltDeclarer; -import com.codahale.metrics.health.HealthCheck; import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; -import org.apache.eagle.app.sink.StormStreamSink; +import org.apache.eagle.app.messaging.StormStreamSink; import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder; import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout; @@ -37,11 +36,11 @@ import java.util.regex.Pattern; public class MRHistoryJobApplication extends StormApplication { @Override public StormTopology execute(Config config, StormEnvironment environment) { - //1. trigger init conf + //1. trigger prepare conf MRHistoryJobConfig appConfig = MRHistoryJobConfig.newInstance(config); com.typesafe.config.Config jhfAppConf = appConfig.getConfig(); - //2. init JobHistoryContentFilter + //2. prepare JobHistoryContentFilter final JobHistoryContentFilterBuilder builder = JobHistoryContentFilterBuilder.newBuilder().acceptJobFile().acceptJobConfFile(); String[] confKeyPatternsSplit = jhfAppConf.getString("MRConfigureKeys.jobConfigKey").split(","); List<String> confKeyPatterns = new ArrayList<>(confKeyPatternsSplit.length); @@ -60,7 +59,7 @@ public class MRHistoryJobApplication extends StormApplication { builder.includeJobKeyPatterns(Pattern.compile(key)); } JobHistoryContentFilter filter = builder.build(); - //3. init topology + //3. prepare topology TopologyBuilder topologyBuilder = new TopologyBuilder(); String spoutName = "mrHistoryJobSpout"; int tasks = jhfAppConf.getInt("stormConfig.mrHistoryJobSpoutTasks"); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java index e5c7c87..de0d846 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java @@ -32,7 +32,7 @@ import java.util.List; public class MRRunningJobApplication extends StormApplication { @Override public StormTopology execute(Config config, StormEnvironment environment) { - //1. trigger init conf + //1. trigger prepare conf MRRunningJobConfig mrRunningJobConfig = MRRunningJobConfig.newInstance(config); String[] confKeyPatternsSplit = mrRunningJobConfig.getConfig().getString("MRConfigureKeys.jobConfigKey").split(","); @@ -46,7 +46,7 @@ public class MRRunningJobApplication extends StormApplication { confKeyKeys.add(Constants.JobConfiguration.SCOOBI_JOB); confKeyKeys.add(0, mrRunningJobConfig.getConfig().getString("MRConfigureKeys.jobNameKey")); - //2. init topology + //2. prepare topology TopologyBuilder topologyBuilder = new TopologyBuilder(); String spoutName = "mrRunningJobFetchSpout"; String boltName = "mrRunningJobParseBolt"; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml index 66da734..1b7b8ed 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml +++ b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml @@ -1,4 +1,4 @@ -<!-- +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java index d6f5031..fdfcaad 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java @@ -1,99 +1,99 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.jpm.spark.history; - -import com.typesafe.config.Config; -import org.apache.commons.lang.exception.ExceptionUtils; -import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase; -import org.apache.eagle.jpm.util.Constants; -import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; -import org.apache.eagle.metadata.model.ApplicationEntity; -import org.apache.eagle.service.client.IEagleServiceClient; -import org.apache.eagle.service.client.impl.EagleServiceClientImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Map; - -public class SparkHistoryJobApplicationHealthCheck extends ApplicationHealthCheckBase { - private static final Logger LOG = LoggerFactory.getLogger(SparkHistoryJobApplicationHealthCheck.class); - - private SparkHistoryJobAppConfig sparkHistoryJobAppConfig; - - public SparkHistoryJobApplicationHealthCheck(Config config) { - super(config); - this.sparkHistoryJobAppConfig = SparkHistoryJobAppConfig.newInstance(config); - } - - @Override - public Result check() { - SparkHistoryJobAppConfig.EagleInfo eagleServiceConfig = sparkHistoryJobAppConfig.eagleInfo; - IEagleServiceClient client = new EagleServiceClientImpl( - eagleServiceConfig.host, - eagleServiceConfig.port, - eagleServiceConfig.username, - eagleServiceConfig.password); - - client.getJerseyClient().setReadTimeout(eagleServiceConfig.timeout * 1000); - - String message = ""; - try { - ApplicationEntity.Status status = getApplicationStatus(); - if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) { - message += String.format("Application is not RUNNING, status is %s. ", status.toString()); - } - - String query = String.format("%s[@site=\"%s\"]<@site>{max(endTime)}", - Constants.SPARK_APP_SERVICE_ENDPOINT_NAME, - sparkHistoryJobAppConfig.stormConfig.siteId); - - GenericServiceAPIResponseEntity response = client - .search(query) - .startTime(System.currentTimeMillis() - 12 * 60 * 60000L) - .endTime(System.currentTimeMillis()) - .pageSize(10) - .send(); - - List<Map<List<String>, List<Double>>> results = response.getObj(); - long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue(); - long currentTimeStamp = System.currentTimeMillis(); - long maxDelayTime = DEFAULT_MAX_DELAY_TIME; - if (sparkHistoryJobAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) { - maxDelayTime = sparkHistoryJobAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY); - } - - if (!message.isEmpty() || currentTimeStamp - currentProcessTimeStamp > maxDelayTime * 3) { - message += String.format("Current process time is %sms, delay %s hours.", - currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L / 60); - return Result.unhealthy(message); - } else { - return Result.healthy(); - } - } catch (Exception e) { - return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e))); - } finally { - client.getJerseyClient().destroy(); - try { - client.close(); - } catch (Exception e) { - LOG.warn("{}", e); - } - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.spark.history; + +import com.typesafe.config.Config; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase; +import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; +import org.apache.eagle.metadata.model.ApplicationEntity; +import org.apache.eagle.service.client.IEagleServiceClient; +import org.apache.eagle.service.client.impl.EagleServiceClientImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +public class SparkHistoryJobApplicationHealthCheck extends ApplicationHealthCheckBase { + private static final Logger LOG = LoggerFactory.getLogger(SparkHistoryJobApplicationHealthCheck.class); + + private SparkHistoryJobAppConfig sparkHistoryJobAppConfig; + + public SparkHistoryJobApplicationHealthCheck(Config config) { + super(config); + this.sparkHistoryJobAppConfig = SparkHistoryJobAppConfig.newInstance(config); + } + + @Override + public Result check() { + SparkHistoryJobAppConfig.EagleInfo eagleServiceConfig = sparkHistoryJobAppConfig.eagleInfo; + IEagleServiceClient client = new EagleServiceClientImpl( + eagleServiceConfig.host, + eagleServiceConfig.port, + eagleServiceConfig.username, + eagleServiceConfig.password); + + client.getJerseyClient().setReadTimeout(eagleServiceConfig.timeout * 1000); + + String message = ""; + try { + ApplicationEntity.Status status = getApplicationStatus(); + if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) { + message += String.format("Application is not RUNNING, status is %s. ", status.toString()); + } + + String query = String.format("%s[@site=\"%s\"]<@site>{max(endTime)}", + Constants.SPARK_APP_SERVICE_ENDPOINT_NAME, + sparkHistoryJobAppConfig.stormConfig.siteId); + + GenericServiceAPIResponseEntity response = client + .search(query) + .startTime(System.currentTimeMillis() - 12 * 60 * 60000L) + .endTime(System.currentTimeMillis()) + .pageSize(10) + .send(); + + List<Map<List<String>, List<Double>>> results = response.getObj(); + long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue(); + long currentTimeStamp = System.currentTimeMillis(); + long maxDelayTime = DEFAULT_MAX_DELAY_TIME; + if (sparkHistoryJobAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) { + maxDelayTime = sparkHistoryJobAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY); + } + + if (!message.isEmpty() || currentTimeStamp - currentProcessTimeStamp > maxDelayTime * 3) { + message += String.format("Current process time is %sms, delay %s hours.", + currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L / 60); + return Result.unhealthy(message); + } else { + return Result.healthy(); + } + } catch (Exception e) { + return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e))); + } finally { + client.getJerseyClient().destroy(); + try { + client.close(); + } catch (Exception e) { + LOG.warn("{}", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java index 209481a..0e1aacd 100644 --- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java +++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java @@ -29,10 +29,10 @@ import com.typesafe.config.Config; public class SparkRunningJobApp extends StormApplication { @Override public StormTopology execute(Config config, StormEnvironment environment) { - //1. trigger init conf + //1. trigger prepare conf SparkRunningJobAppConfig sparkRunningJobAppConfig = SparkRunningJobAppConfig.newInstance(config); - //2. init topology + //2. prepare topology TopologyBuilder topologyBuilder = new TopologyBuilder(); final String spoutName = SparkRunningJobAppConfig.JOB_FETCH_SPOUT_NAME; final String boltName = SparkRunningJobAppConfig.JOB_PARSE_BOLT_NAME; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java index c5c0388..11a22e5 100644 --- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java +++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java @@ -25,7 +25,7 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; -import org.apache.eagle.app.sink.StormStreamSink; +import org.apache.eagle.app.messaging.StormStreamSink; import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider; import storm.kafka.StringScheme; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java index a1daf89..6d7022b 100644 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java @@ -24,18 +24,16 @@ import backtype.storm.topology.BoltDeclarer; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; import com.typesafe.config.Config; import org.apache.commons.lang3.time.DateUtils; import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; -import org.apache.eagle.app.sink.StormStreamSink; +import org.apache.eagle.app.messaging.StormStreamSink; import org.apache.eagle.common.config.EagleConfigConstants; import org.apache.eagle.dataproc.impl.storm.partition.*; import org.apache.eagle.security.partition.DataDistributionDaoImpl; import org.apache.eagle.security.partition.GreedyPartitionAlgorithm; import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider; -import storm.kafka.StringScheme; /** * Since 8/10/16. http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java index 7a4509b..4df4a5b 100644 --- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java +++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveQueryMonitoringApplication.java @@ -26,7 +26,7 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; -import org.apache.eagle.app.sink.StormStreamSink; +import org.apache.eagle.app.messaging.StormStreamSink; import org.apache.eagle.security.hive.jobrunning.HiveJobRunningSourcedStormSpoutProvider; import org.apache.eagle.security.hive.jobrunning.HiveQueryParserBolt; import org.apache.eagle.security.hive.jobrunning.JobFilterBolt; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java index c1c3033..32dcc30 100644 --- a/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java +++ b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java @@ -19,7 +19,7 @@ package org.apache.eagle.security.oozie.parse; import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; -import org.apache.eagle.app.sink.StormStreamSink; +import org.apache.eagle.app.messaging.StormStreamSink; import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider; import org.apache.eagle.security.oozie.parse.sensitivity.OozieResourceSensitivityDataJoinBolt; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-server-assembly/src/main/conf/eagle.conf ---------------------------------------------------------------------- diff --git a/eagle-server-assembly/src/main/conf/eagle.conf b/eagle-server-assembly/src/main/conf/eagle.conf index 5f6c240..705ef6f 100644 --- a/eagle-server-assembly/src/main/conf/eagle.conf +++ b/eagle-server-assembly/src/main/conf/eagle.conf @@ -83,8 +83,8 @@ metadata { # Eagle Application Configuration # --------------------------------------------- application { - sink { - type = org.apache.eagle.app.sink.KafkaStreamSink + stream { + provider = org.apache.eagle.app.messaging.KafkaStreamProvider } storm { nimbusHost = "server.eagle.apache.org" http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-server/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-server/src/main/resources/application.conf b/eagle-server/src/main/resources/application.conf index ce68550..20f5b2e 100644 --- a/eagle-server/src/main/resources/application.conf +++ b/eagle-server/src/main/resources/application.conf @@ -85,8 +85,8 @@ metadata { # Eagle Application Configuration # --------------------------------------------- application { - sink { - type = org.apache.eagle.app.sink.KafkaStreamSink + stream { + provider = org.apache.eagle.app.messaging.KafkaStreamProvider } storm { nimbusHost = "server.eagle.apache.org" http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java index 93a06f8..ba5914b 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApp.java @@ -23,7 +23,7 @@ import backtype.storm.topology.TopologyBuilder; import com.typesafe.config.Config; import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; -import org.apache.eagle.app.sink.StormStreamSink; +import org.apache.eagle.app.messaging.StormStreamSink; import org.apache.eagle.topology.storm.HealthCheckParseBolt; import org.apache.eagle.topology.storm.TopologyCheckAppSpout; import org.apache.eagle.topology.storm.TopologyDataPersistBolt; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/10572c29/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java index 7860cb5..bf5e695 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java @@ -1,109 +1,109 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.topology; - -import com.typesafe.config.Config; -import org.apache.commons.lang.exception.ExceptionUtils; -import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase; -import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; -import org.apache.eagle.metadata.model.ApplicationEntity; -import org.apache.eagle.service.client.IEagleServiceClient; -import org.apache.eagle.service.client.impl.EagleServiceClientImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Map; - -public class TopologyCheckApplicationHealthCheck extends ApplicationHealthCheckBase { - private static final Logger LOG = LoggerFactory.getLogger(TopologyCheckApplicationHealthCheck.class); - private static final long DEFAULT_MAX_DELAY_TIME = 10 * 60 * 1000L; - - private TopologyCheckAppConfig topologyCheckAppConfig; - - public TopologyCheckApplicationHealthCheck(Config config) { - super(config); - topologyCheckAppConfig = TopologyCheckAppConfig.newInstance(config); - } - - @Override - public Result check() { - //FIXME, this application owner please add eagle server config to Class TopologyCheckAppConfig - IEagleServiceClient client = new EagleServiceClientImpl( - topologyCheckAppConfig.getConfig().getString("service.host"), - topologyCheckAppConfig.getConfig().getInt("service.port"), - topologyCheckAppConfig.getConfig().getString("service.username"), - topologyCheckAppConfig.getConfig().getString("service.password")); - - client.getJerseyClient().setReadTimeout(topologyCheckAppConfig.getConfig().getInt("service.readTimeOutSeconds") * 1000); - - String message = ""; - try { - ApplicationEntity.Status status = getApplicationStatus(); - if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) { - message += String.format("Application is not RUNNING, status is %s. ", status.toString()); - } - - long currentProcessTimeStamp = Math.min( - Math.min( - getServiceLatestUpdateTime(TopologyConstants.HBASE_INSTANCE_SERVICE_NAME, client), - getServiceLatestUpdateTime(TopologyConstants.HDFS_INSTANCE_SERVICE_NAME, client) - ), getServiceLatestUpdateTime(TopologyConstants.MR_INSTANCE_SERVICE_NAME, client)); - long currentTimeStamp = System.currentTimeMillis(); - long maxDelayTime = DEFAULT_MAX_DELAY_TIME; - if (topologyCheckAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) { - maxDelayTime = topologyCheckAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY); - } - - if (!message.isEmpty() || currentTimeStamp - currentProcessTimeStamp > maxDelayTime) { - message += String.format("Current process time is %sms, delay %s minutes.", - currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L); - return Result.unhealthy(message); - } else { - return Result.healthy(); - } - } catch (Exception e) { - return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e))); - } finally { - client.getJerseyClient().destroy(); - try { - client.close(); - } catch (Exception e) { - LOG.warn("{}", e); - } - } - } - - private long getServiceLatestUpdateTime(String serviceName, IEagleServiceClient client) throws Exception { - String query = String.format("%s[@site=\"%s\"]<@site>{max(lastUpdateTime)}", - serviceName, - topologyCheckAppConfig.dataExtractorConfig.site); - - GenericServiceAPIResponseEntity response = client - .search(query) - .pageSize(10) - .send(); - - List<Map<List<String>, List<Double>>> results = response.getObj(); - if (results.size() == 0) { - return Long.MAX_VALUE; - } - long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue(); - return currentProcessTimeStamp; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.topology; + +import com.typesafe.config.Config; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.eagle.app.service.impl.ApplicationHealthCheckBase; +import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; +import org.apache.eagle.metadata.model.ApplicationEntity; +import org.apache.eagle.service.client.IEagleServiceClient; +import org.apache.eagle.service.client.impl.EagleServiceClientImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +public class TopologyCheckApplicationHealthCheck extends ApplicationHealthCheckBase { + private static final Logger LOG = LoggerFactory.getLogger(TopologyCheckApplicationHealthCheck.class); + private static final long DEFAULT_MAX_DELAY_TIME = 10 * 60 * 1000L; + + private TopologyCheckAppConfig topologyCheckAppConfig; + + public TopologyCheckApplicationHealthCheck(Config config) { + super(config); + topologyCheckAppConfig = TopologyCheckAppConfig.newInstance(config); + } + + @Override + public Result check() { + //FIXME, this application owner please add eagle server config to Class TopologyCheckAppConfig + IEagleServiceClient client = new EagleServiceClientImpl( + topologyCheckAppConfig.getConfig().getString("service.host"), + topologyCheckAppConfig.getConfig().getInt("service.port"), + topologyCheckAppConfig.getConfig().getString("service.username"), + topologyCheckAppConfig.getConfig().getString("service.password")); + + client.getJerseyClient().setReadTimeout(topologyCheckAppConfig.getConfig().getInt("service.readTimeOutSeconds") * 1000); + + String message = ""; + try { + ApplicationEntity.Status status = getApplicationStatus(); + if (!status.toString().equals(ApplicationEntity.Status.RUNNING.toString())) { + message += String.format("Application is not RUNNING, status is %s. ", status.toString()); + } + + long currentProcessTimeStamp = Math.min( + Math.min( + getServiceLatestUpdateTime(TopologyConstants.HBASE_INSTANCE_SERVICE_NAME, client), + getServiceLatestUpdateTime(TopologyConstants.HDFS_INSTANCE_SERVICE_NAME, client) + ), getServiceLatestUpdateTime(TopologyConstants.MR_INSTANCE_SERVICE_NAME, client)); + long currentTimeStamp = System.currentTimeMillis(); + long maxDelayTime = DEFAULT_MAX_DELAY_TIME; + if (topologyCheckAppConfig.getConfig().hasPath(MAX_DELAY_TIME_KEY)) { + maxDelayTime = topologyCheckAppConfig.getConfig().getLong(MAX_DELAY_TIME_KEY); + } + + if (!message.isEmpty() || currentTimeStamp - currentProcessTimeStamp > maxDelayTime) { + message += String.format("Current process time is %sms, delay %s minutes.", + currentProcessTimeStamp, (currentTimeStamp - currentProcessTimeStamp) * 1.0 / 60000L); + return Result.unhealthy(message); + } else { + return Result.healthy(); + } + } catch (Exception e) { + return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e))); + } finally { + client.getJerseyClient().destroy(); + try { + client.close(); + } catch (Exception e) { + LOG.warn("{}", e); + } + } + } + + private long getServiceLatestUpdateTime(String serviceName, IEagleServiceClient client) throws Exception { + String query = String.format("%s[@site=\"%s\"]<@site>{max(lastUpdateTime)}", + serviceName, + topologyCheckAppConfig.dataExtractorConfig.site); + + GenericServiceAPIResponseEntity response = client + .search(query) + .pageSize(10) + .send(); + + List<Map<List<String>, List<Double>>> results = response.getObj(); + if (results.size() == 0) { + return Long.MAX_VALUE; + } + long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue(); + return currentProcessTimeStamp; + } +}