[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2538


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2537


---


[GitHub] storm issue #2537: STORM-2914: Implement ProcessingGuarantee.NONE in the spo...

2018-02-04 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2537
  
@srdo 
I've found some divergence between the master and 1.x branches. Could you raise 
a pull request against the 1.x branch as well? I'm OK with a single PR containing 
the two commits (STORM-2914/STORM-2913).


---


[GitHub] storm issue #2538: STORM-2913: Add metadata to at-most-once and at-least-onc...

2018-02-04 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2538
  
@srdo 
Thanks for the great work. I'd rather not review the two PRs, since @hmcl 
already gave a thorough review. If you would like to merge the two yourself, please 
replace storm-kafka-client 1.1.x/1.0.x with 1.x afterwards. If you would rather 
let me do it, please let me know. Thanks in advance!


---


[GitHub] storm issue #2538: STORM-2913: Add metadata to at-most-once and at-least-onc...

2018-02-04 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2538
  
+1. 

Once squashed, this is good to merge as far as I am concerned. Thanks a lot @srdo.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165859312
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadata.class);
+    // Metadata information to commit to Kafka. It is unique per spout instance.
+    private final String commitMetadata;
+    private final ProcessingGuarantee processingGuarantee;
+    private final TopologyContext context;
+
+    /**
+     * Create a manager with the given context.
+     */
+    public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
+        this.context = context;
+        try {
+            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
--- End diff --

I'm happy to declare the class final, hopefully that'll discourage people 
from using the class directly if the internal package hint doesn't do the trick.

Regarding creating objects in the constructor, I agree with you in the 
general case (DI is great), but for pure data objects like CommitMetadata I'm 
not sure I understand the harm? 
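For illustration, a minimal sketch of the shape being discussed here: a final class that builds its pure-data value object inline. Class and member names mirror the PR, but the body is simplified and is not the actual Storm source:

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;

    // Simplified sketch, not the PR code: a final manager that constructs its
    // metadata inline. Because the value carries no behavior, building it here
    // rather than injecting it costs little in testability.
    public final class CommitMetadataManagerSketch {

        private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

        private final String commitMetadata;

        public CommitMetadataManagerSketch(String topologyId, int taskId) throws JsonProcessingException {
            // Serializing a small array stands in for the real CommitMetadata POJO.
            this.commitMetadata = JSON_MAPPER.writeValueAsString(new Object[]{topologyId, taskId});
        }

        public String getCommitMetadata() {
            return commitMetadata;
        }
    }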


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165859143
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
@@ -0,0 +1,90 @@
[... license header, package declaration, and imports as in the full quote above ...]
+public class CommitMetadataManager {
+
+    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
--- End diff --

It's my impression that doing something like that (declaring one global 
ObjectMapper) would be fine, unless we needed the ObjectMappers to have different 
configurations. https://stackoverflow.com/a/3909846/8845188 (the responder 
is one of the Jackson devs). 
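A minimal sketch of that point, assuming default Jackson settings (names are illustrative only): one shared, statically held mapper covers every caller, and a second instance is only warranted when configurations diverge.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.SerializationFeature;

    // Sketch only: one shared ObjectMapper is fine while all callers agree on
    // configuration; a differently configured mapper gets its own instance.
    final class Mappers {

        static final ObjectMapper DEFAULT = new ObjectMapper();

        // The exception case: diverging configuration forces a second mapper,
        // because reconfiguring a shared one while in use is not safe.
        static final ObjectMapper PRETTY = new ObjectMapper()
                .enable(SerializationFeature.INDENT_OUTPUT);

        private Mappers() {
        }
    }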


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165858606
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
@@ -0,0 +1,90 @@
[... license header, package declaration, and imports as above ...]
+public class CommitMetadataManager {
+
+    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadata.class);
+    // Metadata information to commit to Kafka. It is unique per spout instance.
--- End diff --

Right, I misunderstood. I thought you were talking about the comment in L38 
(i.e. you wanted it changed to "... It is unique per CommitMetadataManager"). 
Will fix


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857516
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
@@ -0,0 +1,90 @@
[... license header, package declaration, and imports as above ...]
+public class CommitMetadataManager {
+
+    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
--- End diff --

The only question mark is performance. I am OK with it staying like this; I 
just wanted to bring it up. If performance is not a concern, then in the extreme 
case there would be no harm in creating one ObjectMapper in a utility class and 
using it across the entire codebase.
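A sketch of that utility-class idea (the class name is hypothetical, not something in the Storm codebase):

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;

    // Hypothetical utility holder: one ObjectMapper shared across the codebase.
    // ObjectMapper is thread safe once configured, so sharing a single instance
    // is safe; the open question raised here is only performance under contention.
    public final class JsonUtil {

        private static final ObjectMapper MAPPER = new ObjectMapper();

        private JsonUtil() {
        }

        public static String toJson(Object value) throws JsonProcessingException {
            return MAPPER.writeValueAsString(value);
        }
    }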


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857829
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
@@ -453,37 +460,37 @@ public Builder(String bootstrapServers, Subscription subscription) {
             return builder;
         }
 
-        private static void setAutoCommitMode(Builder builder) {
+        private static void setKafkaPropsForProcessingGuarantee(Builder builder) {
             if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-                throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
-                    + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee");
+                throw new IllegalStateException("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+                    + " setting is not supported. You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
             }
-            if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
-                builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
-            } else {
-                String autoOffsetResetPolicy = (String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-                if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
-                    if (autoOffsetResetPolicy == null) {
-                        /*
-                        If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
-                        for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
-                        error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
-                        requests an offset that was deleted.
-                         */
-                        builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-                    } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
-                        LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
-                            + " Some messages may be skipped.");
-                    }
-                } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
-                    if (autoOffsetResetPolicy != null
-                        && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
-                        LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
-                            + " Some messages may be processed more than once.");
-                    }
+            String autoOffsetResetPolicy = (String) builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+            if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
+                if (autoOffsetResetPolicy == null) {
+                    /*
+                     * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
+                     * for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
+                     * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
+                     * requests an offset that was deleted.
+                     */
+                    LOG.info("Setting consumer property '{}' to 'earliest' to ensure at-least-once processing",
+                        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+                    builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+                } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
+                    LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
+                        + " Some messages may be skipped.");
+                }
+            } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
+                if (autoOffsetResetPolicy != null
+                    && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
+                    LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
+                        + " Some messages may be processed more than once.");


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857620
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
@@ -0,0 +1,90 @@
[... license header, package declaration, and imports as above ...]
+public class CommitMetadataManager {
+
+    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadata.class);
+    // Metadata information to commit to Kafka. It is unique per spout instance.
+    private final String commitMetadata;
+    private final ProcessingGuarantee processingGuarantee;
+    private final TopologyContext context;
+
+    /**
+     * Create a manager with the given context.
+     */
+    public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
+        this.context = context;
+        try {
+            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
--- End diff --

It's bad practice to create objects in the constructor. I simply don't do 
it. However, it's done in a lot of places in Storm already, so I am OK with it if 
you want to leave it as is. 

Another example of why this could potentially be bad is if someone wants to 
subclass this class. If we leave it like this, perhaps the class should be 
final.

These are just suggestions. You can leave it as is or go with either 
of them.
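A minimal sketch of the constructor-injection alternative described here, using java.time.Clock as a stand-in collaborator (illustrative only, not PR code):

    import java.time.Clock;

    // Illustrative only: the collaborator is injected rather than constructed
    // inside the constructor, so a test can pass Clock.fixed(...) instead.
    public final class InjectionSketch {

        private final Clock clock;

        public InjectionSketch(Clock clock) {
            this.clock = clock;
        }

        long nowMillis() {
            return clock.millis();
        }

        public static void main(String[] args) {
            // Production wiring; a test would supply a fixed clock instead.
            System.out.println(new InjectionSketch(Clock.systemUTC()).nowMillis());
        }
    }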


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857949
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
@@ -0,0 +1,90 @@
[... license header, package declaration, and imports as above ...]
+public class CommitMetadataManager {
+
+    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadata.class);
+    // Metadata information to commit to Kafka. It is unique per spout instance.
--- End diff --

I disagree. The name of the logger instance should be the class that is 
logging the message or one of its super classes. There is no class called 
CommitMetadata in the codebase, so why should we have a logger called 
CommitMetadata?
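For reference, the one-line fix this implies, sketched here with the surrounding members elided:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class CommitMetadataManager {

        // Logger named after the class that actually emits the messages,
        // rather than after the CommitMetadata value class.
        private static final Logger LOG = LoggerFactory.getLogger(CommitMetadataManager.class);
    }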


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857479
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -142,7 +139,7 @@ public void open(Map conf, TopologyContext context, SpoutOutputC
         offsetManagers = new HashMap<>();
         emitted = new HashSet<>();
         waitingToEmit = new HashMap<>();
-        setCommitMetadata(context);
+        commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());
--- End diff --

ok


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857745
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
@@ -0,0 +1,90 @@
[... license header, package declaration, and imports as above ...]
+public class CommitMetadataManager {
--- End diff --

ok


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857816
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
@@ -453,37 +460,37 @@ public Builder(String bootstrapServers, Subscription subscription) {
[... same setKafkaPropsForProcessingGuarantee hunk as quoted in full above ...]
+                    LOG.info("Setting consumer property '{}' to 'earliest' to ensure at-least-once processing",
--- End diff --

NIT: I would write Kafka consumer, but not a deal breaker.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857469
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
@@ -75,8 +75,9 @@ public boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetad
             final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
             return committedMetadata.getTopologyId().equals(context.getStormId());
         } catch (IOException e) {
-            LOG.warn("Failed to deserialize [{}]. Error likely occurred because the last commit "
-                + "for this topic-partition was done using an earlier version of Storm. "
+            LOG.warn("Failed to deserialize expected commit metadata [{}]."
+                + " This error is expected to occur once per partition, if the last commit to each partition"
+                + " was by an earlier version of the KafkaSpout, or by something other than the KafkaSpout. "
--- End diff --

was done by an earlier version ... or by a process other than the KafkaSpout




---


[GitHub] storm issue #2538: STORM-2913: Add metadata to at-most-once and at-least-onc...

2018-02-04 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2538
  
@hmcl Rebased.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165854760
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
@@ -0,0 +1,90 @@
[... license header, package declaration, and imports as above ...]
+public class CommitMetadataManager {
+
+    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadata.class);
+    // Metadata information to commit to Kafka. It is unique per spout instance.
+    private final String commitMetadata;
+    private final ProcessingGuarantee processingGuarantee;
+    private final TopologyContext context;
+
+    /**
+     * Create a manager with the given context.
+     */
+    public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
+        this.context = context;
+        try {
+            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
+                context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
+            this.processingGuarantee = processingGuarantee;
+        } catch (JsonProcessingException e) {
+            LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology.
+     *
+     * @param tp The topic partition the commit metadata belongs to.
+     * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
+     * @param offsetManagers The offset managers.
+     * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
+     */
+    public boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset,
+            Map<TopicPartition, OffsetManager> offsetManagers) {
+        try {
+            if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE
+                && offsetManagers.containsKey(tp)
+                && offsetManagers.get(tp).hasCommitted()) {
+                return true;
+            }
+
+            final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
+            return committedMetadata.getTopologyId().equals(context.getStormId());
+        } catch (IOException e) {
+            LOG.warn("Failed to deserialize [{}]. Error likely occurred because the last commit "
--- End diff --

I tried to update this so it's a little clearer about when it will be printed.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165854507
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -469,11 +437,14 @@ private boolean emitOrRetryTuple(ConsumerRecord record) {
             LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
         } else {
             final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
-            if (committedOffset != null && isOffsetCommittedByThisTopology(tp, committedOffset)
+            if (isAtLeastOnceProcessing()
+                && committedOffset != null
+                && commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, offsetManagers)
--- End diff --

Good point


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165854506
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -396,7 +361,10 @@ private void setWaitingToEmit(ConsumerRecords consumerRecords) {
             numPolledRecords);
         if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
             //Commit polled records immediately to ensure delivery is at-most-once.
-            kafkaConsumer.commitSync();
+            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
+                createOffsetsToCommitForConsumedOffsets(kafkaConsumer.assignment());
+            kafkaConsumer.commitSync(offsetsToCommit);
+            LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
--- End diff --

I don't mind adding it, but is this information useful to the user?


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165854458
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
@@ -0,0 +1,90 @@
[... license header, package declaration, and imports as above ...]
+public class CommitMetadataManager {
+
+    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadata.class);
+    // Metadata information to commit to Kafka. It is unique per spout instance.
+    private final String commitMetadata;
+    private final ProcessingGuarantee processingGuarantee;
+    private final TopologyContext context;
+
+    /**
+     * Create a manager with the given context.
+     */
+    public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
+        this.context = context;
+        try {
+            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
--- End diff --

I would agree if CommitMetadata weren't a POJO. It doesn't have any 
behavior, so why would we need to stub it?
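For reference, a behavior-free data object in the spirit of CommitMetadata; the field set is inferred from the constructor call in this diff and may not match the real class exactly:

    // Inferred sketch of a CommitMetadata-style POJO: fields, a constructor,
    // and getters, with no behavior anywhere that a test would want to stub.
    public class CommitMetadataSketch {

        private final String topologyId;
        private final int taskId;
        private final String threadName;

        public CommitMetadataSketch(String topologyId, int taskId, String threadName) {
            this.topologyId = topologyId;
            this.taskId = taskId;
            this.threadName = threadName;
        }

        public String getTopologyId() {
            return topologyId;
        }

        public int getTaskId() {
            return taskId;
        }

        public String getThreadName() {
            return threadName;
        }
    }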


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165854399
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
@@ -0,0 +1,90 @@
[... same CommitMetadataManager.java quote as above ...]
+        } catch (IOException e) {
+            LOG.warn("Failed to deserialize [{}]. Error likely occurred because the last commit "
--- End diff --

Yes, I'll update this message


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165854164
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
@@ -0,0 +1,90 @@
[... license header, package declaration, and imports as above ...]
+public class CommitMetadataManager {
+
+    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadata.class);
+    // Metadata information to commit to Kafka. It is unique per spout instance.
--- End diff --

No, the metadata is the same if you create two CommitMetadataManagers from 
the same spout instance. 


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165854105
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
@@ -0,0 +1,90 @@
[... license header, package declaration, and imports as above ...]
+public class CommitMetadataManager {
+
+    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
--- End diff --

ObjectMappers are thread safe. I don't see anything in the documentation 
that mentions whether there are internal locks being used in the ObjectMapper, 
but this post suggests there aren't: 
https://stackoverflow.com/questions/18611565/how-do-i-correctly-reuse-jackson-objectmapper#comment27462917_18618918.
 A quick Google search also suggests that it should be fine to use ObjectMapper as a 
singleton.
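A small demonstration of that claim, assuming the mapper's configuration is never changed after startup (names are illustrative):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Arrays;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Many threads sharing one statically held ObjectMapper for serialization.
    // This is safe as long as nobody reconfigures the mapper while it is in use.
    public class SharedMapperDemo {

        private static final ObjectMapper MAPPER = new ObjectMapper();

        public static void main(String[] args) {
            ExecutorService pool = Executors.newFixedThreadPool(4);
            for (int i = 0; i < 8; i++) {
                final int n = i;
                pool.submit(() -> MAPPER.writeValueAsString(Arrays.asList("task", n)));
            }
            pool.shutdown();
        }
    }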


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165853833
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
@@ -0,0 +1,90 @@
[... license header, package declaration, and imports as above ...]
+public class CommitMetadataManager {
--- End diff --

It's in the internal package, so if we make it package private the spout 
can't see it. I think it's up to users to figure out that if the class is in a 
package called "internal", they probably shouldn't use it. Once we start 
looking at Java 9 modularization we can make sure not to export this.
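For what that might eventually look like, a hypothetical module-info.java; the module name and the jackson-databind module reference are assumptions, not anything in this PR:

    // Hypothetical module descriptor: the public spout API is exported while
    // the internal package stays inaccessible to readers outside the module.
    module org.apache.storm.kafka.spout {
        requires com.fasterxml.jackson.databind;
        exports org.apache.storm.kafka.spout;
        // org.apache.storm.kafka.spout.internal is deliberately not exported.
    }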


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165853784
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -142,7 +139,7 @@ public void open(Map conf, TopologyContext context, SpoutOutputC
         offsetManagers = new HashMap<>();
         emitted = new HashSet<>();
         waitingToEmit = new HashMap<>();
-        setCommitMetadata(context);
+        commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());
--- End diff --

I think we should wait. Let's do it if someone needs it, but introducing 
more extension points than we need is likely to cause us headaches down 
the road: once an extension point is public it's harder for us to change, 
because we have to consider that other people may be implementing the interface.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165853798
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -396,7 +361,10 @@ private void setWaitingToEmit(ConsumerRecords consumerRecords) {
             numPolledRecords);
         if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
             //Commit polled records immediately to ensure delivery is at-most-once.
-            kafkaConsumer.commitSync();
+            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
+                createOffsetsToCommitForConsumedOffsets(kafkaConsumer.assignment());
+            kafkaConsumer.commitSync(offsetsToCommit);
--- End diff --

Will rename


---


[GitHub] storm issue #2537: STORM-2914: Implement ProcessingGuarantee.NONE in the spo...

2018-02-04 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2537
  
+1. Let's squash, and as far as I am concerned it is good to merge. Once 
this is squashed, can you please rebase STORM-2913? Thanks.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165852737
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -469,11 +437,14 @@ private boolean emitOrRetryTuple(ConsumerRecord record) {
             LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
         } else {
             final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
-            if (committedOffset != null && isOffsetCommittedByThisTopology(tp, committedOffset)
+            if (isAtLeastOnceProcessing()
+                && committedOffset != null
+                && commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, offsetManagers)
                 && committedOffset.offset() > record.offset()) {
                 // Ensures that after a topology with this id is started, the consumer fetch
                 // position never falls behind the committed offset (STORM-2844)
-                throw new IllegalStateException("Attempting to emit a message that has already been committed.");
+                throw new IllegalStateException("Attempting to emit a message that has already been committed."
+                    + " This should never occur in at-least-once mode.");
--- End diff --

for at-least-once semantics.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165853341
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
@@ -0,0 +1,90 @@
[... license header, package declaration, and imports as above ...]
+public class CommitMetadataManager {
--- End diff --

All the methods and constructors in this class should be package protected


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165852835
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
@@ -0,0 +1,90 @@
[... license header, package declaration, and imports as above ...]
+public class CommitMetadataManager {
+
+    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
--- End diff --

Do we want one ObjectMapper for all the KafkaSpout instances (executors), 
or one per executor? This will share it across all the instances. Perhaps we 
should have one per instance.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165852864
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
@@ -0,0 +1,90 @@
[... license header, package declaration, and imports as above ...]
+public class CommitMetadataManager {
+
+    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadata.class);
+    // Metadata information to commit to Kafka. It is unique per spout instance.
--- End diff --

Do you mean CommitMetadataManager.class?


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165853160
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
@@ -0,0 +1,90 @@
[... same CommitMetadataManager.java quote as above ...]
+        } catch (IOException e) {
+            LOG.warn("Failed to deserialize [{}]. Error likely occurred because the last commit "
--- End diff --

We should state, either in the README or in this message itself, that this WARN is expected the first time a user starts this version of the spout when the commits to Kafka were done by an older version of the spout.
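
For illustration, the catch block in `isOffsetCommittedByThisTopology` could call the upgrade case out directly; this is only a sketch, and the wording is a suggestion rather than the committed code:

```java
} catch (IOException e) {
    // Hypothetical message: old commits simply carry no metadata, so deserialization fails once per partition.
    LOG.warn("Failed to deserialize [{}]. This is expected the first time this version of the spout"
        + " runs against offsets committed by an older version of the spout.", committedOffset);
    return false; // treat the offset as not committed by this topology
}
```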


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165853240
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -311,7 +273,10 @@ public void nextTuple() {
 if (isAtLeastOnceProcessing()) {
     commitOffsetsForAckedTuples(kafkaConsumer.assignment());
 } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NONE) {
-    commitConsumedOffsets(kafkaConsumer.assignment());
+    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
+        createOffsetsToCommitForConsumedOffsets(kafkaConsumer.assignment());
+    kafkaConsumer.commitAsync(offsetsToCommit, null);
--- End diff --

Rename `createOffsetsToCommitForConsumedOffsets` to `createFetchedOffsetsMetadata`.
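
For reference, the call site after the suggested rename would read as follows (sketch only, the method body is unchanged):

```java
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
    createFetchedOffsetsMetadata(kafkaConsumer.assignment());
kafkaConsumer.commitAsync(offsetsToCommit, null);
```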


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165852272
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -142,7 +139,7 @@ public void open(Map conf, TopologyContext context, SpoutOutputC
 offsetManagers = new HashMap<>();
 emitted = new HashSet<>();
 waitingToEmit = new HashMap<>();
-setCommitMetadata(context);
+commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());
--- End diff --

I wonder if this should become available to the KafkaSpout through KafkaSpoutConfig, perhaps using a factory, so that we could make it pluggable in case there is a need to support different behavior in the future.

We can also wait to do that until we need it. Just wanted to get your 
thoughts on it.
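
For discussion, a minimal sketch of what such a pluggable hook could look like; the interface name and shape are assumptions, not something this PR proposes:

```java
import java.io.Serializable;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
import org.apache.storm.task.TopologyContext;

// Hypothetical factory, configurable through KafkaSpoutConfig.Builder.
// Serializable because the spout configuration is shipped with the topology.
public interface CommitMetadataManagerFactory extends Serializable {
    CommitMetadataManager create(TopologyContext context, ProcessingGuarantee guarantee);
}
```

The spout's open() would then ask the configured factory for a manager instead of constructing one directly.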


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165852696
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -396,7 +361,10 @@ private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords)
 numPolledRecords);
 if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
     //Commit polled records immediately to ensure delivery is at-most-once.
-    kafkaConsumer.commitSync();
+    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
+        createOffsetsToCommitForConsumedOffsets(kafkaConsumer.assignment());
+    kafkaConsumer.commitSync(offsetsToCommit);
+    LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
--- End diff --

Make the log message "Committed offsets {} synchronously to Kafka", to distinguish it from the asynchronous commit path.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165852670
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -396,7 +361,10 @@ private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords)
 numPolledRecords);
 if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
     //Commit polled records immediately to ensure delivery is at-most-once.
-    kafkaConsumer.commitSync();
+    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
+        createOffsetsToCommitForConsumedOffsets(kafkaConsumer.assignment());
+    kafkaConsumer.commitSync(offsetsToCommit);
--- End diff --

Same as above: rename to `createFetchedOffsetsMetadata`.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165853046
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -469,11 +437,14 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V> record)
 LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
 } else {
     final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
-    if (committedOffset != null && isOffsetCommittedByThisTopology(tp, committedOffset)
+    if (isAtLeastOnceProcessing()
+        && committedOffset != null
+        && commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, offsetManagers)
--- End diff --

Consider passing `Collections.unmodifiableMap(offsetManagers)` here, so the callee cannot modify the spout's internal state.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165852989
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+private static final Logger LOG = LoggerFactory.getLogger(CommitMetadata.class);
+// Metadata information to commit to Kafka. It is unique per spout instance.
+private final String commitMetadata;
+private final ProcessingGuarantee processingGuarantee;
+private final TopologyContext context;
+
+/**
+ * Create a manager with the given context.
+ */
+public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
+    this.context = context;
+    try {
+        commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
--- End diff --

Ideally commitMetadata would be passed in the constructor to facilitate unit testing. We could have a factory method in this class itself, along these lines:

```java
public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee, String commitMetadata)
```

```java
public static CommitMetadataManager newInstance(TopologyContext context, ProcessingGuarantee processingGuarantee) {
    return new CommitMetadataManager(context, processingGuarantee,
        JSON_MAPPER.writeValueAsString(new CommitMetadata(
            context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName())));
}
```

handling the JsonProcessingException in the factory method.
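
A sketch of that factory with the exception handled, assuming the current fail-fast RuntimeException behavior is kept:

```java
public static CommitMetadataManager newInstance(TopologyContext context, ProcessingGuarantee processingGuarantee) {
    try {
        return new CommitMetadataManager(context, processingGuarantee,
            JSON_MAPPER.writeValueAsString(new CommitMetadata(
                context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName())));
    } catch (JsonProcessingException e) {
        // Without serializable metadata the spout cannot tag its commits, so fail fast.
        LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
        throw new RuntimeException(e);
    }
}
```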


---


[GitHub] storm issue #2537: STORM-2914: Implement ProcessingGuarantee.NONE in the spo...

2018-02-04 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2537
  
@hmcl Thanks for the review, it caught a lot of oversights :)


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165852894
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -143,28 +146,28 @@ public String toString() {
 
 /**
  * This enum controls when the tuple with the {@link ConsumerRecord} for an offset is marked as processed,
- * i.e. when the offset can be committed to Kafka.
+ * i.e. when the offset can be committed to Kafka. The default value is AT_LEAST_ONCE.
  * The commit interval is controlled by {@link KafkaSpoutConfig#getOffsetsCommitPeriodMs() }, if the mode commits on an interval.
  * NO_GUARANTEE may be removed in a later release without warning, we're still evaluating whether it makes sense to keep.
- *
- * 
- * AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed and acked (at least once). If
- * a tuple fails or times out it will be re-emitted, as controlled by the {@link KafkaSpoutRetryService}. Commits on the defined
- * interval.
- * 
- * AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted to the downstream
- * components of the topology. The commit interval is ignored. This mode guarantees that the offset is processed at most once by
- * ensuring the spout won't retry tuples that fail or time out after the commit to Kafka has been done.
- * 
- * NO_GUARANTEE - the polled offsets are ready to commit immediately after being polled. The offsets are committed periodically,
- * i.e. a message may be processed 0, 1 or more times. This behavior is similar to setting enable.auto.commit=true in the consumer, but
- * allows the spout to control when commits occur. Commits on the defined interval. 
- * 
  */
 @InterfaceStability.Unstable
 public enum ProcessingGuarantee {
+/**
+ * An offset is ready to commit only after the corresponding tuple has been processed and acked (at least once). If a tuple fails or
+ * times out it will be re-emitted, as controlled by the {@link KafkaSpoutRetryService}. Commits on the defined interval.
+ */
 AT_LEAST_ONCE,
+/**
+ * Every offset will be committed to Kafka right after being polled but before being emitted to the downstream components of the
+ * topology. The commit interval is ignored. This mode guarantees that the offset is processed at most once by ensuring the spout
+ * won't retry tuples that fail or time out after the commit to Kafka has been done
+ */
 AT_MOST_ONCE,
+/**
+ * The polled offsets are ready to commit immediately after being polled. The offsets are committed periodically, i.e. a message may
+ * be processed 0, 1 or more times. This behavior is similar to setting enable.auto.commit=true in the consumer, but allows the
+ * spout to control when commits occur. Commits on the defined interval
--- End diff --

Yes, will add notes to all of them
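
For reference, one possible wording for those notes (a sketch, not the final javadoc):

```java
AT_LEAST_ONCE, // note to add: "Commits synchronously on the defined interval."
AT_MOST_ONCE,  // note to add: "Commits synchronously right after poll."
NO_GUARANTEE;  // note to add: "Commits asynchronously on the defined interval."
```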


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165852904
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -453,37 +459,33 @@ public Builder(String bootstrapServers, Subscription subscription) {
 return builder;
 }
 
-private static void setAutoCommitMode(Builder builder) {
+private static void setKafkaPropsForProcessingGuarantee(Builder builder) {
     if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-        throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
-            + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee");
+        throw new IllegalStateException("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+            + " setting is not supported. You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
     }
-    if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
-        builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
-    } else {
-        String autoOffsetResetPolicy = (String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-        if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
-            if (autoOffsetResetPolicy == null) {
-                /*
-                If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
-                for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
-                error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
-                requests an offset that was deleted.
-                 */
-                builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-            } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
-                LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
-                    + " Some messages may be skipped.");
-            }
-        } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
-            if (autoOffsetResetPolicy != null
-                && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
-                LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
-                    + " Some messages may be processed more than once.");
-            }
+    String autoOffsetResetPolicy = (String) builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+    if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
+        if (autoOffsetResetPolicy == null) {
+            /*
+             * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
+             * for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
+             * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
+             * requests an offset that was deleted.
+             */
+            builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
--- End diff --

Good point, will add the log
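
A sketch of the branch with that log added; the exact message is only a suggestion:

```java
if (autoOffsetResetPolicy == null) {
    // Log the override so users can see why the consumer starts from the earliest offset.
    LOG.info("Setting Kafka property {} to 'earliest' to ensure at-least-once processing",
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
    builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}
```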


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165852132
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -519,6 +519,15 @@ private boolean isEmitTuple(List<Object> tuple) {
 return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
 }
 
+private void commitConsumedOffsets(Set<TopicPartition> assignedPartitions) {
+    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
+    for (TopicPartition tp : assignedPartitions) {
+        offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp)));
+    }
+    kafkaConsumer.commitAsync(offsetsToCommit, null);
--- End diff --

agree


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165852072
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -453,37 +459,33 @@ public Builder(String bootstrapServers, Subscription subscription) {
 return builder;
 }
 
-private static void setAutoCommitMode(Builder builder) {
+private static void setKafkaPropsForProcessingGuarantee(Builder builder) {
     if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-        throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
-            + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee");
+        throw new IllegalStateException("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+            + " setting is not supported. You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
     }
-    if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
-        builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
-    } else {
-        String autoOffsetResetPolicy = (String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-        if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
-            if (autoOffsetResetPolicy == null) {
-                /*
-                If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
-                for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
-                error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
-                requests an offset that was deleted.
-                 */
-                builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-            } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
-                LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
-                    + " Some messages may be skipped.");
-            }
-        } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
-            if (autoOffsetResetPolicy != null
-                && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
-                LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
-                    + " Some messages may be processed more than once.");
-            }
+    String autoOffsetResetPolicy = (String) builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+    if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
+        if (autoOffsetResetPolicy == null) {
+            /*
+             * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
+             * for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
+             * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
+             * requests an offset that was deleted.
+             */
+            builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
+            LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
+                + " Some messages may be skipped.");
+        }
+    } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
+        if (autoOffsetResetPolicy != null
+            && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
+            LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
+                + " Some messages may be processed more than once.");
     }
-        builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 }
+    builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
--- End diff --


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165851895
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -143,28 +146,28 @@ public String toString() {
 
 /**
  * This enum controls when the tuple with the {@link ConsumerRecord} for an offset is marked as processed,
- * i.e. when the offset can be committed to Kafka.
+ * i.e. when the offset can be committed to Kafka. The default value is AT_LEAST_ONCE.
  * The commit interval is controlled by {@link KafkaSpoutConfig#getOffsetsCommitPeriodMs() }, if the mode commits on an interval.
  * NO_GUARANTEE may be removed in a later release without warning, we're still evaluating whether it makes sense to keep.
- *
- * 
- * AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed and acked (at least once). If
- * a tuple fails or times out it will be re-emitted, as controlled by the {@link KafkaSpoutRetryService}. Commits on the defined
- * interval.
- * 
- * AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted to the downstream
- * components of the topology. The commit interval is ignored. This mode guarantees that the offset is processed at most once by
- * ensuring the spout won't retry tuples that fail or time out after the commit to Kafka has been done.
- * 
- * NO_GUARANTEE - the polled offsets are ready to commit immediately after being polled. The offsets are committed periodically,
- * i.e. a message may be processed 0, 1 or more times. This behavior is similar to setting enable.auto.commit=true in the consumer, but
- * allows the spout to control when commits occur. Commits on the defined interval. 
- * 
  */
 @InterfaceStability.Unstable
 public enum ProcessingGuarantee {
+/**
+ * An offset is ready to commit only after the corresponding tuple has been processed and acked (at least once). If a tuple fails or
+ * times out it will be re-emitted, as controlled by the {@link KafkaSpoutRetryService}. Commits on the defined interval.
+ */
 AT_LEAST_ONCE,
+/**
+ * Every offset will be committed to Kafka right after being polled but before being emitted to the downstream components of the
+ * topology. The commit interval is ignored. This mode guarantees that the offset is processed at most once by ensuring the spout
+ * won't retry tuples that fail or time out after the commit to Kafka has been done
+ */
 AT_MOST_ONCE,
+/**
+ * The polled offsets are ready to commit immediately after being polled. The offsets are committed periodically, i.e. a message may
+ * be processed 0, 1 or more times. This behavior is similar to setting enable.auto.commit=true in the consumer, but allows the
+ * spout to control when commits occur. Commits on the defined interval
--- End diff --

Commits are made asynchronously on the defined interval.

Should we also say specifically that for AT_LEAST_ONCE and AT_MOST_ONCE the commits are done synchronously?


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165852060
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -453,37 +459,33 @@ public Builder(String bootstrapServers, Subscription subscription) {
 return builder;
 }
 
-private static void setAutoCommitMode(Builder builder) {
+private static void setKafkaPropsForProcessingGuarantee(Builder builder) {
     if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-        throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
-            + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee");
+        throw new IllegalStateException("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+            + " setting is not supported. You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
     }
-    if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
-        builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
-    } else {
-        String autoOffsetResetPolicy = (String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-        if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
-            if (autoOffsetResetPolicy == null) {
-                /*
-                If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
-                for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
-                error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
-                requests an offset that was deleted.
-                 */
-                builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-            } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
-                LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
-                    + " Some messages may be skipped.");
-            }
-        } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
-            if (autoOffsetResetPolicy != null
-                && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
-                LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
-                    + " Some messages may be processed more than once.");
-            }
+    String autoOffsetResetPolicy = (String) builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+    if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
+        if (autoOffsetResetPolicy == null) {
+            /*
+             * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
+             * for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
+             * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
+             * requests an offset that was deleted.
+             */
+            builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
--- End diff --

We should print an INFO-level log here saying:

LOG.info("Set Kafka property {} to {}", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");


---


[GitHub] storm issue #2537: STORM-2914: Implement ProcessingGuarantee.NONE in the spo...

2018-02-04 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2537
  
@hmcl Addressed your comments. Also fixed the storm-kafka-client docs to use release-specific links instead of linking to the 1.0.x branch. I've removed some of the listings of enum values in favor of linking to the enum javadoc as well; I don't think it's a good idea to duplicate that information. It's better if we just link to the javadoc, so the documentation doesn't get out of sync.


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165850084
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -210,23 +215,26 @@ public Builder(String bootstrapServers, Subscription subscription) {
 }
 
 /**
- * Set a {@link KafkaConsumer} property. Please don't set enable.auto.commit, instead set the {@link ProcessingGuarantee}
enable.auto.commit, instead set the {@link ProcessingGuarantee}
--- End diff --

Moved it to the documentation


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165850081
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -519,6 +519,15 @@ private boolean isEmitTuple(List<Object> tuple) {
 return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
 }
 
+private void commitConsumedOffsets(Set<TopicPartition> assignedPartitions) {
+    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
+    for (TopicPartition tp : assignedPartitions) {
+        offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp)));
+    }
+    kafkaConsumer.commitAsync(offsetsToCommit, null);
--- End diff --

I don't think it improves readability, and IMO it also looks a little weird to have a constant null field at the top of the class.
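
For context, the alternative under discussion would look roughly like this (sketch, not proposed code):

```java
// Hypothetical named constant standing in for the absent commit callback.
private static final OffsetCommitCallback NO_CALLBACK = null;

private void commitConsumedOffsets(Set<TopicPartition> assignedPartitions) {
    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    for (TopicPartition tp : assignedPartitions) {
        offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp)));
    }
    kafkaConsumer.commitAsync(offsetsToCommit, NO_CALLBACK);
}
```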


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-04 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165850087
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -453,37 +451,33 @@ public Builder(String bootstrapServers, Subscription subscription) {
 return builder;
 }
 
-private static void setAutoCommitMode(Builder builder) {
+private static void setPropsToFitProcessingGuarantee(Builder builder) {
     if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-        throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
-            + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee");
+        throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
--- End diff --

This isn't javadoc, so linking won't work. Changed the message to be 
friendlier.


---