Repository: incubator-eagle Updated Branches: refs/heads/master 91aa216a8 -> 3ee73e8d5
add metric topology for offline metric collection Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/34fa6d90 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/34fa6d90 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/34fa6d90 Branch: refs/heads/master Commit: 34fa6d90a8cfe21a8ad85ce852b648a4a05cc363 Parents: 6f29955 Author: sunlibin <abnersunli...@gmail.com> Authored: Sat Nov 21 18:04:54 2015 +0800 Committer: sunlibin <abnersunli...@gmail.com> Committed: Mon Nov 30 11:48:27 2015 +0800 ---------------------------------------------------------------------- .../apache/eagle/executor/AlertExecutor.java | 10 +- .../config/RunningJobCrawlConfig.java | 14 +- .../storm/kafka/KafkaSourcedSpoutProvider.java | 4 - .../storm/kafka/KafkaSourcedSpoutScheme.java | 1 - .../impl/storm/zookeeper/ZKStateConfig.java | 28 ++++ .../eagle/datastream/StreamProducer.scala | 4 + .../org/apache/eagle/metric/CountingMetric.java | 8 +- .../java/org/apache/eagle/metric/Metric.java | 5 +- .../manager/EagleMetricReportManager.java | 45 +++++++ .../metric/report/EagleSerivceMetricReport.java | 61 --------- .../metric/report/EagleServiceMetricReport.java | 60 +++++++++ .../metric/report/MetricEntityConvert.java | 2 +- .../eagle/metric/report/MetricReport.java | 4 +- eagle-security/eagle-metric-collection/pom.xml | 95 ++++++++++++++ .../metric/kafka/EagleMetricCollectorMain.java | 127 ++++++++++++++++++ .../eagle/metric/kafka/KafkaConsumerOffset.java | 27 ++++ .../kafka/KafkaConsumerOffsetFetcher.java | 70 ++++++++++ .../metric/kafka/KafkaLatestOffsetFetcher.java | 98 ++++++++++++++ .../kafka/KafkaMessageDistributionExecutor.java | 126 ++++++++++++++++++ .../metric/kafka/KafkaOffsetCheckerConfig.java | 50 +++++++ .../kafka/KafkaOffsetSourceSpoutProvider.java | 53 ++++++++ .../eagle/metric/kafka/KafkaOffsetSpout.java | 131 +++++++++++++++++++ .../src/main/resources/application.conf | 39 ++++++ .../src/main/resources/log4j.properties | 39 ++++++ .../src/test/java/TestKafkaOffset.java | 2 + .../auditlog/HdfsAuditLogProcessorMain.java | 1 - ...HiveJobRunningSourcedStormSpoutProvider.java | 2 +- eagle-security/pom.xml | 1 + eagle-topology-assembly/pom.xml | 5 + 29 files changed, 1021 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java index 4f0f4b3..d86a846 100644 --- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java +++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java @@ -28,7 +28,7 @@ import org.apache.eagle.datastream.JavaStormStreamExecutor2; import org.apache.eagle.datastream.Tuple2; import org.apache.eagle.metric.CountingMetric; import org.apache.eagle.metric.Metric; -import org.apache.eagle.metric.report.EagleSerivceMetricReport; +import org.apache.eagle.metric.report.EagleServiceMetricReport; import com.sun.jersey.client.impl.CopyOnWriteHashMap; import com.typesafe.config.Config; import org.apache.eagle.alert.policy.*; @@ -73,7 +73,7 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti private Map<String, Map<String, String>> dimensionsMap; // cache it for performance private Map<String, String> baseDimensions; private Thread metricReportThread; - private EagleSerivceMetricReport metricReport; + private EagleServiceMetricReport metricReport; public AlertExecutor(String alertExecutorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq, AlertDefinitionDAO alertDefinitionDao, String[] sourceStreams){ @@ -122,7 +122,7 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) : null; String password = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) ? config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) : null; - this.metricReport = new EagleSerivceMetricReport(eagleServiceHost, eagleServicePort, username, password); + this.metricReport = new EagleServiceMetricReport(eagleServiceHost, eagleServicePort, username, password); metricMap = new ConcurrentHashMap<String, Metric>(); baseDimensions = new HashMap<String, String>(); @@ -172,7 +172,7 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti } policyEvaluators = new CopyOnWriteHashMap<>(); - // for efficency, we don't put single policy evaluator + // for efficiency, we don't put single policy evaluator policyEvaluators.putAll(tmpPolicyEvaluators); DynamicPolicyLoader policyLoader = DynamicPolicyLoader.getInstance(); @@ -258,7 +258,7 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti long previous = metric.getTimestamp(); if (current > previous + MERITE_GRANULARITY) { metricList.add(metric); - metricMap.put(name, new CountingMetric(trim(current, MERITE_GRANULARITY), metric.getDemensions(), metric.getMetricName())); + metricMap.put(name, new CountingMetric(trim(current, MERITE_GRANULARITY), metric.getDimensions(), metric.getMetricName())); } } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java index b17a41d..79a8928 100644 --- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java +++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java @@ -16,10 +16,11 @@ */ package org.apache.eagle.jobrunning.config; -import java.io.Serializable; - +import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig; import org.apache.eagle.job.JobPartitioner; +import java.io.Serializable; + public class RunningJobCrawlConfig implements Serializable{ private static final long serialVersionUID = 1L; public RunningJobEndpointConfig endPointConfig; @@ -49,13 +50,4 @@ public class RunningJobCrawlConfig implements Serializable{ public Class<? extends JobPartitioner> partitionerCls; public int numTotalPartitions = 1; } - - public static class ZKStateConfig implements Serializable{ - private static final long serialVersionUID = 1L; - public String zkQuorum; - public String zkRoot; - public int zkSessionTimeoutMs; - public int zkRetryTimes; - public int zkRetryInterval; - } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java index 06d37ef..373b3ca 100644 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java +++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java @@ -51,10 +51,6 @@ public class KafkaSourcedSpoutProvider extends AbstractStormSpoutProvider{ String zkConnString = context.getString("dataSourceConfig.zkConnection"); // transaction zkRoot String zkRoot = context.getString("dataSourceConfig.transactionZKRoot"); - // Site - String site = context.getString("eagleProps.site"); - - //String realTopic = (site ==null)? topic : String.format("%s_%s",site,topic); LOG.info(String.format("Use topic id: %s",topic)); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java index 8bdbcb5..8b65c1f 100644 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java +++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java @@ -51,7 +51,6 @@ public class KafkaSourcedSpoutScheme implements Scheme { @Override public List<Object> deserialize(byte[] ser) { Object tmp = deserializer.deserialize(ser); - Map<String, Object> map = (Map<String, Object>)tmp; if(tmp == null) return null; // the following tasks are executed within the same process of kafka spout http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java new file mode 100644 index 0000000..f9515f5 --- /dev/null +++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.dataproc.impl.storm.zookeeper; + +import java.io.Serializable; + +public class ZKStateConfig implements Serializable { + private static final long serialVersionUID = 1L; + public String zkQuorum; + public String zkRoot; + public int zkSessionTimeoutMs; + public int zkRetryTimes; + public int zkRetryInterval; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala index 9fb3e22..40d4904 100644 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala +++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala @@ -103,6 +103,10 @@ trait StreamProducer{ ret } + def streamUnion(others : util.List[StreamProducer]) : StreamProducer = { + streamUnion(others); + } + def streamUnion(others : Seq[StreamProducer]) : StreamProducer = { val ret = StreamUnionProducer(incrementAndGetId(), others) hookupDAG(graph, this, ret) http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java index f4d5cd5..4f65b8e 100644 --- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java +++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java @@ -25,16 +25,20 @@ import com.google.common.util.concurrent.AtomicDouble; */ public class CountingMetric extends Metric{ + public CountingMetric(long timestamp, Map<String, String> dimensions, String metricName, double value) { + super(timestamp, dimensions, metricName, new AtomicDouble(value)); + } + public CountingMetric(long timestamp, Map<String, String> dimensions, String metricName, AtomicDouble value) { super(timestamp, dimensions, metricName, value); } - + public CountingMetric(long timestamp, Map<String, String> dimensions, String metricName) { this(timestamp, dimensions, metricName, new AtomicDouble(0.0)); } public CountingMetric(CountingMetric metric) { - this(metric.timestamp, new HashMap<String, String>(metric.dimensions), metric.metricName, metric.value); + this(metric.timestamp, new HashMap<>(metric.dimensions), metric.metricName, metric.value); } @Override http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java index 993906e..616c82b 100644 --- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java +++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java @@ -16,6 +16,7 @@ */ package org.apache.eagle.metric; +import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -32,7 +33,7 @@ public abstract class Metric implements MetricOperator{ public Metric(long timestamp, Map<String, String> dimensions, String metricName, AtomicDouble value) { this.timestamp = timestamp; - this.dimensions = dimensions; + this.dimensions = new HashMap<>(dimensions); this.metricName = metricName; this.value = value; } @@ -45,7 +46,7 @@ public abstract class Metric implements MetricOperator{ return timestamp; } - public Map<String, String> getDemensions() { + public Map<String, String> getDimensions() { return dimensions; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java new file mode 100644 index 0000000..b63944d --- /dev/null +++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java @@ -0,0 +1,45 @@ +package org.apache.eagle.metric.manager; + +import org.apache.eagle.metric.Metric; +import org.apache.eagle.metric.report.MetricReport; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class EagleMetricReportManager { + + private static EagleMetricReportManager manager = new EagleMetricReportManager(); + private Map<String, MetricReport> metricReportMap = new ConcurrentHashMap<>(); + + private EagleMetricReportManager() { + + } + + public static EagleMetricReportManager getInstance () { + return manager; + } + + public boolean register(String name, MetricReport report) { + if (metricReportMap.get(name) == null) { + synchronized (metricReportMap) { + if (metricReportMap.get(name) == null) { + metricReportMap.put(name, report); + return true; + } + } + } + return false; + } + + public Map<String, MetricReport> getRegisteredReports() { + return metricReportMap; + } + + public void emit(List<Metric> list) { + synchronized (this.metricReportMap) { + for (MetricReport report : metricReportMap.values()) { + report.emit(list); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleSerivceMetricReport.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleSerivceMetricReport.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleSerivceMetricReport.java deleted file mode 100644 index 31056f2..0000000 --- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleSerivceMetricReport.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.metric.report; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.eagle.metric.Metric; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.eagle.log.entity.GenericMetricEntity; -import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; -import org.apache.eagle.service.client.impl.EagleServiceClientImpl; - -public class EagleSerivceMetricReport implements MetricReport{ - - private EagleServiceClientImpl client; - private static final Logger LOG = LoggerFactory.getLogger(EagleSerivceMetricReport.class); - - public EagleSerivceMetricReport(String host, int port, String username, String password) { - client = new EagleServiceClientImpl(host, port, username, password); - } - - public EagleSerivceMetricReport(String host, int port) { - client = new EagleServiceClientImpl(host, port, null, null); - } - - public void emit(List<Metric> list) { - List<GenericMetricEntity> entities = new ArrayList<GenericMetricEntity>(); - for (Metric metric : list) { - entities.add(MetricEntityConvert.convert(metric)); - } - try { - int total = entities.size(); - GenericServiceAPIResponseEntity<String> response = client.create(entities, GenericMetricEntity.GENERIC_METRIC_SERVICE); - if(response.isSuccess()) { - LOG.info("Wrote " + total + " entities to service"); - }else{ - LOG.error("Failed to write " + total + " entities to service, due to server exception: "+ response.getException()); - } - } - catch (Exception ex) { - LOG.error("Got exception while writing entities: ", ex); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java new file mode 100644 index 0000000..7ff415e --- /dev/null +++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.metric.report; + +import org.apache.eagle.log.entity.GenericMetricEntity; +import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; +import org.apache.eagle.metric.Metric; +import org.apache.eagle.service.client.impl.EagleServiceClientImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +public class EagleServiceMetricReport implements MetricReport{ + + private EagleServiceClientImpl client; + private static final Logger LOG = LoggerFactory.getLogger(EagleServiceMetricReport.class); + + public EagleServiceMetricReport(String host, int port, String username, String password) { + client = new EagleServiceClientImpl(host, port, username, password); + } + + public EagleServiceMetricReport(String host, int port) { + client = new EagleServiceClientImpl(host, port, null, null); + } + + public void emit(List<Metric> list) { + List<GenericMetricEntity> entities = new ArrayList<GenericMetricEntity>(); + for (Metric metric : list) { + entities.add(MetricEntityConvert.convert(metric)); + } + try { + int total = entities.size(); + GenericServiceAPIResponseEntity<String> response = client.create(entities, GenericMetricEntity.GENERIC_METRIC_SERVICE); + if(response.isSuccess()) { + LOG.info("Wrote " + total + " entities to service"); + }else{ + LOG.error("Failed to write " + total + " entities to service, due to server exception: "+ response.getException()); + } + } + catch (Exception ex) { + LOG.error("Got exception while writing entities: ", ex); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java index 10f05ca..c389fa7 100644 --- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java +++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java @@ -25,7 +25,7 @@ public class MetricEntityConvert { GenericMetricEntity entity = new GenericMetricEntity(); entity.setPrefix(metric.getMetricName()); entity.setValue(new double[]{metric.getValue().get()}); - entity.setTags(metric.getDemensions()); + entity.setTags(metric.getDimensions()); entity.setTimestamp(metric.getTimestamp()); return entity; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java index c03a89f..85d423b 100644 --- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java +++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java @@ -21,6 +21,6 @@ import java.util.List; import org.apache.eagle.metric.Metric; public interface MetricReport { - - public void emit(List<Metric> list); + // The method should be thread safe + void emit(List<Metric> list); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/pom.xml b/eagle-security/eagle-metric-collection/pom.xml new file mode 100644 index 0000000..f2e78a6 --- /dev/null +++ b/eagle-security/eagle-metric-collection/pom.xml @@ -0,0 +1,95 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>eagle</groupId> + <artifactId>eagle-security-parent</artifactId> + <version>0.3.0</version> + <relativePath>../pom.xml</relativePath> + </parent> + <artifactId>eagle-metric-collection</artifactId> + <packaging>jar</packaging> + <name>eagle-metric-collection</name> + <url>http://maven.apache.org</url> + <dependencies> + <dependency> + <groupId>eagle</groupId> + <artifactId>eagle-security-hdfs-auditlog</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <exclusions> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-kafka</artifactId> + <exclusions> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>eagle</groupId> + <artifactId>eagle-alert-process</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>eagle</groupId> + <artifactId>eagle-stream-process-base</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>eagle</groupId> + <artifactId>eagle-stream-process-api</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.hierynomus</groupId> + <artifactId>sshj</artifactId> + <version>0.13.0</version> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java new file mode 100644 index 0000000..65fe68a --- /dev/null +++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.metric.kafka; + +import backtype.storm.spout.SchemeAsMultiScheme; +import backtype.storm.topology.base.BaseRichSpout; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider; +import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme; +import org.apache.eagle.dataproc.util.ConfigOptionParser; +import org.apache.eagle.datastream.ExecutionEnvironmentFactory; +import org.apache.eagle.datastream.StormExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import storm.kafka.BrokerHosts; +import storm.kafka.KafkaSpout; +import storm.kafka.SpoutConfig; +import storm.kafka.ZkHosts; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class EagleMetricCollectorMain { + + private static final Logger LOG = LoggerFactory.getLogger(EagleMetricCollectorMain.class); + + public static void main(String[] args) throws Exception { + new ConfigOptionParser().load(args); + //System.setProperty("config.resource", "/application.local.conf"); + + Config config = ConfigFactory.load(); + + StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config); + + String deserClsName = config.getString("dataSourceConfig.deserializerClass"); + final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) { + @Override + public List<Object> deserialize(byte[] ser) { + Object tmp = deserializer.deserialize(ser); + Map<String, Object> map = (Map<String, Object>)tmp; + if(tmp == null) return null; + return Arrays.asList(map.get("user"), map.get("timestamp")); + } + }; + + KafkaSourcedSpoutProvider kafkaMessageSpoutProvider = new KafkaSourcedSpoutProvider() { + @Override + public BaseRichSpout getSpout(Config context) { + // Kafka topic + String topic = context.getString("dataSourceConfig.topic"); + // Kafka consumer group id + String groupId = context.getString("dataSourceConfig.metricCollectionConsumerId"); + // Kafka fetch size + int fetchSize = context.getInt("dataSourceConfig.fetchSize"); + // Kafka deserializer class + String deserClsName = context.getString("dataSourceConfig.deserializerClass"); + + // Kafka broker zk connection + String zkConnString = context.getString("dataSourceConfig.zkQuorum"); + + // transaction zkRoot + String zkRoot = context.getString("dataSourceConfig.transactionZKRoot"); + + LOG.info(String.format("Use topic id: %s",topic)); + + String brokerZkPath = null; + if(context.hasPath("dataSourceConfig.brokerZkPath")) { + brokerZkPath = context.getString("dataSourceConfig.brokerZkPath"); + } + + BrokerHosts hosts; + if(brokerZkPath == null) { + hosts = new ZkHosts(zkConnString); + } else { + hosts = new ZkHosts(zkConnString, brokerZkPath); + } + + SpoutConfig spoutConfig = new SpoutConfig(hosts, + topic, + zkRoot + "/" + topic, + groupId); + + // transaction zkServers + String[] zkConnections = zkConnString.split(","); + List<String> zkHosts = new ArrayList<>(); + for (String zkConnection : zkConnections) { + zkHosts.add(zkConnection.split(":")[0]); + } + Integer zkPort = Integer.valueOf(zkConnections[0].split(":")[1]); + + spoutConfig.zkServers = zkHosts; + // transaction zkPort + spoutConfig.zkPort = zkPort; + // transaction update interval + spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS"); + // Kafka fetch size + spoutConfig.fetchSizeBytes = fetchSize; + + spoutConfig.scheme = new SchemeAsMultiScheme(scheme); + KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); + return kafkaSpout; + } + }; + + env.newSource(new KafkaOffsetSourceSpoutProvider().getSpout(config)).renameOutputFields(0).withName("kafkaLogLagChecker"); + env.newSource(kafkaMessageSpoutProvider.getSpout(config)).renameOutputFields(2).withName("kafkaMessageDistributionCheck").groupBy(Arrays.asList(0)) + .flatMap(new KafkaMessageDistributionExecutor()); + env.execute(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffset.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffset.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffset.java new file mode 100644 index 0000000..5cf0e11 --- /dev/null +++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffset.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.metric.kafka; + +import java.util.Map; + +public class KafkaConsumerOffset { + public Map<String, String> topology; + public Long offset; + public Long partition; + public Map<String, String> broker; + public String topic; +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffsetFetcher.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffsetFetcher.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffsetFetcher.java new file mode 100644 index 0000000..f34f195 --- /dev/null +++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffsetFetcher.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.metric.kafka; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class KafkaConsumerOffsetFetcher { + + public CuratorFramework curator; + public String zkRoot; + public ObjectMapper mapper; + public String topic; + public String group; + + public KafkaConsumerOffsetFetcher(ZKStateConfig config, String topic, String group) { + try { + this.curator = CuratorFrameworkFactory.newClient(config.zkQuorum, config.zkSessionTimeoutMs, 15000, + new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval)); + curator.start(); + this.zkRoot = config.zkRoot; + mapper = new ObjectMapper(); + Module module = new SimpleModule("offset").registerSubtypes(new NamedType(KafkaConsumerOffset.class)); + mapper.registerModule(module); + this.topic = topic; + this.group = group; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public Map<String, Long> fetch() throws Exception { + Map<String, Long> map = new HashMap<String, Long>(); + String path = zkRoot + "/" + topic + "/" + group; + if (curator.checkExists().forPath(path) != null) { + List<String> partitions = curator.getChildren().forPath(path); + for (String partition : partitions) { + String partitionPath = path + "/" + partition; + String data = new String(curator.getData().forPath(partitionPath)); + KafkaConsumerOffset offset = mapper.readValue(data, KafkaConsumerOffset.class); + map.put(partition, offset.offset); + } + } + return map; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java new file mode 100644 index 0000000..de93ea3 --- /dev/null +++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.metric.kafka; + +import kafka.api.PartitionOffsetRequestInfo; +import kafka.common.TopicAndPartition; +import kafka.javaapi.*; +import kafka.javaapi.consumer.SimpleConsumer; +import java.util.*; + +public class KafkaLatestOffsetFetcher { + + private List<String> brokerList; + private int port; + + public KafkaLatestOffsetFetcher(String connectString) { + brokerList = new ArrayList<>(); + String[] brokers = connectString.split(","); + for (String broker : brokers) { + brokerList.add(broker.split(":")[0]); + } + this.port = Integer.valueOf(brokers[0].split(":")[1]); + } + + public Map<Integer, Long> fetch(String topic, int partitionCount) { + Map<Integer, PartitionMetadata> metadatas = fetchPartitionMetadata(brokerList, port, topic, partitionCount); + Map<Integer, Long> ret = new HashMap<>(); + for (int partition = 0; partition < partitionCount; partition++) { + PartitionMetadata metadata = metadatas.get(partition); + if (metadata == null) { + throw new RuntimeException("Can't find metadata for Topic and Partition. Exiting"); + } + if (metadata.leader() == null) { + throw new RuntimeException("Can't find Leader for Topic and Partition. Exiting"); + } + String leadBroker = metadata.leader().host(); + String clientName = "Client_" + topic + "_" + partition; + SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName); + long lastestOffset = getLatestOffset(consumer, topic, partition, clientName); + if (consumer != null) consumer.close(); + ret.put(partition, lastestOffset); + } + return ret; + } + + public long getLatestOffset(SimpleConsumer consumer, String topic, int partition, String clientName) { + TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); + Map<TopicAndPartition, kafka.api.PartitionOffsetRequestInfo> requestInfo = new HashMap<>(); + requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1)); + kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); + OffsetResponse response = consumer.getOffsetsBefore(request); + if (response.hasError()) { + throw new RuntimeException("Error fetching data offset from the broker. Reason: " + response.errorCode(topic, partition) ); + } + long[] offsets = response.offsets(topic, partition); + return offsets[0]; + } + + private Map<Integer, PartitionMetadata> fetchPartitionMetadata(List<String> brokerList, int port, String topic, int partitionCount) { + Map<Integer, PartitionMetadata> partitionMetadata = new HashMap<>(); + for (String broker : brokerList) { + SimpleConsumer consumer = null; + try { + consumer = new SimpleConsumer(broker, port, 100000, 64 * 1024, "leaderLookup"); + List<String> topics = Collections.singletonList(topic); + TopicMetadataRequest req = new TopicMetadataRequest(topics); + kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); + + List<TopicMetadata> metaData = resp.topicsMetadata(); + for (TopicMetadata item : metaData) { + for (PartitionMetadata part : item.partitionsMetadata()) { + partitionMetadata.put(part.partitionId(), part); + } + } + if (partitionMetadata.size() == partitionCount) break; + } catch (Exception e) { + throw new RuntimeException("Error communicating with Broker [" + broker + "] " + "to find Leader for [" + topic + "] Reason: " + e); + } finally { + if (consumer != null) consumer.close(); + } + } + return partitionMetadata; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java new file mode 100644 index 0000000..be6d0f7 --- /dev/null +++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java @@ -0,0 +1,126 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.metric.kafka; + +import com.typesafe.config.Config; +import org.apache.eagle.common.config.EagleConfigConstants; +import org.apache.eagle.datastream.Collector; +import org.apache.eagle.datastream.JavaStormStreamExecutor1; +import org.apache.eagle.datastream.Tuple1; +import org.apache.eagle.metric.CountingMetric; +import org.apache.eagle.metric.Metric; +import org.apache.eagle.metric.manager.EagleMetricReportManager; +import org.apache.eagle.metric.report.EagleServiceMetricReport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class KafkaMessageDistributionExecutor extends JavaStormStreamExecutor1<String> { + + private Config config; + private Map<String, String> baseMetricDimension; + private Map<String, EventMetric> eventMetrics; + private static final long DEFAULT_METRIC_GRANULARITY = 5 * 60 * 1000; + private static final String metricName = "kafka.message.user.count"; + private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageDistributionExecutor.class); + + public static class EventMetric { + long latestMessageTime; + Metric metric; + + public EventMetric(long latestMessageTime, Metric metric) { + this.latestMessageTime = latestMessageTime; + this.metric = metric; + } + + public void update(double d) { + this.metric.update(d); + } + } + + @Override + public void prepareConfig(Config config) { + this.config = config; + } + + @Override + public void init() { + String site = config.getString("dataSourceConfig.site"); + String topic = config.getString("dataSourceConfig.topic"); + this.baseMetricDimension = new HashMap<>(); + this.baseMetricDimension.put("site", site); + this.baseMetricDimension.put("topic", topic); + String eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST); + int eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT); + String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME); + String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD); + + EagleServiceMetricReport report = new EagleServiceMetricReport(eagleServiceHost, eagleServicePort, username, password); + EagleMetricReportManager.getInstance().register("metricCollectServiceReport", report); + eventMetrics = new ConcurrentHashMap<>(); + } + + public long trimTimestamp(long timestamp, long granularity) { + return timestamp / granularity * granularity; + } + + public void putNewMetric(long currentMessageTime, String user) { + Map<String ,String> dimensions = new HashMap<>(); + dimensions.putAll(baseMetricDimension); + dimensions.put("user", user); + long trimTimestamp = trimTimestamp(currentMessageTime, DEFAULT_METRIC_GRANULARITY); + Metric metric = new CountingMetric(trimTimestamp, dimensions, metricName, 1); + eventMetrics.put(user, new EventMetric(currentMessageTime, metric)); + } + + public void update(long currentMessageTime, String user) { + if (eventMetrics.get(user) == null) { + LOG.info("Got metrics for new user: " + user); + putNewMetric(currentMessageTime, user); + } + else { + long latestMessageTime = eventMetrics.get(user).latestMessageTime; + if (currentMessageTime > latestMessageTime + DEFAULT_METRIC_GRANULARITY) { + EagleMetricReportManager.getInstance().emit(Arrays.asList(eventMetrics.remove(user).metric)); + putNewMetric(currentMessageTime, user); + } + else { + eventMetrics.get(user).update(1); + } + } + } + + @Override + public void flatMap(List<Object> input, Collector<Tuple1<String>> collector) { + try { + String user = (String) input.get(0); + Long timestamp = (Long) (input.get(1)); + update(timestamp, user); + } + catch (Exception ex) { + LOG.error("Got an exception, ex: ", ex); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java new file mode 100644 index 0000000..5a06c82 --- /dev/null +++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.metric.kafka; + +import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig; + +import java.io.Serializable; + +public class KafkaOffsetCheckerConfig implements Serializable { + public static class KafkaConfig implements Serializable{ + public String kafkaEndPoints; + public String topic; + public String site; + public String group; + } + + public static class ServiceConfig implements Serializable{ + public String serviceHost; + public Integer servicePort; + public String username; + public String password; + } + + public ZKStateConfig zkConfig; + public KafkaConfig kafkaConfig; + public ServiceConfig serviceConfig; + + public KafkaOffsetCheckerConfig (ServiceConfig serviceConfig, ZKStateConfig zkConfig, KafkaConfig kafkaConfig) { + this.serviceConfig = serviceConfig; + this.zkConfig = zkConfig; + this.kafkaConfig = kafkaConfig; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java new file mode 100644 index 0000000..c794632 --- /dev/null +++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.metric.kafka; + +import backtype.storm.topology.base.BaseRichSpout; +import com.typesafe.config.Config; +import org.apache.eagle.common.config.EagleConfigConstants; +import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaOffsetSourceSpoutProvider { + private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetSourceSpoutProvider.class); + + public BaseRichSpout getSpout(Config config){ + + ZKStateConfig zkStateConfig = new ZKStateConfig(); + zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum"); + zkStateConfig.zkRoot = config.getString("dataSourceConfig.transactionZKRoot"); + zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs"); + zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes"); + zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval"); + + KafkaOffsetCheckerConfig.ServiceConfig serviceConfig = new KafkaOffsetCheckerConfig.ServiceConfig(); + serviceConfig.serviceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST); + serviceConfig.servicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT); + serviceConfig.username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME); + serviceConfig.password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD); + + KafkaOffsetCheckerConfig.KafkaConfig kafkaConfig = new KafkaOffsetCheckerConfig.KafkaConfig(); + kafkaConfig.kafkaEndPoints = config.getString("dataSourceConfig.kafkaEndPoints"); + kafkaConfig.site = config.getString("dataSourceConfig.site"); + kafkaConfig.topic = config.getString("dataSourceConfig.topic"); + kafkaConfig.group = config.getString("dataSourceConfig.hdfsTopologyConsumerGroupId"); + KafkaOffsetCheckerConfig checkerConfig = new KafkaOffsetCheckerConfig(serviceConfig, zkStateConfig, kafkaConfig); + KafkaOffsetSpout spout = new KafkaOffsetSpout(checkerConfig); + return spout; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java new file mode 100644 index 0000000..d6f7298 --- /dev/null +++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.metric.kafka; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichSpout; +import com.typesafe.config.Config; +import org.apache.eagle.common.config.EagleConfigConstants; +import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig; +import org.apache.eagle.metric.CountingMetric; +import org.apache.eagle.metric.Metric; +import org.apache.eagle.metric.manager.EagleMetricReportManager; +import org.apache.eagle.metric.report.EagleServiceMetricReport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class KafkaOffsetSpout extends BaseRichSpout { + private static final long serialVersionUID = 1L; + private static final long DEFAULT_ROUND_INTERVALS = 5 * 60 * 1000; + private KafkaOffsetCheckerConfig config; + private KafkaConsumerOffsetFetcher consumerOffsetFetcher; + private KafkaLatestOffsetFetcher latestOffsetFetcher; + private Map<String, String> baseMetricDimension; + private long lastRoundTime = 0; + + private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetSpout.class); + + public KafkaOffsetSpout(KafkaOffsetCheckerConfig config) {//Config config, ZKStateConfig zkStateConfig, String kafkaEndPoints){ + this.config = config; + } + + @SuppressWarnings("rawtypes") + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + consumerOffsetFetcher = new KafkaConsumerOffsetFetcher(config.zkConfig, config.kafkaConfig.topic, config.kafkaConfig.group); + latestOffsetFetcher = new KafkaLatestOffsetFetcher(config.kafkaConfig.kafkaEndPoints); + + this.baseMetricDimension = new HashMap<>(); + this.baseMetricDimension.put("site", config.kafkaConfig.site); + this.baseMetricDimension.put("topic", config.kafkaConfig.topic); + this.baseMetricDimension.put("group", config.kafkaConfig.group); + String eagleServiceHost = config.serviceConfig.serviceHost; + Integer eagleServicePort = config.serviceConfig.servicePort; + String username = config.serviceConfig.serviceHost; + String password = config.serviceConfig.serviceHost; + EagleServiceMetricReport report = new EagleServiceMetricReport(eagleServiceHost, eagleServicePort, username, password); + EagleMetricReportManager.getInstance().register("metricCollectServiceReport", report); + } + + public Metric constructMetric(long timestamp, String partition, double value) { + Map<String, String> dimensions = new HashMap<>(); + dimensions.putAll(baseMetricDimension); + dimensions.put("partition", partition); + String metricName = "eagle.kafka.message.consumer.lag"; + Metric metric = new CountingMetric(timestamp, dimensions, metricName, value); + return metric; + } + + @Override + public void nextTuple() { + Long currentTime = System.currentTimeMillis(); + if (currentTime - lastRoundTime > DEFAULT_ROUND_INTERVALS) { + try { + Map<String, Long> consumedOffset = consumerOffsetFetcher.fetch(); + Map<Integer, Long> latestOffset = latestOffsetFetcher.fetch(config.kafkaConfig.topic, consumedOffset.size()); + List<Metric> list = new ArrayList<>(); + for (Map.Entry<String, Long> entry : consumedOffset.entrySet()) { + String partition = entry.getKey(); + Integer partitionNumber = Integer.valueOf(partition.split("_")[1]); + Long lag = latestOffset.get(partitionNumber) - entry.getValue(); + list.add(constructMetric(currentTime, partition, lag)); + } + EagleMetricReportManager.getInstance().emit(list); + } catch (Exception ex) { + LOG.error("Got an exception, ex: ", ex); + } + } + try{ + Thread.sleep(10 * 1000); + }catch(Throwable t){ + //Do nothing + } + } + + /** + * empty because framework will take care of output fields declaration + */ + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } + + @Override + public void ack(Object msgId) { + } + + @Override + public void fail(Object msgId) { + } + + @Override + public void deactivate() { + + } + + @Override + public void close() { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/main/resources/application.conf b/eagle-security/eagle-metric-collection/src/main/resources/application.conf new file mode 100644 index 0000000..4b07019 --- /dev/null +++ b/eagle-security/eagle-metric-collection/src/main/resources/application.conf @@ -0,0 +1,39 @@ +{ + "envContextConfig" : { + "env" : "storm", + "mode" : "local", + "topologyName" : "metricCollectionTopology", + "stormConfigFile" : "security-auditlog-storm.yaml", + "parallelismConfig" : { + "kafkaMsgConsumer" : 1 + } + }, + "dataSourceConfig": { + # For fetch gap + "site" : "sandbox", + "topic" : "sandbox_hdfs_audit_log", + "zkQuorum" : "localhost:2191", + "hdfsTopologyConsumerGroupId" : "eagle.hdfsaudit.consumer", + "zkSessionTimeoutMs" : 15000, + "zkRetryTimes" : 3, + "zkRetryInterval" : 2000, + "zkConnectionTimeoutMS" : 15000, + #"fetchSize" : 1048586, + "deserializerClass" : "org.apache.eagle.security.auditlog.HdfsAuditLogKafkaDeserializer", + "metricCollectionConsumerId" : "eagle.metric.collection.consumer", + # For kafka spout + #"transactionZKServers" : "localhost", + #"transactionZKPort" : "2181", + "transactionZKRoot" : "/consumers", + #"transactionStateUpdateMS" : 2000, + "kafkaEndPoints" : "localhost:9092" + }, + "eagleProps" : { + "eagleService": { + "host": "localhost", + "port": 38080, + "username": "admin", + "password": "secret" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/main/resources/log4j.properties b/eagle-security/eagle-metric-collection/src/main/resources/log4j.properties new file mode 100644 index 0000000..8a0919a --- /dev/null +++ b/eagle-security/eagle-metric-collection/src/main/resources/log4j.properties @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, stdout, DRFA + +eagle.log.dir=./logs +eagle.log.file=eagle.log + + +#log4j.logger.eagle.security.auditlog.IPZoneDataJoinExecutor=DEBUG +#log4j.logger.eagle.security.auditlog.FileSensitivityDataJoinExecutor=DEBUG +#log4j.logger.eagle.executor.AlertExecutor=DEBUG +# standard output +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n + +# Daily Rolling File Appender +log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender +log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file} +log4j.appender.DRFA.DatePattern=yyyy-MM-dd +## 30-day backup +# log4j.appender.DRFA.MaxBackupIndex=30 +log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout + +# Pattern format: Date LogLevel LoggerName LogMessage +log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java b/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java new file mode 100644 index 0000000..bfba783 --- /dev/null +++ b/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java @@ -0,0 +1,2 @@ +public class TestKafkaOffset { +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java index fa712d4..327cb8d 100644 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java @@ -41,7 +41,6 @@ public class HdfsAuditLogProcessorMain { Config config = new ConfigOptionParser().load(args); LOG.info("Config class: " + config.getClass().getCanonicalName()); - if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise())); StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java index 16f77c3..9ddf0b2 100644 --- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java +++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java @@ -21,7 +21,7 @@ import org.apache.eagle.job.JobPartitioner; import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig; import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig.ControlConfig; import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig.RunningJobEndpointConfig; -import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig.ZKStateConfig; +import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig; import org.apache.eagle.jobrunning.storm.JobRunningSpout; import com.typesafe.config.Config; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-security/pom.xml b/eagle-security/pom.xml index 0f45c49..599cff6 100644 --- a/eagle-security/pom.xml +++ b/eagle-security/pom.xml @@ -41,5 +41,6 @@ <module>eagle-security-hdfs-securitylog</module> <module>eagle-security-hbase-securitylog</module> <module>eagle-security-hbase-web</module> + <module>eagle-metric-collection</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-topology-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-topology-assembly/pom.xml b/eagle-topology-assembly/pom.xml index 838c2f8..2132bb3 100644 --- a/eagle-topology-assembly/pom.xml +++ b/eagle-topology-assembly/pom.xml @@ -54,6 +54,11 @@ <artifactId>eagle-security-hbase-securitylog</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>eagle</groupId> + <artifactId>eagle-metric-collection</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <build>