[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout

2018-01-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2480


---


2018-01-03 Thread omkreddy
Github user omkreddy commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159413588
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -144,10 +147,37 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect
 setCommitMetadata(context);
 
 tupleListener.open(conf, context);
+if (canRegisterMetrics()) registerMetric();
--- End diff --

Updated the PR.


---


2018-01-03 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159407800
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -144,10 +147,37 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect
 setCommitMetadata(context);
 
 tupleListener.open(conf, context);
+if (canRegisterMetrics()) registerMetric();
--- End diff --

Nit: Use braces.


---


2017-12-31 Thread omkreddy
Github user omkreddy commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159149753
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.metrics;
+
+import com.google.common.base.Supplier;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class KafkaOffsetMetric implements IMetric {
+
+private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
+private final Supplier> offsetManagerSupplier;
+private final Supplier consumerSupplier;
+
+public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier consumerSupplier) {
+this.offsetManagerSupplier = offsetManagerSupplier;
+this.consumerSupplier = consumerSupplier;
+}
+
+@Override
+public Object getValueAndReset() {
+
+Map offsetManagers = offsetManagerSupplier.get();
+KafkaConsumer kafkaConsumer = consumerSupplier.get();
+
+if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) {
+LOG.info("Metrics Tick: offsetManagers or kafkaConsumer is null.");
+return null;
+}
+
+Map topicMetricsMap = new HashMap<>();
+Set topicPartitions = offsetManagers.keySet();
+
+Map beginningOffsets= kafkaConsumer.beginningOffsets(topicPartitions);
+Map endOffsets= kafkaConsumer.endOffsets(topicPartitions);
+Map result = new HashMap<>();
--- End diff --

Added a comment and added Javadocs to the class.


---


2017-12-31 Thread omkreddy
Github user omkreddy commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159149749
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -0,0 +1,118 @@
+package org.apache.storm.kafka.spout.metrics;
+
+import com.google.common.base.Supplier;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class KafkaOffsetMetric implements IMetric {
+
+private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
+private final Supplier> offsetManagerSupplier;
+private final Supplier consumerSupplier;
+
+public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier consumerSupplier) {
+this.offsetManagerSupplier = offsetManagerSupplier;
+this.consumerSupplier = consumerSupplier;
+}
+
+@Override
+public Object getValueAndReset() {
+
+Map offsetManagers = offsetManagerSupplier.get();
+KafkaConsumer kafkaConsumer = consumerSupplier.get();
+
+if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) {
+LOG.info("Metrics Tick: offsetManagers or kafkaConsumer is null.");
--- End diff --

This can be at info level. Updated the PR.


---


2017-12-30 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159120385
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -0,0 +1,118 @@
+package org.apache.storm.kafka.spout.metrics;
+
+import com.google.common.base.Supplier;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class KafkaOffsetMetric implements IMetric {
+
+private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
+private final Supplier> offsetManagerSupplier;
+private final Supplier consumerSupplier;
+
+public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier consumerSupplier) {
--- End diff --

The consumer is replaced when activating/deactivating the spout, so we 
can't just pass the object in here. The alternative is to have a setConsumer 
method on the metric that the spout then has to call. For the offset manager 
map, see https://github.com/apache/storm/pull/2480#discussion_r158958224. 
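A minimal sketch of the Supplier approach, using `java.util.function.Supplier` in place of the Guava `com.google.common.base.Supplier` that the PR imports. `StubConsumer`, `StubSpout` and `LazyConsumerMetric` are hypothetical stand-ins for the Kafka consumer, `KafkaSpout` and `KafkaOffsetMetric`, not Storm classes. The metric resolves the consumer on every tick, so it picks up the instance created by the most recent activation instead of one captured when the metric was constructed.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a Kafka consumer; only its identity matters here.
final class StubConsumer {
    final int generation;

    StubConsumer(int generation) {
        this.generation = generation;
    }
}

// Hypothetical spout stand-in: a new consumer is created on every activation
// and dropped on deactivation, as described in the review comment.
final class StubSpout {
    private StubConsumer consumer;
    private int generation = 0;

    void activate() {
        consumer = new StubConsumer(++generation);
    }

    void deactivate() {
        consumer = null;
    }

    StubConsumer currentConsumer() {
        return consumer;
    }
}

// Metric that looks the consumer up lazily on each tick instead of holding a fixed reference.
final class LazyConsumerMetric {
    private final Supplier<StubConsumer> consumerSupplier;

    LazyConsumerMetric(Supplier<StubConsumer> consumerSupplier) {
        this.consumerSupplier = consumerSupplier;
    }

    // Returns the generation of the consumer in use right now, or null while the spout is inactive.
    Object getValueAndReset() {
        StubConsumer consumer = consumerSupplier.get();
        return consumer == null ? null : consumer.generation;
    }
}
```

With `new LazyConsumerMetric(spout::currentConsumer)`, the metric returns null while the spout is inactive, 1 after the first activation and 2 after a deactivate/activate cycle. A `setConsumer` method would reach the same result by pushing each new consumer into the metric, at the cost of the spout having to remember to call it on every activation.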


---


2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159119938
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -0,0 +1,118 @@
+package org.apache.storm.kafka.spout.metrics;
+
+import com.google.common.base.Supplier;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class KafkaOffsetMetric implements IMetric {
+
+private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
+private final Supplier> offsetManagerSupplier;
+private final Supplier consumerSupplier;
+
+public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier consumerSupplier) {
--- End diff --

What's the reasoning behind passing Supplier rather than the actual object?


---


2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159119896
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -0,0 +1,118 @@
+package org.apache.storm.kafka.spout.metrics;
+
+import com.google.common.base.Supplier;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class KafkaOffsetMetric implements IMetric {
+
+private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
+private final Supplier> offsetManagerSupplier;
+private final Supplier consumerSupplier;
+
+public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier consumerSupplier) {
+this.offsetManagerSupplier = offsetManagerSupplier;
+this.consumerSupplier = consumerSupplier;
+}
+
+@Override
+public Object getValueAndReset() {
+
+Map offsetManagers = offsetManagerSupplier.get();
+KafkaConsumer kafkaConsumer = consumerSupplier.get();
+
+if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) {
+LOG.info("Metrics Tick: offsetManagers or kafkaConsumer is null.");
+return null;
+}
+
+Map topicMetricsMap = new HashMap<>();
+Set topicPartitions = offsetManagers.keySet();
+
+Map beginningOffsets= kafkaConsumer.beginningOffsets(topicPartitions);
+Map endOffsets= kafkaConsumer.endOffsets(topicPartitions);
+Map result = new HashMap<>();
--- End diff --

It would be useful to have a comment saying what is in this result map.
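As an illustration of what such a comment could describe, the sketch below rebuilds the per-partition entries using the key layout (`<topic>/partition_<n>/<name>`) and the arithmetic of the later revision of `getValueAndReset()` quoted in this thread. `OffsetMetricKeysSketch` and `partitionEntries` are hypothetical names, and the offsets are passed in directly rather than read from the `KafkaConsumer` and `OffsetManager`.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the per-partition entries placed in the metric's result map, following the
// key layout and arithmetic of the KafkaOffsetMetric revision quoted in this thread.
final class OffsetMetricKeysSketch {
    private OffsetMetricKeysSketch() {
    }

    // earliestTimeOffset / latestTimeOffset: beginning and end offsets of the partition;
    // latestEmittedOffset: highest offset emitted by the spout;
    // latestCompletedOffset: committed offset tracked for the partition.
    static Map<String, Long> partitionEntries(String topic, int partition,
            long earliestTimeOffset, long latestTimeOffset,
            long latestEmittedOffset, long latestCompletedOffset) {
        Map<String, Long> result = new HashMap<>();
        String metricPath = topic + "/partition_" + partition;
        // Distance between the end of the partition and the committed offset.
        result.put(metricPath + "/spoutLag", latestTimeOffset - latestCompletedOffset);
        result.put(metricPath + "/earliestTimeOffset", earliestTimeOffset);
        result.put(metricPath + "/latestTimeOffset", latestTimeOffset);
        result.put(metricPath + "/latestEmittedOffset", latestEmittedOffset);
        result.put(metricPath + "/latestCompletedOffset", latestCompletedOffset);
        // Number of records currently retained in the partition.
        result.put(metricPath + "/recordsInAssignedPartitions", latestTimeOffset - earliestTimeOffset);
        return result;
    }
}
```

For example, `partitionEntries("test", 0, 5, 100, 90, 80)` yields six entries, including `test/partition_0/spoutLag` = 20 and `test/partition_0/recordsInAssignedPartitions` = 95.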


---


2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159119878
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -0,0 +1,118 @@
+package org.apache.storm.kafka.spout.metrics;
+
+import com.google.common.base.Supplier;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class KafkaOffsetMetric implements IMetric {
+
+private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
+private final Supplier> offsetManagerSupplier;
+private final Supplier consumerSupplier;
+
+public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier consumerSupplier) {
+this.offsetManagerSupplier = offsetManagerSupplier;
+this.consumerSupplier = consumerSupplier;
+}
+
+@Override
+public Object getValueAndReset() {
+
+Map offsetManagers = offsetManagerSupplier.get();
+KafkaConsumer kafkaConsumer = consumerSupplier.get();
+
+if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) {
+LOG.info("Metrics Tick: offsetManagers or kafkaConsumer is null.");
--- End diff --

Should this be INFO level? Is this going to print this message periodically?


---


2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159119706
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -739,4 +764,9 @@ public boolean shouldPoll() {
 return !this.pollablePartitions.isEmpty();
 }
 }
+
+@VisibleForTesting
+KafkaOffsetMetric getKafkaOffsetMetric() {
--- End diff --

If we start adding a lot of these test methods, we would be better off 
creating a class in the test package called KafkaSpoutTest that extends 
KafkaSpout, and using that one in the tests. All of these methods should go in 
that class. We don't want KafkaSpout to become bloated.
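A minimal sketch of the suggested pattern, with hypothetical stand-ins: `MetricsSpout` for `KafkaSpout`, `OffsetMetricStub` for `KafkaOffsetMetric`, and `MetricsSpoutForTest` for the proposed `KafkaSpoutTest` subclass. It assumes the production class exposes the relevant state to subclasses (here a `protected` field) instead of carrying a `@VisibleForTesting` getter, so the test-only accessors live in the subclass.

```java
// Hypothetical stand-in for KafkaOffsetMetric.
final class OffsetMetricStub {
}

// Hypothetical stand-in for KafkaSpout. The field is protected (an assumption) so that a
// test subclass can read it without a @VisibleForTesting getter on the production class.
class MetricsSpout {
    protected OffsetMetricStub kafkaOffsetMetric;

    void open() {
        // The real spout would build and register its offset metric here.
        kafkaOffsetMetric = new OffsetMetricStub();
    }
}

// Hypothetical test-only subclass (it would live under src/test) that collects test accessors.
class MetricsSpoutForTest extends MetricsSpout {
    OffsetMetricStub getKafkaOffsetMetric() {
        return kafkaOffsetMetric;
    }
}
```

Tests would then instantiate `MetricsSpoutForTest` wherever they currently instantiate the spout, and further accessors would be added to the subclass rather than to the production class.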


---


2017-12-29 Thread omkreddy
Github user omkreddy commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159080037
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -0,0 +1,118 @@
+package org.apache.storm.kafka.spout.metrics;
+
+import com.google.common.base.Supplier;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class KafkaOffsetMetric implements IMetric {
+
+private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
+private final Supplier> offsetManagerSupplier;
+private final Supplier consumerSupplier;
+
+public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier consumerSupplier) {
+this.offsetManagerSupplier = offsetManagerSupplier;
+this.consumerSupplier = consumerSupplier;
+}
+
+@Override
+public Object getValueAndReset() {
+
+Map offsetManagers = offsetManagerSupplier.get();
+KafkaConsumer kafkaConsumer = consumerSupplier.get();
+
+if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) {
+LOG.info("Metrics Tick: offsetManagers or kafkaConsumer is null.");
+return null;
+}
+
+Map topicMetricsMap = new HashMap<>();
+Set topicPartitions = offsetManagers.keySet();
+
+Map beginningOffsets= kafkaConsumer.beginningOffsets(topicPartitions);
+Map endOffsets= kafkaConsumer.endOffsets(topicPartitions);
+Map result = new HashMap<>();
+
+for (Map.Entry entry : offsetManagers.entrySet()) {
+TopicPartition topicPartition = entry.getKey();
+OffsetManager offsetManager = entry.getValue();
+
+long latestTimeOffset = endOffsets.get(topicPartition);
+long earliestTimeOffset = beginningOffsets.get(topicPartition);
+
+long latestEmittedOffset = offsetManager.getLatestEmittedOffset();
+long latestCompletedOffset = offsetManager.getCommittedOffset();
+long spoutLag = latestTimeOffset - latestCompletedOffset;
+long recordsInAssignedPartitions =  latestTimeOffset - earliestTimeOffset;
+
+String metricPath = topicPartition.topic()  + "/partition_" + topicPartition.partition();
+result.put(metricPath + "/" + "spoutLag", spoutLag);
+result.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
+result.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
+result.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);
+result.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset);
+result.put(metricPath + "/" + "recordsInAssignedPartitions", recordsInAssignedPartitions);
--- End diff --

Updated the PR.


---


2017-12-29 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159078123
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -0,0 +1,118 @@
+package org.apache.storm.kafka.spout.metrics;
+
+import com.google.common.base.Supplier;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class KafkaOffsetMetric implements IMetric {
+
+private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
+private final Supplier> offsetManagerSupplier;
+private final Supplier consumerSupplier;
+
+public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier consumerSupplier) {
+this.offsetManagerSupplier = offsetManagerSupplier;
+this.consumerSupplier = consumerSupplier;
+}
+
+@Override
+public Object getValueAndReset() {
+
+Map offsetManagers = offsetManagerSupplier.get();
+KafkaConsumer kafkaConsumer = consumerSupplier.get();
+
+if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) {
+LOG.info("Metrics Tick: offsetManagers or kafkaConsumer is null.");
+return null;
+}
+
+Map topicMetricsMap = new HashMap<>();
+Set topicPartitions = offsetManagers.keySet();
+
+Map beginningOffsets= kafkaConsumer.beginningOffsets(topicPartitions);
+Map endOffsets= kafkaConsumer.endOffsets(topicPartitions);
+Map result = new HashMap<>();
+
+for (Map.Entry entry : offsetManagers.entrySet()) {
+TopicPartition topicPartition = entry.getKey();
+OffsetManager offsetManager = entry.getValue();
+
+long latestTimeOffset = endOffsets.get(topicPartition);
+long earliestTimeOffset = beginningOffsets.get(topicPartition);
+
+long latestEmittedOffset = offsetManager.getLatestEmittedOffset();
+long latestCompletedOffset = offsetManager.getCommittedOffset();
+long spoutLag = latestTimeOffset - latestCompletedOffset;
+long recordsInAssignedPartitions =  latestTimeOffset - earliestTimeOffset;
+
+String metricPath = topicPartition.topic()  + "/partition_" + topicPartition.partition();
+result.put(metricPath + "/" + "spoutLag", spoutLag);
+result.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
+result.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
+result.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);
+result.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset);
+result.put(metricPath + "/" + "recordsInAssignedPartitions", recordsInAssignedPartitions);
--- End diff --

Nit: Sorry, now that I look at it, the name here is a little weird. Maybe 
`recordsInPartition`/`totalRecordsInPartitions`? 
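For illustration, a sketch of how the suggested names might be emitted: a per-partition `recordsInPartition` (end offset minus beginning offset, the same arithmetic as `recordsInAssignedPartitions` in the diff above) and a per-topic `totalRecordsInPartitions`, summed the way the earlier revision summed its `TopicMetrics` totals. `PartitionOffsets` and `RecordsInPartitionSketch` are hypothetical; the offsets are supplied directly instead of coming from `beginningOffsets`/`endOffsets`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one partition's beginning and end offsets.
final class PartitionOffsets {
    final String topic;
    final int partition;
    final long earliestTimeOffset;
    final long latestTimeOffset;

    PartitionOffsets(String topic, int partition, long earliestTimeOffset, long latestTimeOffset) {
        this.topic = topic;
        this.partition = partition;
        this.earliestTimeOffset = earliestTimeOffset;
        this.latestTimeOffset = latestTimeOffset;
    }
}

final class RecordsInPartitionSketch {
    private RecordsInPartitionSketch() {
    }

    static Map<String, Long> recordsMetrics(List<PartitionOffsets> partitions) {
        Map<String, Long> result = new HashMap<>();
        for (PartitionOffsets p : partitions) {
            // Records currently retained in this partition.
            long recordsInPartition = p.latestTimeOffset - p.earliestTimeOffset;
            result.put(p.topic + "/partition_" + p.partition + "/recordsInPartition", recordsInPartition);
            // Running per-topic total across the partitions passed in.
            result.merge(p.topic + "/totalRecordsInPartitions", recordsInPartition, Long::sum);
        }
        return result;
    }
}
```

With partitions `t/0` (0 to 10), `t/1` (5 to 25) and `u/0` (100 to 100), the map holds five entries, with `t/totalRecordsInPartitions` = 30 and `u/totalRecordsInPartitions` = 0.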


---


2017-12-29 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159076756
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -31,77 +32,75 @@
 public class KafkaOffsetMetric implements IMetric {
 
 private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
-private Map offsetManagers;
-private KafkaConsumer kafkaConsumer;
+private Supplier> offsetManagerSupplier;
+private Supplier consumerSupplier;
 
-public KafkaOffsetMetric(Map offsetManagers) {
-this.offsetManagers = offsetManagers;
+public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier consumerSupplier) {
+this.offsetManagerSupplier = offsetManagerSupplier;
+this.consumerSupplier = consumerSupplier;
 }
 
 @Override
 public Object getValueAndReset() {
-try {
-HashMap ret = new HashMap<>();
-if (offsetManagers != null && !offsetManagers.isEmpty() && kafkaConsumer != null) {
-Map topicMetricsMap = new HashMap<>();
-Set topicPartitions = offsetManagers.keySet();
-
-Map beginningOffsets= kafkaConsumer.beginningOffsets(topicPartitions);
-Map endOffsets= kafkaConsumer.endOffsets(topicPartitions);
-
-for (Map.Entry entry : offsetManagers.entrySet()) {
-TopicPartition topicPartition = entry.getKey();
-OffsetManager offsetManager = entry.getValue();
-
-long latestTimeOffset = endOffsets.get(topicPartition);
-long earliestTimeOffset = beginningOffsets.get(topicPartition);
-
-long latestEmittedOffset = offsetManager.getLatestEmittedOffset();
-long latestCompletedOffset = offsetManager.getCommittedOffset();
-long spoutLag = latestTimeOffset - latestCompletedOffset;
-
-String metricPath = topicPartition.topic()  + "/partition_" + topicPartition.partition();
-ret.put(metricPath + "/" + "spoutLag", spoutLag);
-ret.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
-ret.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
-ret.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);
-ret.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset);
-
-TopicMetrics topicMetrics = topicMetricsMap.get(topicPartition.topic());
-if(topicMetrics == null) {
-topicMetrics = new TopicMetrics();
-topicMetricsMap.put(topicPartition.topic(), topicMetrics);
-}
-
-topicMetrics.totalSpoutLag += spoutLag;
-topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
-topicMetrics.totalLatestTimeOffset += latestTimeOffset;
-topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
-topicMetrics.totalLatestCompletedOffset += latestCompletedOffset;
-}
-
-for(Map.Entry e : topicMetricsMap.entrySet()) {
-String topic = e.getKey();
-TopicMetrics topicMetrics = e.getValue();
-ret.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag);
-ret.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
-ret.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset);
-ret.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset);
-ret.put(topic + "/" + "totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset);
-}
-LOG.debug("Metrics Tick: value : {}", ret);
-return ret;
-} else {
-LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
+
+Map offsetManagers = offsetManagerSupplier.get();
+KafkaConsumer kafkaConsumer = consumerSupplier.get();
+
+if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) {
+LOG.info("Metrics Tick: offsetManagers or kafkaConsumer is null.");
+return null;
+}
+
+Map topicMetricsMap = new HashMap<>();
+Set topicPartitions = offsetManag

---

2017-12-29 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159075727
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -739,4 +764,9 @@ public boolean shouldPoll() {
 return !this.pollablePartitions.isEmpty();
 }
 }
+
+@VisibleForTesting
+KafkaOffsetMetric getKafkaOffsetMetric() {
--- End diff --

Yes, makes sense. Thanks.


---


2017-12-29 Thread omkreddy
Github user omkreddy commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159075538
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -739,4 +764,9 @@ public boolean shouldPoll() {
 return !this.pollablePartitions.isEmpty();
 }
 }
+
+@VisibleForTesting
+KafkaOffsetMetric getKafkaOffsetMetric() {
--- End diff --

Agreed, but I feel the current approach is simpler. Maybe we can evolve it 
when we add more metrics.


---


2017-12-29 Thread omkreddy
Github user omkreddy commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159075477
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -31,77 +32,75 @@
 public class KafkaOffsetMetric implements IMetric {
 
 private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
-private Map offsetManagers;
-private KafkaConsumer kafkaConsumer;
+private Supplier> offsetManagerSupplier;
+private Supplier consumerSupplier;
 
-public KafkaOffsetMetric(Map offsetManagers) {
-this.offsetManagers = offsetManagers;
+public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier consumerSupplier) {
+this.offsetManagerSupplier = offsetManagerSupplier;
+this.consumerSupplier = consumerSupplier;
 }
 
 @Override
 public Object getValueAndReset() {
-try {
-HashMap ret = new HashMap<>();
-if (offsetManagers != null && !offsetManagers.isEmpty() && kafkaConsumer != null) {
-Map topicMetricsMap = new HashMap<>();
-Set topicPartitions = offsetManagers.keySet();
-
-Map beginningOffsets= kafkaConsumer.beginningOffsets(topicPartitions);
-Map endOffsets= kafkaConsumer.endOffsets(topicPartitions);
-
-for (Map.Entry entry : offsetManagers.entrySet()) {
-TopicPartition topicPartition = entry.getKey();
-OffsetManager offsetManager = entry.getValue();
-
-long latestTimeOffset = endOffsets.get(topicPartition);
-long earliestTimeOffset = beginningOffsets.get(topicPartition);
-
-long latestEmittedOffset = offsetManager.getLatestEmittedOffset();
-long latestCompletedOffset = offsetManager.getCommittedOffset();
-long spoutLag = latestTimeOffset - latestCompletedOffset;
-
-String metricPath = topicPartition.topic()  + "/partition_" + topicPartition.partition();
-ret.put(metricPath + "/" + "spoutLag", spoutLag);
-ret.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
-ret.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
-ret.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);
-ret.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset);
-
-TopicMetrics topicMetrics = topicMetricsMap.get(topicPartition.topic());
-if(topicMetrics == null) {
-topicMetrics = new TopicMetrics();
-topicMetricsMap.put(topicPartition.topic(), topicMetrics);
-}
-
-topicMetrics.totalSpoutLag += spoutLag;
-topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
-topicMetrics.totalLatestTimeOffset += latestTimeOffset;
-topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
-topicMetrics.totalLatestCompletedOffset += latestCompletedOffset;
-}
-
-for(Map.Entry e : topicMetricsMap.entrySet()) {
-String topic = e.getKey();
-TopicMetrics topicMetrics = e.getValue();
-ret.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag);
-ret.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
-ret.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset);
-ret.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset);
-ret.put(topic + "/" + "totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset);
-}
-LOG.debug("Metrics Tick: value : {}", ret);
-return ret;
-} else {
-LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
+
+Map offsetManagers = offsetManagerSupplier.get();
+KafkaConsumer kafkaConsumer = consumerSupplier.get();
+
+if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) {
+LOG.info("Metrics Tick: offsetManagers or kafkaConsumer is null.");
+return null;
+}
+
+Map topicMetricsMap = new HashMap<>();
+Set topicPartitions = offsetM

[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout

2017-12-29 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159063226
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
 ---
@@ -343,4 +344,48 @@ public void 
testSpoutMustRefreshPartitionsEvenIfNotPolling() throws Exception {
 spout.nextTuple();
 verify(collectorMock).emit(anyString(), anyList(), 
any(KafkaSpoutMessageId.class));
 }
+
+@Test
+public void testOffsetMetrics() throws Exception {
+final int messageCount = 10;
+prepareSpout(messageCount);
+
+Map offsetMetric  = (Map) 
spout.getKafkaOffsetMetric().getValueAndReset();
+assertEquals(0, 
offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalEarliestTimeOffset").longValue());
+// the offset of the last available message + 1.
+assertEquals(10, 
offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestTimeOffset").longValue());
+assertEquals(0, 
offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestEmittedOffset").longValue());
+assertEquals(0, 
offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue());
+//totalSpoutLag = totalLatestTimeOffset-totalLatestCompletedOffset
+assertEquals(10, 
offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue());
+
+//Emit and Ack all messages
+for(int i = 0; i < messageCount; i++) {
--- End diff --

I think you can use nextTuple_verifyEmitted_ack_resetCollector(int offset) 
from the superclass to do everything in this block.
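The quoted test encodes the metric's arithmetic in its comment: per partition, spoutLag = latestTimeOffset - latestCompletedOffset, and each topic-level `total*` value is the sum over that topic's partitions. The following is a Kafka-free sketch of that aggregation only. `LagMath` is a hypothetical helper, not part of storm-kafka-client, and plain maps keyed by `"topic/partition_N"` stand in for `KafkaConsumer.endOffsets(...)` and the OffsetManager committed offsets.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of storm-kafka-client: it reproduces only the
// lag arithmetic from KafkaOffsetMetric, with plain maps standing in for
// KafkaConsumer.endOffsets(...) and the OffsetManager committed offsets.
final class LagMath {
    private LagMath() {}

    /**
     * Returns "topic/totalSpoutLag" -> sum over the topic's partitions of
     * (end offset - committed offset). Both maps are keyed by "topic/partition_N";
     * every key in endOffsets is assumed to have a committed offset.
     */
    static Map<String, Long> totalSpoutLagByTopic(Map<String, Long> endOffsets,
                                                  Map<String, Long> committedOffsets) {
        Map<String, Long> totals = new HashMap<>();
        for (Map.Entry<String, Long> e : endOffsets.entrySet()) {
            String partitionPath = e.getKey();
            long spoutLag = e.getValue() - committedOffsets.get(partitionPath);
            String topic = partitionPath.substring(0, partitionPath.lastIndexOf("/partition_"));
            totals.merge(topic + "/totalSpoutLag", spoutLag, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        Map<String, Long> end = new HashMap<>();
        end.put("test/partition_0", 10L);
        end.put("test/partition_1", 5L);
        Map<String, Long> committed = new HashMap<>();
        committed.put("test/partition_0", 4L); // 6 messages not yet committed
        committed.put("test/partition_1", 5L); // fully caught up
        System.out.println(totalSpoutLagByTopic(end, committed)); // prints {test/totalSpoutLag=6}
    }
}
```

With 10 messages produced and nothing committed, this gives a lag of 10, matching the first `totalSpoutLag` assertion in the quoted test.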


---


[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout

2017-12-29 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159061893
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -31,77 +32,75 @@
 public class KafkaOffsetMetric implements IMetric {
 
 private static final Logger LOG = 
LoggerFactory.getLogger(KafkaOffsetMetric.class);
-private Map offsetManagers;
-private KafkaConsumer kafkaConsumer;
+private Supplier> 
offsetManagerSupplier;
--- End diff --

Nit: These fields can be `final` now.
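Here is a simplified sketch of the class shape after this nit. Both supplier fields are `final`, and they are dereferenced only when a metrics tick happens, with an early `return null` like the guard in the updated diff. `SupplierBackedLagMetric` is hypothetical: plain maps replace the OffsetManager map and `KafkaConsumer.endOffsets(...)`, and it does not implement Storm's `IMetric`. It only mirrors the shape of `getValueAndReset()`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Simplified, hypothetical stand-in for KafkaOffsetMetric after the review:
// both fields are final, and the suppliers are only dereferenced on each tick.
// Plain maps replace the OffsetManager map and KafkaConsumer.endOffsets(...).
final class SupplierBackedLagMetric {
    private final Supplier<Map<String, Long>> committedOffsetsSupplier;
    private final Supplier<Map<String, Long>> endOffsetsSupplier;

    SupplierBackedLagMetric(Supplier<Map<String, Long>> committedOffsetsSupplier,
                            Supplier<Map<String, Long>> endOffsetsSupplier) {
        this.committedOffsetsSupplier = committedOffsetsSupplier;
        this.endOffsetsSupplier = endOffsetsSupplier;
    }

    /** Shaped like IMetric.getValueAndReset(): null means "nothing to report yet". */
    Object getValueAndReset() {
        Map<String, Long> committed = committedOffsetsSupplier.get();
        Map<String, Long> endOffsets = endOffsetsSupplier.get();
        if (committed == null || committed.isEmpty() || endOffsets == null) {
            return null;
        }
        Map<String, Long> ret = new HashMap<>();
        for (Map.Entry<String, Long> e : committed.entrySet()) {
            Long end = endOffsets.get(e.getKey());
            if (end != null) { // skip partitions with no known end offset
                ret.put(e.getKey() + "/spoutLag", end - e.getValue());
            }
        }
        return ret;
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new HashMap<>();
        Map<String, Long> end = new HashMap<>();
        SupplierBackedLagMetric metric = new SupplierBackedLagMetric(() -> committed, () -> end);
        System.out.println(metric.getValueAndReset()); // prints null: no partitions yet
        committed.put("test/partition_0", 3L);
        end.put("test/partition_0", 10L);
        System.out.println(metric.getValueAndReset()); // prints {test/partition_0/spoutLag=7}
    }
}
```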


---


[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout

2017-12-29 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159062211
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -31,77 +32,75 @@
 public class KafkaOffsetMetric implements IMetric {
 
 private static final Logger LOG = 
LoggerFactory.getLogger(KafkaOffsetMetric.class);
-private Map offsetManagers;
-private KafkaConsumer kafkaConsumer;
+private Supplier> 
offsetManagerSupplier;
+private Supplier consumerSupplier;
 
-public KafkaOffsetMetric(Map 
offsetManagers) {
-this.offsetManagers = offsetManagers;
+public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier 
consumerSupplier) {
+this.offsetManagerSupplier = offsetManagerSupplier;
+this.consumerSupplier = consumerSupplier;
 }
 
 @Override
 public Object getValueAndReset() {
-try {
-HashMap ret = new HashMap<>();
-if (offsetManagers != null && !offsetManagers.isEmpty() && 
kafkaConsumer != null) {
-Map topicMetricsMap = new HashMap<>();
-Set topicPartitions = 
offsetManagers.keySet();
-
-Map beginningOffsets= 
kafkaConsumer.beginningOffsets(topicPartitions);
-Map endOffsets= 
kafkaConsumer.endOffsets(topicPartitions);
-
-for (Map.Entry entry : 
offsetManagers.entrySet()) {
-TopicPartition topicPartition = entry.getKey();
-OffsetManager offsetManager = entry.getValue();
-
-long latestTimeOffset = endOffsets.get(topicPartition);
-long earliestTimeOffset = 
beginningOffsets.get(topicPartition);
-
-long latestEmittedOffset = 
offsetManager.getLatestEmittedOffset();
-long latestCompletedOffset = 
offsetManager.getCommittedOffset();
-long spoutLag = latestTimeOffset - 
latestCompletedOffset;
-
-String metricPath = topicPartition.topic()  + 
"/partition_" + topicPartition.partition();
-ret.put(metricPath + "/" + "spoutLag", spoutLag);
-ret.put(metricPath + "/" + "earliestTimeOffset", 
earliestTimeOffset);
-ret.put(metricPath + "/" + "latestTimeOffset", 
latestTimeOffset);
-ret.put(metricPath + "/" + "latestEmittedOffset", 
latestEmittedOffset);
-ret.put(metricPath + "/" + "latestCompletedOffset", 
latestCompletedOffset);
-
-TopicMetrics topicMetrics = 
topicMetricsMap.get(topicPartition.topic());
-if(topicMetrics == null) {
-topicMetrics = new TopicMetrics();
-topicMetricsMap.put(topicPartition.topic(), 
topicMetrics);
-}
-
-topicMetrics.totalSpoutLag += spoutLag;
-topicMetrics.totalEarliestTimeOffset += 
earliestTimeOffset;
-topicMetrics.totalLatestTimeOffset += latestTimeOffset;
-topicMetrics.totalLatestEmittedOffset += 
latestEmittedOffset;
-topicMetrics.totalLatestCompletedOffset += 
latestCompletedOffset;
-}
-
-for(Map.Entry e : 
topicMetricsMap.entrySet()) {
-String topic = e.getKey();
-TopicMetrics topicMetrics = e.getValue();
-ret.put(topic + "/" + "totalSpoutLag", 
topicMetrics.totalSpoutLag);
-ret.put(topic + "/" + "totalEarliestTimeOffset", 
topicMetrics.totalEarliestTimeOffset);
-ret.put(topic + "/" + "totalLatestTimeOffset", 
topicMetrics.totalLatestTimeOffset);
-ret.put(topic + "/" + "totalLatestEmittedOffset", 
topicMetrics.totalLatestEmittedOffset);
-ret.put(topic + "/" + "totalLatestCompletedOffset", 
topicMetrics.totalLatestCompletedOffset);
-}
-LOG.debug("Metrics Tick: value : {}", ret);
-return ret;
-} else {
-LOG.info("Metrics Tick: Not enough data to calculate spout 
lag.");
+
+Map offsetManagers = 
offsetManagerSupplier.get();
+KafkaConsumer kafkaConsumer = consumerSupplier.get();
+
+if (offsetManagers == null || offsetManagers.isEmpty() || 
kafkaConsumer == null) {
+LOG.info("Metrics Tick: offsetManagers or kafkaConsumer is 
null.");
+return null;
+}
+
+Map topicMetricsMap = new HashMap<>();
+Set topicPartitions = offsetManag

[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout

2017-12-29 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159063661
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -739,4 +764,9 @@ public boolean shouldPoll() {
 return !this.pollablePartitions.isEmpty();
 }
 }
+
+@VisibleForTesting
+KafkaOffsetMetric getKafkaOffsetMetric() {
--- End diff --

Optional: We could avoid this getter by using the same factory trick we use
to inject the KafkaConsumer. If we add a KafkaOffsetMetricFactory parameter to
the spout constructor, you can create the metric in the test and pass it into
the spout, instead of letting the spout construct the metric.
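Below is a minimal sketch of the factory-injection idea. All names (`LagMetric`, `OffsetMetricFactory`, `ToySpout`) are hypothetical and are not storm-kafka-client API. Production code uses a default factory. A test passes its own factory and keeps a handle on the instance that factory creates, so no `@VisibleForTesting` getter is needed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical metric type standing in for KafkaOffsetMetric.
class LagMetric {
    private final Supplier<Map<String, Long>> committedOffsets;

    LagMetric(Supplier<Map<String, Long>> committedOffsets) {
        this.committedOffsets = committedOffsets;
    }

    /** Copy of the spout's current committed offsets. */
    Map<String, Long> snapshot() {
        return new HashMap<>(committedOffsets.get());
    }
}

// Hypothetical factory, analogous to the factory already used to inject the KafkaConsumer.
interface OffsetMetricFactory {
    LagMetric create(Supplier<Map<String, Long>> committedOffsets);
}

// Hypothetical spout that obtains its metric from the injected factory.
class ToySpout {
    private final OffsetMetricFactory metricFactory;
    private final Map<String, Long> committedOffsets = new HashMap<>();
    private LagMetric metric; // a real spout would register this with the TopologyContext

    ToySpout() {
        this(LagMetric::new); // production default: construct the metric directly
    }

    ToySpout(OffsetMetricFactory metricFactory) {
        this.metricFactory = metricFactory;
    }

    void open() {
        metric = metricFactory.create(() -> committedOffsets);
    }

    void commit(String partitionPath, long offset) {
        committedOffsets.put(partitionPath, offset);
    }
}

class FactoryInjectionDemo {
    public static void main(String[] args) {
        // The test keeps a handle on the metric its factory creates; no getter on the spout.
        LagMetric[] created = new LagMetric[1];
        ToySpout spout = new ToySpout(supplier -> created[0] = new LagMetric(supplier));
        spout.open();
        spout.commit("test/partition_0", 5L);
        System.out.println(created[0].snapshot()); // prints {test/partition_0=5}
    }
}
```

The trade-off is one more constructor parameter on the spout in exchange for dropping a test-only accessor.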


---


[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout

2017-12-29 Thread omkreddy
Github user omkreddy commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159056636
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.metrics;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class KafkaOffsetMetric implements IMetric {
--- End diff --

Added a simple test case.


---


[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout

2017-12-28 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r158960041
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -0,0 +1,114 @@
+package org.apache.storm.kafka.spout.metrics;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class KafkaOffsetMetric implements IMetric {
--- End diff --

Could we add a test or two for this class? Maybe a test in 
KafkaSpoutSingleTopicTest that starts up a spout and checks that it can get 
metrics for the subscribed partition?
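A test like this could check that the metric snapshot contains every per-partition key for the subscribed partition. The sketch below assumes the key layout shown in the quoted diff (`topic + "/partition_" + partition + "/" + metricName`). `PartitionMetricCheck` and `missingKeys` are hypothetical test helpers, not existing Storm test utilities.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical test helper: checks that a metric snapshot contains every
// per-partition key in the layout used by the KafkaOffsetMetric diff
// (topic + "/partition_" + partition + "/" + metricName).
final class PartitionMetricCheck {
    private static final String[] PARTITION_METRICS = {
        "spoutLag", "earliestTimeOffset", "latestTimeOffset",
        "latestEmittedOffset", "latestCompletedOffset"
    };

    private PartitionMetricCheck() {}

    /** Returns the expected keys for the partition that are absent from the snapshot. */
    static List<String> missingKeys(Map<String, ?> snapshot, String topic, int partition) {
        String metricPath = topic + "/partition_" + partition;
        List<String> missing = new ArrayList<>();
        for (String name : PARTITION_METRICS) {
            String key = metricPath + "/" + name;
            if (!snapshot.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, Long> snapshot = new HashMap<>();
        snapshot.put("test/partition_0/spoutLag", 10L);
        // Four of the five per-partition keys are missing from this partial snapshot.
        System.out.println(missingKeys(snapshot, "test", 0).size()); // prints 4
    }
}
```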


---


[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout

2017-12-28 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r158959415
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -0,0 +1,114 @@
+package org.apache.storm.kafka.spout.metrics;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class KafkaOffsetMetric implements IMetric {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaOffsetMetric.class);
+private Map offsetManagers;
+private KafkaConsumer kafkaConsumer;
+
+public KafkaOffsetMetric(Map 
offsetManagers) {
+this.offsetManagers = offsetManagers;
+}
+
+@Override
+public Object getValueAndReset() {
+try {
+HashMap ret = new HashMap<>();
+if (offsetManagers != null && !offsetManagers.isEmpty() && 
kafkaConsumer != null) {
+Map topicMetricsMap = new HashMap<>();
+Set topicPartitions = 
offsetManagers.keySet();
+
+Map beginningOffsets= 
kafkaConsumer.beginningOffsets(topicPartitions);
+Map endOffsets= 
kafkaConsumer.endOffsets(topicPartitions);
+
+for (Map.Entry entry : 
offsetManagers.entrySet()) {
+TopicPartition topicPartition = entry.getKey();
+OffsetManager offsetManager = entry.getValue();
+
+long latestTimeOffset = endOffsets.get(topicPartition);
+long earliestTimeOffset = 
beginningOffsets.get(topicPartition);
+
+long latestEmittedOffset = 
offsetManager.getLatestEmittedOffset();
+long latestCompletedOffset = 
offsetManager.getCommittedOffset();
+long spoutLag = latestTimeOffset - 
latestCompletedOffset;
+
+String metricPath = topicPartition.topic()  + 
"/partition_" + topicPartition.partition();
+ret.put(metricPath + "/" + "spoutLag", spoutLag);
+ret.put(metricPath + "/" + "earliestTimeOffset", 
earliestTimeOffset);
+ret.put(metricPath + "/" + "latestTimeOffset", 
latestTimeOffset);
+ret.put(metricPath + "/" + "latestEmittedOffset", 
latestEmittedOffset);
+ret.put(metricPath + "/" + "latestCompletedOffset", 
latestCompletedOffset);
+
+TopicMetrics topicMetrics = 
topicMetricsMap.get(topicPartition.topic());
+if(topicMetrics == null) {
+topicMetrics = new TopicMetrics();
+topicMetricsMap.put(topicPartition.topic(), 
topicMetrics);
+}
+
+topicMetrics.totalSpoutLag += spoutLag;
+topicMetrics.totalEarliestTimeOffset += 
earliestTimeOffset;
+topicMetrics.totalLatestTimeOffset += latestTimeOffset;
+topicMetrics.totalLatestEmittedOffset += 
latestEmittedOffset;
+topicMetrics.totalLatestCompletedOffset += 
latestCompletedOffset;
+}
+
+for(Map.Entry e : 
topicMetricsMap.entrySet()) {
+String topic = e.getKey();
+TopicMetrics topicMetrics = e.getValue();
+ret.put(topic + "/" + "totalSpoutLag", 
topicMetrics.totalSpoutLag);
+ret.put(topic + "/" + "totalEarliestTimeOffset", 
topicMetrics.totalEarliestTimeOffset);
+ret.put(t

[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout

2017-12-28 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r158958894
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -0,0 +1,114 @@
+package org.apache.storm.kafka.spout.metrics;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class KafkaOffsetMetric implements IMetric {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaOffsetMetric.class);
+private Map offsetManagers;
+private KafkaConsumer kafkaConsumer;
+
+public KafkaOffsetMetric(Map 
offsetManagers) {
+this.offsetManagers = offsetManagers;
+}
+
+@Override
+public Object getValueAndReset() {
+try {
+HashMap ret = new HashMap<>();
+if (offsetManagers != null && !offsetManagers.isEmpty() && 
kafkaConsumer != null) {
+Map topicMetricsMap = new HashMap<>();
+Set topicPartitions = 
offsetManagers.keySet();
+
+Map beginningOffsets= 
kafkaConsumer.beginningOffsets(topicPartitions);
+Map endOffsets= 
kafkaConsumer.endOffsets(topicPartitions);
+
+for (Map.Entry entry : 
offsetManagers.entrySet()) {
+TopicPartition topicPartition = entry.getKey();
+OffsetManager offsetManager = entry.getValue();
+
+long latestTimeOffset = endOffsets.get(topicPartition);
+long earliestTimeOffset = 
beginningOffsets.get(topicPartition);
+
+long latestEmittedOffset = 
offsetManager.getLatestEmittedOffset();
+long latestCompletedOffset = 
offsetManager.getCommittedOffset();
+long spoutLag = latestTimeOffset - 
latestCompletedOffset;
+
+String metricPath = topicPartition.topic()  + 
"/partition_" + topicPartition.partition();
+ret.put(metricPath + "/" + "spoutLag", spoutLag);
+ret.put(metricPath + "/" + "earliestTimeOffset", 
earliestTimeOffset);
+ret.put(metricPath + "/" + "latestTimeOffset", 
latestTimeOffset);
+ret.put(metricPath + "/" + "latestEmittedOffset", 
latestEmittedOffset);
+ret.put(metricPath + "/" + "latestCompletedOffset", 
latestCompletedOffset);
+
+TopicMetrics topicMetrics = 
topicMetricsMap.get(topicPartition.topic());
+if(topicMetrics == null) {
+topicMetrics = new TopicMetrics();
+topicMetricsMap.put(topicPartition.topic(), 
topicMetrics);
+}
+
+topicMetrics.totalSpoutLag += spoutLag;
+topicMetrics.totalEarliestTimeOffset += 
earliestTimeOffset;
+topicMetrics.totalLatestTimeOffset += latestTimeOffset;
+topicMetrics.totalLatestEmittedOffset += 
latestEmittedOffset;
+topicMetrics.totalLatestCompletedOffset += 
latestCompletedOffset;
+}
+
+for(Map.Entry e : 
topicMetricsMap.entrySet()) {
+String topic = e.getKey();
+TopicMetrics topicMetrics = e.getValue();
+ret.put(topic + "/" + "totalSpoutLag", 
topicMetrics.totalSpoutLag);
+ret.put(topic + "/" + "totalEarliestTimeOffset", 
topicMetrics.totalEarliestTimeOffset);
+ret.put(t

[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout

2017-12-28 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r158958224
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -0,0 +1,114 @@
+package org.apache.storm.kafka.spout.metrics;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class KafkaOffsetMetric implements IMetric {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaOffsetMetric.class);
+private Map offsetManagers;
--- End diff --

Nit: It might be better to access the field in KafkaSpout through a getter or
a Supplier, rather than copying the Map reference in here. If someone does e.g.
`offsetManagers = new HashMap<>()` in the spout, this reference ends up
pointing to the old map instead of the current one.
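The hazard described here can be reproduced in plain Java. In this sketch the class names are hypothetical. `CopiedRefMetric` captures the map reference once, as the original constructor did. `SupplierMetric` reads the field through a `Supplier` each time it is asked. After the spout reassigns its field, only the supplier-based metric sees the new map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Captures the map reference once, like the original KafkaOffsetMetric constructor.
class CopiedRefMetric {
    private final Map<String, Long> offsets;

    CopiedRefMetric(Map<String, Long> offsets) {
        this.offsets = offsets;
    }

    int trackedPartitions() {
        return offsets.size();
    }
}

// Re-reads the spout's field on every call, as suggested in the review.
class SupplierMetric {
    private final Supplier<Map<String, Long>> offsets;

    SupplierMetric(Supplier<Map<String, Long>> offsets) {
        this.offsets = offsets;
    }

    int trackedPartitions() {
        return offsets.get().size();
    }
}

// Hypothetical spout holding a reassignable field, as in the review comment.
class StaleReferenceDemo {
    Map<String, Long> offsetManagers = new HashMap<>();

    public static void main(String[] args) {
        StaleReferenceDemo spout = new StaleReferenceDemo();
        CopiedRefMetric copied = new CopiedRefMetric(spout.offsetManagers);
        SupplierMetric supplied = new SupplierMetric(() -> spout.offsetManagers);

        // The spout replaces its map, e.g. `offsetManagers = new HashMap<>()`.
        spout.offsetManagers = new HashMap<>();
        spout.offsetManagers.put("topic/partition_0", 42L);

        System.out.println(copied.trackedPartitions());   // prints 0: stale reference
        System.out.println(supplied.trackedPartitions()); // prints 1: current map
    }
}
```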


---


[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout

2017-12-27 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r158899496
  
--- Diff: pom.xml ---
@@ -278,7 +278,7 @@
 kafka_2.10
 
 
-0.10.0.0
+0.11.0.2
--- End diff --

Actually, we "effectively" dropped support for Kafka 0.9.x when moving
from Storm 1.0.x to 1.1.0, because fixes for critical bugs in storm-kafka-client
have not been backported to the 1.0.x version line. I think we should try to
avoid doing that again, but I'm not sure about cases like this one (bumping the
version to use a new feature).

Btw, if Kafka doesn't always guarantee forward compatibility, we may need
to define the supported versions of Kafka (and of other dependencies, if we can
maintain that) and keep that list up to date.


---