[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2538


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165859312
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+private static final Logger LOG = 
LoggerFactory.getLogger(CommitMetadata.class);
+// Metadata information to commit to Kafka. It is unique per spout 
instance.
+private final String commitMetadata;
+private final ProcessingGuarantee processingGuarantee;
+private final TopologyContext context;
+
+/**
+ * Create a manager with the given context.
+ */
+public CommitMetadataManager(TopologyContext context, 
ProcessingGuarantee processingGuarantee) {
+this.context = context;
+try {
+commitMetadata = JSON_MAPPER.writeValueAsString(new 
CommitMetadata(
--- End diff --

I'm happy to declare the class final. Hopefully that'll discourage people 
from using the class directly if the internal package hint doesn't do the trick.

Regarding creating objects in the constructor, I agree with you in the 
general case (DI is great), but for pure data objects like CommitMetadata I'm 
not sure I understand the harm.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165859143
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
--- End diff --

It's my impression that declaring one global ObjectMapper would be fine, 
unless we needed differently configured ObjectMappers. See 
https://stackoverflow.com/a/3909846/8845188 (the responder is one of the 
Jackson devs).


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165858606
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+private static final Logger LOG = 
LoggerFactory.getLogger(CommitMetadata.class);
+// Metadata information to commit to Kafka. It is unique per spout 
instance.
--- End diff --

Right, I misunderstood. I thought you were talking about the comment in L38 
(i.e. you wanted it changed to "... It is unique per CommitMetadataManager"). 
Will fix.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857516
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
--- End diff --

The only question mark is performance. I am OK with it staying like this; I 
just wanted to bring it up. If not for performance, in the extreme case there 
would be no harm in creating one ObjectMapper in a utility class and using it 
across the entire codebase.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857829
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -453,37 +460,37 @@ public Builder(String bootstrapServers, Subscription 
subscription) {
 return builder;
 }
 
-private static void setAutoCommitMode(Builder builder) {
+private static void setKafkaPropsForProcessingGuarantee(Builder 
builder) {
 if 
(builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-throw new IllegalArgumentException("Do not set " + 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
-+ " Instead use 
KafkaSpoutConfig.Builder.setProcessingGuarantee");
+throw new IllegalStateException("The KafkaConsumer " + 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
++ " setting is not supported. You can configure similar 
behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
 }
-if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
-
builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
-} else {
-String autoOffsetResetPolicy = 
(String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-if (builder.processingGuarantee == 
ProcessingGuarantee.AT_LEAST_ONCE) {
-if (autoOffsetResetPolicy == null) {
-/*
-If the user wants to explicitly set an auto offset 
reset policy, we should respect it, but when the spout is configured
-for at-least-once processing we should default to 
seeking to the earliest offset in case there's an offset out of range
-error, rather than seeking to the latest (Kafka's 
default). This type of error will typically happen when the consumer 
-requests an offset that was deleted.
- */
-
builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-} else if (!autoOffsetResetPolicy.equals("earliest") && 
!autoOffsetResetPolicy.equals("none")) {
-LOG.warn("Cannot guarantee at-least-once processing 
with auto.offset.reset.policy other than 'earliest' or 'none'."
-+ " Some messages may be skipped.");
-}
-} else if (builder.processingGuarantee == 
ProcessingGuarantee.AT_MOST_ONCE) {
-if (autoOffsetResetPolicy != null
-&& (!autoOffsetResetPolicy.equals("latest") && 
!autoOffsetResetPolicy.equals("none"))) {
-LOG.warn("Cannot guarantee at-most-once processing 
with auto.offset.reset.policy other than 'latest' or 'none'."
-+ " Some messages may be processed more than 
once.");
-}
+String autoOffsetResetPolicy = (String) 
builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+if (builder.processingGuarantee == 
ProcessingGuarantee.AT_LEAST_ONCE) {
+if (autoOffsetResetPolicy == null) {
+/*
+ * If the user wants to explicitly set an auto offset 
reset policy, we should respect it, but when the spout is configured
+ * for at-least-once processing we should default to 
seeking to the earliest offset in case there's an offset out of range
+ * error, rather than seeking to the latest (Kafka's 
default). This type of error will typically happen when the consumer
+ * requests an offset that was deleted.
+ */
+LOG.info("Setting consumer property '{}' to 'earliest' to 
ensure at-least-once processing",
+ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+
builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+} else if (!autoOffsetResetPolicy.equals("earliest") && 
!autoOffsetResetPolicy.equals("none")) {
+LOG.warn("Cannot guarantee at-least-once processing with 
auto.offset.reset.policy other than 'earliest' or 'none'."
++ " Some messages may be skipped.");
+}
+} else if (builder.processingGuarantee == 
ProcessingGuarantee.AT_MOST_ONCE) {
+if (autoOffsetResetPolicy != null
+&& (!autoOffsetResetPolicy.equals("latest") && 
!autoOffsetResetPolicy.equals("none"))) {
+LOG.warn("Cannot guarantee at-most-once processing with 
auto.offset.reset.policy other than 'latest' or 'none'."
++ " Some messages may be processed more than once.");
   

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857620
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+private static final Logger LOG = 
LoggerFactory.getLogger(CommitMetadata.class);
+// Metadata information to commit to Kafka. It is unique per spout 
instance.
+private final String commitMetadata;
+private final ProcessingGuarantee processingGuarantee;
+private final TopologyContext context;
+
+/**
+ * Create a manager with the given context.
+ */
+public CommitMetadataManager(TopologyContext context, 
ProcessingGuarantee processingGuarantee) {
+this.context = context;
+try {
+commitMetadata = JSON_MAPPER.writeValueAsString(new 
CommitMetadata(
--- End diff --

It's bad practice to create objects in the constructor. I simply don't do 
it. However, it's done in a lot of places in Storm already, so I am OK if you 
want to leave it as is. 

Another reason this could be bad is if someone wants to 
subclass this class. If we leave it like this, perhaps the class should be 
final.

These are just suggestions. You can leave it as is or go with either 
of them.
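
For readers following the constructor discussion, here is a minimal sketch of the two options being weighed. It is not the code from the pull request: `Metadata`, `BuildsInConstructor` and `ReceivesDependency` are hypothetical names, and `Metadata` only loosely mirrors a pure data object such as CommitMetadata.

```java
import java.util.Objects;

public final class ConstructorChoices {

    // Hypothetical pure data object: fields only, no behavior to stub.
    public static final class Metadata {
        public final String topologyId;
        public final int taskId;

        public Metadata(String topologyId, int taskId) {
            this.topologyId = Objects.requireNonNull(topologyId);
            this.taskId = taskId;
        }
    }

    // Option A: a final class that builds its data object in the constructor.
    // Being final, it cannot be subclassed, so no subclass can observe a
    // partially constructed instance.
    public static final class BuildsInConstructor {
        private final Metadata metadata;

        public BuildsInConstructor(String topologyId, int taskId) {
            this.metadata = new Metadata(topologyId, taskId);
        }

        public Metadata metadata() {
            return metadata;
        }
    }

    // Option B: the caller creates the data object and passes it in.
    public static final class ReceivesDependency {
        private final Metadata metadata;

        public ReceivesDependency(Metadata metadata) {
            this.metadata = Objects.requireNonNull(metadata);
        }

        public Metadata metadata() {
            return metadata;
        }
    }

    public static void main(String[] args) {
        BuildsInConstructor a = new BuildsInConstructor("topology-1", 7);
        ReceivesDependency b = new ReceivesDependency(new Metadata("topology-1", 7));
        System.out.println(a.metadata().topologyId + " / " + b.metadata().taskId);
    }
}
```

Either variant can be tested by asserting on the resulting fields directly. Option B mainly pays off when the injected object has behavior that a test would want to replace.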


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857949
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+private static final Logger LOG = 
LoggerFactory.getLogger(CommitMetadata.class);
+// Metadata information to commit to Kafka. It is unique per spout 
instance.
--- End diff --

I disagree. The name of the logger instance should be the class that is 
logging the message or one of its super classes. There is no class called 
CommitMetadata in the codebase, so why should we have a logger called 
CommitMetadata?
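
As an illustration of the naming convention argued for here, the sketch below names the logger after the class that emits the messages. It uses `java.util.logging` only to stay dependency-free, and `MetadataManagerSketch` is a hypothetical stand-in. With SLF4J, as in the diff, the equivalent would be passing the enclosing class to `LoggerFactory.getLogger`.

```java
import java.util.logging.Logger;

public final class LoggerNaming {

    // Hypothetical stand-in for the class under review.
    public static final class MetadataManagerSketch {
        // The logger is named after the class that logs the message, so log
        // output can be traced back to (and filtered by) its real origin.
        public static final Logger LOG =
            Logger.getLogger(MetadataManagerSketch.class.getName());
    }

    public static void main(String[] args) {
        // Prints the logger name, which matches the declaring class name.
        System.out.println(MetadataManagerSketch.LOG.getName());
    }
}
```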


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857479
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -142,7 +139,7 @@ public void open(Map conf, 
TopologyContext context, SpoutOutputC
 offsetManagers = new HashMap<>();
 emitted = new HashSet<>();
 waitingToEmit = new HashMap<>();
-setCommitMetadata(context);
+commitMetadataManager = new CommitMetadataManager(context, 
kafkaSpoutConfig.getProcessingGuarantee());
--- End diff --

ok


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857745
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
--- End diff --

ok


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857816
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -453,37 +460,37 @@ public Builder(String bootstrapServers, Subscription 
subscription) {
 return builder;
 }
 
-private static void setAutoCommitMode(Builder builder) {
+private static void setKafkaPropsForProcessingGuarantee(Builder 
builder) {
 if 
(builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-throw new IllegalArgumentException("Do not set " + 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
-+ " Instead use 
KafkaSpoutConfig.Builder.setProcessingGuarantee");
+throw new IllegalStateException("The KafkaConsumer " + 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
++ " setting is not supported. You can configure similar 
behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
 }
-if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
-
builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
-} else {
-String autoOffsetResetPolicy = 
(String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-if (builder.processingGuarantee == 
ProcessingGuarantee.AT_LEAST_ONCE) {
-if (autoOffsetResetPolicy == null) {
-/*
-If the user wants to explicitly set an auto offset 
reset policy, we should respect it, but when the spout is configured
-for at-least-once processing we should default to 
seeking to the earliest offset in case there's an offset out of range
-error, rather than seeking to the latest (Kafka's 
default). This type of error will typically happen when the consumer 
-requests an offset that was deleted.
- */
-
builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-} else if (!autoOffsetResetPolicy.equals("earliest") && 
!autoOffsetResetPolicy.equals("none")) {
-LOG.warn("Cannot guarantee at-least-once processing 
with auto.offset.reset.policy other than 'earliest' or 'none'."
-+ " Some messages may be skipped.");
-}
-} else if (builder.processingGuarantee == 
ProcessingGuarantee.AT_MOST_ONCE) {
-if (autoOffsetResetPolicy != null
-&& (!autoOffsetResetPolicy.equals("latest") && 
!autoOffsetResetPolicy.equals("none"))) {
-LOG.warn("Cannot guarantee at-most-once processing 
with auto.offset.reset.policy other than 'latest' or 'none'."
-+ " Some messages may be processed more than 
once.");
-}
+String autoOffsetResetPolicy = (String) 
builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+if (builder.processingGuarantee == 
ProcessingGuarantee.AT_LEAST_ONCE) {
+if (autoOffsetResetPolicy == null) {
+/*
+ * If the user wants to explicitly set an auto offset 
reset policy, we should respect it, but when the spout is configured
+ * for at-least-once processing we should default to 
seeking to the earliest offset in case there's an offset out of range
+ * error, rather than seeking to the latest (Kafka's 
default). This type of error will typically happen when the consumer
+ * requests an offset that was deleted.
+ */
+LOG.info("Setting consumer property '{}' to 'earliest' to 
ensure at-least-once processing",
--- End diff --

NIT: I would write "Kafka consumer", but it's not a deal breaker.
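
The defaulting rules in the diff above can be sketched without the Kafka or Storm dependencies as follows. This is a simplified illustration, not the KafkaSpoutConfig code: `Guarantee` is a hypothetical stand-in for ProcessingGuarantee (with `OTHER` as a placeholder for any remaining mode), and the method returns the warning text instead of logging it. The key string is the literal value of `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public final class OffsetResetDefaults {

    // Hypothetical stand-in for KafkaSpoutConfig.ProcessingGuarantee.
    public enum Guarantee { AT_LEAST_ONCE, AT_MOST_ONCE, OTHER }

    // Literal value of ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.
    public static final String AUTO_OFFSET_RESET = "auto.offset.reset";

    /**
     * Sets "earliest" when at-least-once is requested and no policy is set.
     * Returns a warning when an explicit policy conflicts with the guarantee.
     */
    public static Optional<String> apply(Map<String, Object> props, Guarantee guarantee) {
        String policy = (String) props.get(AUTO_OFFSET_RESET);
        if (guarantee == Guarantee.AT_LEAST_ONCE) {
            if (policy == null) {
                // Respect an explicit user choice, otherwise avoid skipping
                // messages after an offset-out-of-range error.
                props.put(AUTO_OFFSET_RESET, "earliest");
            } else if (!policy.equals("earliest") && !policy.equals("none")) {
                return Optional.of("Cannot guarantee at-least-once processing."
                    + " Some messages may be skipped.");
            }
        } else if (guarantee == Guarantee.AT_MOST_ONCE) {
            if (policy != null && !policy.equals("latest") && !policy.equals("none")) {
                return Optional.of("Cannot guarantee at-most-once processing."
                    + " Some messages may be processed more than once.");
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        Optional<String> warning = apply(props, Guarantee.AT_LEAST_ONCE);
        System.out.println(props.get(AUTO_OFFSET_RESET) + " warning=" + warning.isPresent());
    }
}
```

Note that the at-most-once branch only warns; it does not set a default, because Kafka's own default of "latest" already matches that guarantee.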


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857469
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -75,8 +75,9 @@ public boolean 
isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetad
 final CommitMetadata committedMetadata = 
JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
 return 
committedMetadata.getTopologyId().equals(context.getStormId());
 } catch (IOException e) {
-LOG.warn("Failed to deserialize [{}]. Error likely occurred 
because the last commit "
-+ "for this topic-partition was done using an earlier 
version of Storm. "
+LOG.warn("Failed to deserialize expected commit metadata [{}]."
++ " This error is expected to occur once per partition, if 
the last commit to each partition"
++ " was by an earlier version of the KafkaSpout, or by 
something other than the KafkaSpout. "
--- End diff --

Suggested wording: "was done by an earlier version ... or by a process other than the KafkaSpout"




---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165854760
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+private static final Logger LOG = 
LoggerFactory.getLogger(CommitMetadata.class);
+// Metadata information to commit to Kafka. It is unique per spout 
instance.
+private final String commitMetadata;
+private final ProcessingGuarantee processingGuarantee;
+private final TopologyContext context;
+
+/**
+ * Create a manager with the given context.
+ */
+public CommitMetadataManager(TopologyContext context, 
ProcessingGuarantee processingGuarantee) {
+this.context = context;
+try {
+commitMetadata = JSON_MAPPER.writeValueAsString(new 
CommitMetadata(
+context.getStormId(), context.getThisTaskId(), 
Thread.currentThread().getName()));
+this.processingGuarantee = processingGuarantee;
+} catch (JsonProcessingException e) {
+LOG.error("Failed to create Kafka commit metadata due to JSON 
serialization error", e);
+throw new RuntimeException(e);
+}
+}
+
+/**
+ * Checks if {@link OffsetAndMetadata} was committed by a {@link 
KafkaSpout} instance in this topology.
+ *
+ * @param tp The topic partition the commit metadata belongs to.
+ * @param committedOffset {@link OffsetAndMetadata} info committed to 
Kafka
+ * @param offsetManagers The offset managers.
+ * @return true if this topology committed this {@link 
OffsetAndMetadata}, false otherwise
+ */
+public boolean isOffsetCommittedByThisTopology(TopicPartition tp, 
OffsetAndMetadata committedOffset,
+Map offsetManagers) {
+try {
+if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE
+&& offsetManagers.containsKey(tp)
+&& offsetManagers.get(tp).hasCommitted()) {
+return true;
+}
+
+final CommitMetadata committedMetadata = 
JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
+return 
committedMetadata.getTopologyId().equals(context.getStormId());
+} catch (IOException e) {
+LOG.warn("Failed to deserialize [{}]. Error likely occurred 
because the last commit "
--- End diff --

I tried to update this so it's a little clearer about when it will be 
printed.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165854507
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -469,11 +437,14 @@ private boolean emitOrRetryTuple(ConsumerRecord 
record) {
 LOG.trace("Tuple for record [{}] has already been emitted. 
Skipping", record);
 } else {
 final OffsetAndMetadata committedOffset = 
kafkaConsumer.committed(tp);
-if (committedOffset != null && 
isOffsetCommittedByThisTopology(tp, committedOffset)
+if (isAtLeastOnceProcessing()
+&& committedOffset != null 
+&& 
commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, 
offsetManagers)
--- End diff --

Good point


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165854506
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -396,7 +361,10 @@ private void setWaitingToEmit(ConsumerRecords 
consumerRecords) {
 numPolledRecords);
 if (kafkaSpoutConfig.getProcessingGuarantee() == 
KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
 //Commit polled records immediately to ensure delivery is 
at-most-once.
-kafkaConsumer.commitSync();
+Map offsetsToCommit = 
+
createOffsetsToCommitForConsumedOffsets(kafkaConsumer.assignment());
+kafkaConsumer.commitSync(offsetsToCommit);
+LOG.debug("Committed offsets {} to Kafka", 
offsetsToCommit);
--- End diff --

I don't mind adding it, but is this information useful to the user?


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165854458
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+private static final Logger LOG = 
LoggerFactory.getLogger(CommitMetadata.class);
+// Metadata information to commit to Kafka. It is unique per spout 
instance.
+private final String commitMetadata;
+private final ProcessingGuarantee processingGuarantee;
+private final TopologyContext context;
+
+/**
+ * Create a manager with the given context.
+ */
+public CommitMetadataManager(TopologyContext context, 
ProcessingGuarantee processingGuarantee) {
+this.context = context;
+try {
+commitMetadata = JSON_MAPPER.writeValueAsString(new 
CommitMetadata(
--- End diff --

I would agree if CommitMetadata weren't a POJO. It doesn't have any 
behavior, so why do we need to stub it?


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165854399
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+private static final Logger LOG = 
LoggerFactory.getLogger(CommitMetadata.class);
+// Metadata information to commit to Kafka. It is unique per spout 
instance.
+private final String commitMetadata;
+private final ProcessingGuarantee processingGuarantee;
+private final TopologyContext context;
+
+/**
+ * Create a manager with the given context.
+ */
+public CommitMetadataManager(TopologyContext context, 
ProcessingGuarantee processingGuarantee) {
+this.context = context;
+try {
+commitMetadata = JSON_MAPPER.writeValueAsString(new 
CommitMetadata(
+context.getStormId(), context.getThisTaskId(), 
Thread.currentThread().getName()));
+this.processingGuarantee = processingGuarantee;
+} catch (JsonProcessingException e) {
+LOG.error("Failed to create Kafka commit metadata due to JSON 
serialization error", e);
+throw new RuntimeException(e);
+}
+}
+
+/**
+ * Checks if {@link OffsetAndMetadata} was committed by a {@link 
KafkaSpout} instance in this topology.
+ *
+ * @param tp The topic partition the commit metadata belongs to.
+ * @param committedOffset {@link OffsetAndMetadata} info committed to 
Kafka
+ * @param offsetManagers The offset managers.
+ * @return true if this topology committed this {@link 
OffsetAndMetadata}, false otherwise
+ */
+public boolean isOffsetCommittedByThisTopology(TopicPartition tp, 
OffsetAndMetadata committedOffset,
+Map offsetManagers) {
+try {
+if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE
+&& offsetManagers.containsKey(tp)
+&& offsetManagers.get(tp).hasCommitted()) {
+return true;
+}
+
+final CommitMetadata committedMetadata = 
JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
+return 
committedMetadata.getTopologyId().equals(context.getStormId());
+} catch (IOException e) {
+LOG.warn("Failed to deserialize [{}]. Error likely occurred 
because the last commit "
--- End diff --

Yes, I'll update this message


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165854164
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+private static final Logger LOG = 
LoggerFactory.getLogger(CommitMetadata.class);
+// Metadata information to commit to Kafka. It is unique per spout 
instance.
--- End diff --

No, the metadata is the same if you create two CommitMetadataManagers from 
the same spout instance. 


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165854105
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
--- End diff --

ObjectMappers are thread safe. I don't see anything in the documentation 
about whether ObjectMapper uses internal locks, but this post suggests it 
doesn't: 
https://stackoverflow.com/questions/18611565/how-do-i-correctly-reuse-jackson-objectmapper#comment27462917_18618918
A quick search also suggests that it should be fine to use ObjectMapper as a 
singleton.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165853833
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
--- End diff --

It's in the internal package, so if we make it package private the spout 
can't see it. I think it's up to users to figure out that if the class is in a 
package called "internal", they probably shouldn't use it. Once we start 
looking at Java 9 modularization we can make sure not to export this.
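
As a rough sketch of what that could look like (nothing like this exists in the PR): the module name below is an assumption, and the `requires` clauses a real descriptor would need are left out. The point is only that a package which is not listed in an `exports` clause stays inaccessible to other modules, even if its classes are public.

```java
// module-info.java (hypothetical; the module name is an assumption
// and the necessary `requires` clauses are omitted)
module org.apache.storm.kafka.client {
    // Exported: public types here are usable by code in other modules.
    exports org.apache.storm.kafka.spout;

    // org.apache.storm.kafka.spout.internal is deliberately not exported,
    // so CommitMetadataManager can stay public for KafkaSpout to use
    // without being accessible outside this module.
}
```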


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165853784
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -142,7 +139,7 @@ public void open(Map conf, TopologyContext context, SpoutOutputC
 offsetManagers = new HashMap<>();
 emitted = new HashSet<>();
 waitingToEmit = new HashMap<>();
-setCommitMetadata(context);
+commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());
--- End diff --

I think we should wait. Let's do it if someone needs it. Introducing more 
extension points than we need is likely to cause us headaches down the road: 
once an extension point is public, it is harder for us to change, because we 
have to consider that other people may be implementing the interface.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165853798
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -396,7 +361,10 @@ private void setWaitingToEmit(ConsumerRecords consumerRecords) {
 numPolledRecords);
 if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
 //Commit polled records immediately to ensure delivery is at-most-once.
-kafkaConsumer.commitSync();
+Map offsetsToCommit =
+createOffsetsToCommitForConsumedOffsets(kafkaConsumer.assignment());
+kafkaConsumer.commitSync(offsetsToCommit);
--- End diff --

Will rename


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165853341
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
--- End diff --

All the methods and constructors in this class should be package-private.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165852737
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -469,11 +437,14 @@ private boolean emitOrRetryTuple(ConsumerRecord record) {
 LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
 } else {
 final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
-if (committedOffset != null && isOffsetCommittedByThisTopology(tp, committedOffset)
+if (isAtLeastOnceProcessing()
+&& committedOffset != null
+&& commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, offsetManagers)
 && committedOffset.offset() > record.offset()) {
 // Ensures that after a topology with this id is started, the consumer fetch
 // position never falls behind the committed offset (STORM-2844)
-throw new IllegalStateException("Attempting to emit a message that has already been committed.");
+throw new IllegalStateException("Attempting to emit a message that has already been committed."
++ " This should never occur in at-least-once mode.");
--- End diff --

for at-least-once semantics.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165852835
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
--- End diff --

Do we want one ObjectMapper for all the KafkaSpout instances (executors), 
or one per executor? This will share it across all the instances. Perhaps we 
should have one per instance.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165852864
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+private static final Logger LOG = 
LoggerFactory.getLogger(CommitMetadata.class);
+// Metadata information to commit to Kafka. It is unique per spout 
instance.
--- End diff --

Do you mean CommitMetadataManager.class ?


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165853160
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+private static final Logger LOG = 
LoggerFactory.getLogger(CommitMetadata.class);
+// Metadata information to commit to Kafka. It is unique per spout 
instance.
+private final String commitMetadata;
+private final ProcessingGuarantee processingGuarantee;
+private final TopologyContext context;
+
+/**
+ * Create a manager with the given context.
+ */
+public CommitMetadataManager(TopologyContext context, 
ProcessingGuarantee processingGuarantee) {
+this.context = context;
+try {
+commitMetadata = JSON_MAPPER.writeValueAsString(new 
CommitMetadata(
+context.getStormId(), context.getThisTaskId(), 
Thread.currentThread().getName()));
+this.processingGuarantee = processingGuarantee;
+} catch (JsonProcessingException e) {
+LOG.error("Failed to create Kafka commit metadata due to JSON 
serialization error", e);
+throw new RuntimeException(e);
+}
+}
+
+/**
+ * Checks if {@link OffsetAndMetadata} was committed by a {@link 
KafkaSpout} instance in this topology.
+ *
+ * @param tp The topic partition the commit metadata belongs to.
+ * @param committedOffset {@link OffsetAndMetadata} info committed to 
Kafka
+ * @param offsetManagers The offset managers.
+ * @return true if this topology committed this {@link 
OffsetAndMetadata}, false otherwise
+ */
+public boolean isOffsetCommittedByThisTopology(TopicPartition tp, 
OffsetAndMetadata committedOffset,
+Map offsetManagers) {
+try {
+if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE
+&& offsetManagers.containsKey(tp)
+&& offsetManagers.get(tp).hasCommitted()) {
+return true;
+}
+
+final CommitMetadata committedMetadata = 
JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
+return 
committedMetadata.getTopologyId().equals(context.getStormId());
+} catch (IOException e) {
+LOG.warn("Failed to deserialize [{}]. Error likely occurred 
because the last commit "
--- End diff --

We should state, either in the README or as part of this message, that this 
WARN is expected the first time a user starts this or an earlier version of the 
spout against offsets committed to Kafka by an older version of the spout.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165852696
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -396,7 +361,10 @@ private void setWaitingToEmit(ConsumerRecords consumerRecords) {
 numPolledRecords);
 if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
 //Commit polled records immediately to ensure delivery is at-most-once.
-kafkaConsumer.commitSync();
+Map offsetsToCommit =
+createOffsetsToCommitForConsumedOffsets(kafkaConsumer.assignment());
+kafkaConsumer.commitSync(offsetsToCommit);
+LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
--- End diff --

Committed offsets {} synchronously to Kafka


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165853240
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -311,7 +273,10 @@ public void nextTuple() {
 if (isAtLeastOnceProcessing()) {
 commitOffsetsForAckedTuples(kafkaConsumer.assignment());
 } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NONE) {
-commitConsumedOffsets(kafkaConsumer.assignment());
+Map offsetsToCommit =
+createOffsetsToCommitForConsumedOffsets(kafkaConsumer.assignment());
+kafkaConsumer.commitAsync(offsetsToCommit, null);
--- End diff --

createFetchedOffsetsMetadata


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165852272
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -142,7 +139,7 @@ public void open(Map conf, TopologyContext context, SpoutOutputC
 offsetManagers = new HashMap<>();
 emitted = new HashSet<>();
 waitingToEmit = new HashMap<>();
-setCommitMetadata(context);
+commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());
--- End diff --

I wonder if this should become available to the KafkaSpout through 
KafkaSpoutConfig, perhaps using a factory so that we could make it pluggable, 
in case there is a need to support different behavior in the future.

We can also wait to do that until we need it. Just wanted to get your 
thoughts on it.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165852670
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -396,7 +361,10 @@ private void setWaitingToEmit(ConsumerRecords consumerRecords) {
 numPolledRecords);
 if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
 //Commit polled records immediately to ensure delivery is at-most-once.
-kafkaConsumer.commitSync();
+Map offsetsToCommit =
+createOffsetsToCommitForConsumedOffsets(kafkaConsumer.assignment());
+kafkaConsumer.commitSync(offsetsToCommit);
--- End diff --

createFetchedOffsetsMetadata


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165853046
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -469,11 +437,14 @@ private boolean emitOrRetryTuple(ConsumerRecord record) {
 LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
 } else {
 final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
-if (committedOffset != null && isOffsetCommittedByThisTopology(tp, committedOffset)
+if (isAtLeastOnceProcessing()
+&& committedOffset != null
+&& commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, offsetManagers)
--- End diff --

Collections.unmodifiableMap(offsetManagers)
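
For context, a minimal sketch of what the wrapper would buy. `Collections.unmodifiableMap` returns a read-only view, so the callee can still look up entries, but any attempt to write through it throws `UnsupportedOperationException`. The class name, the `String`/`Long` types, and the keys are made up for illustration; the real map is keyed by `TopicPartition`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class UnmodifiableViewSketch {

    // Hypothetical stand-in for a callee such as isOffsetCommittedByThisTopology:
    // it only needs to read from the map.
    static boolean hasEntry(Map<String, Long> offsets, String key) {
        return offsets.containsKey(key);
    }

    // Returns true if a write through the given map is rejected.
    static boolean rejectsWrites(Map<String, Long> offsets) {
        try {
            offsets.put("other-topic-0", 1L);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> offsetManagers = new HashMap<>();
        offsetManagers.put("topic-0", 42L);

        Map<String, Long> readOnly = Collections.unmodifiableMap(offsetManagers);
        System.out.println(hasEntry(readOnly, "topic-0"));
        System.out.println(rejectsWrites(readOnly));

        // The wrapper is a view, not a copy: later changes made by the owner
        // of the underlying map remain visible through it.
        offsetManagers.put("topic-1", 7L);
        System.out.println(hasEntry(readOnly, "topic-1"));
    }
}
```

Since the wrapper is a view rather than a copy, wrapping at the call site costs one small allocation per call and does not snapshot the map.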


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165852989
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+private static final Logger LOG = 
LoggerFactory.getLogger(CommitMetadata.class);
+// Metadata information to commit to Kafka. It is unique per spout 
instance.
+private final String commitMetadata;
+private final ProcessingGuarantee processingGuarantee;
+private final TopologyContext context;
+
+/**
+ * Create a manager with the given context.
+ */
+public CommitMetadataManager(TopologyContext context, 
ProcessingGuarantee processingGuarantee) {
+this.context = context;
+try {
+commitMetadata = JSON_MAPPER.writeValueAsString(new 
CommitMetadata(
--- End diff --

Ideally commitMetadata would be passed in the constructor to facilitate 
unit testing. We could have a factory method in this class itself with this code

```java
public CommitMetadataManager(TopologyContext context, ProcessingGuarantee 
processingGuarantee, String commitMetadata)
```

```java
public static CommitMetadataManager newInstance(TopologyContext context,
        ProcessingGuarantee processingGuarantee) {
    return new CommitMetadataManager(context, processingGuarantee,
        JSON_MAPPER.writeValueAsString(new CommitMetadata(context.getStormId(),
            context.getThisTaskId(), Thread.currentThread().getName())));
}
```

handling the JsonProcessingException in the factory method
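
A self-contained sketch of that shape, under stated assumptions: `MetadataManagerSketch`, its `serialize` helper, the JSON field names, and the checked `SerializationException` are hypothetical stand-ins for `CommitMetadataManager`, `JSON_MAPPER.writeValueAsString`, and `JsonProcessingException`, so the example runs without Jackson or Storm on the classpath. It is meant to show the constructor/factory split, not the actual serialization.

```java
class MetadataManagerSketch {

    // Hypothetical checked exception standing in for Jackson's JsonProcessingException.
    static class SerializationException extends Exception {
        SerializationException(String message) {
            super(message);
        }
    }

    private final String topologyId;
    private final String commitMetadata;

    // Unit tests can call this constructor directly with any metadata string.
    MetadataManagerSketch(String topologyId, String commitMetadata) {
        this.topologyId = topologyId;
        this.commitMetadata = commitMetadata;
    }

    // Factory for production use: builds the metadata and converts the checked
    // serialization failure into an unchecked one, as the constructor in the diff does.
    static MetadataManagerSketch newInstance(String topologyId, int taskId, String threadName) {
        try {
            return new MetadataManagerSketch(topologyId, serialize(topologyId, taskId, threadName));
        } catch (SerializationException e) {
            throw new RuntimeException(e);
        }
    }

    // Stand-in for JSON_MAPPER.writeValueAsString(new CommitMetadata(...)).
    // No escaping is done; this is not a general JSON writer.
    static String serialize(String topologyId, int taskId, String threadName)
            throws SerializationException {
        if (topologyId == null || threadName == null) {
            throw new SerializationException("topologyId and threadName must be non-null");
        }
        return String.format("{\"topologyId\":\"%s\",\"taskId\":%d,\"threadName\":\"%s\"}",
            topologyId, taskId, threadName);
    }

    String getTopologyId() {
        return topologyId;
    }

    String getCommitMetadata() {
        return commitMetadata;
    }

    public static void main(String[] args) {
        System.out.println(newInstance("topo-1", 3, "exec-thread").getCommitMetadata());
        System.out.println(new MetadataManagerSketch("topo-1", "fixed-metadata").getCommitMetadata());
    }
}
```

With this split, a test can inject a fixed metadata string through the constructor, while the spout would only ever go through `newInstance`.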


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-01-27 Thread srdo
GitHub user srdo opened a pull request:

https://github.com/apache/storm/pull/2538

 STORM-2913: Add metadata to at-most-once and at-least-once commits 

https://issues.apache.org/jira/browse/STORM-2913

This builds on STORM-2914.

I believe we can resolve STORM-2913 by committing metadata in all 
processing guarantee modes, rather than just AT_LEAST_ONCE. This change simply 
adds metadata to the AT_MOST_ONCE and NONE commit statements.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/srdo/storm STORM-2913

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2538.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2538


commit 3e09a1d718f8d40424e7ac5d574efe7a74706cf8
Author: Stig Rohde Døssing 
Date:   2018-01-27T18:22:07Z

STORM-2914: Implement ProcessingGuarantee.NONE in the spout instead of 
using enable.auto.commit

commit 99c7a30bf38c523a1f97411e7a42dbb44017f9f0
Author: Stig Rohde Døssing 
Date:   2018-01-27T14:15:45Z

STORM-2913: Add metadata to at-most-once and at-least-once commits




---