[GitHub] storm issue #2913: STORM-3290: Split configuration for storm-kafka-client Tr...

2018-11-29 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2913
  
+1


---


[GitHub] storm issue #2829: STORM-3222: Fix KafkaSpout internals to use LinkedList in...

2018-09-12 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2829
  
+1


---


[GitHub] storm issue #2829: STORM-3222: Fix KafkaSpout internals to use LinkedList in...

2018-09-12 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2829
  
@arunmahadevan yes, it makes sense to use a LinkedList here. Thanks for the explanation.


---


[GitHub] storm issue #2829: STORM-3222: Fix KafkaSpout internals to use LinkedList in...

2018-09-12 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2829
  
@revans2 if I am not mistaken, I recall that when you reviewed the initial PR you suggested that a call to nextTuple should emit only one tuple/record. Can you please refresh my memory on the reasons why? If I am confusing this with a different patch, please disregard this message. Thanks.


---


[GitHub] storm issue #2829: STORM-3222: Fix KafkaSpout internals to use LinkedList in...

2018-09-12 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2829
  
@arunmahadevan what's the motivation for changing this to LinkedList?

nextTuple emits only a single tuple because that's the contract of the nextTuple method, which must be honored. This was thoroughly discussed in the patch with the initial code implementation.
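
For context, a minimal sketch of a spout honoring that single-emit contract (a hypothetical illustration, not the actual KafkaSpout code): records are buffered internally, and each nextTuple call emits at most one of them.

```
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Illustrative only -- not the actual KafkaSpout implementation.
public class SingleEmitSpout extends BaseRichSpout {
    // Records fetched but not yet emitted; a LinkedList gives O(1) removal from the head.
    private final Queue<String> buffer = new LinkedList<>();
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        if (buffer.isEmpty()) {
            refillBuffer(); // e.g. poll a batch of records from Kafka
        }
        String record = buffer.poll();
        if (record != null) {
            collector.emit(new Values(record)); // at most one tuple per nextTuple call
        }
    }

    private void refillBuffer() {
        // Placeholder for fetching a batch of records from the external source.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("record"));
    }
}
```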


---


[GitHub] storm pull request #2775: MINOR - Make raw type assignment type safe

2018-07-24 Thread hmcl
GitHub user hmcl opened a pull request:

https://github.com/apache/storm/pull/2775

MINOR - Make raw type assignment type safe



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hmcl/storm-apache master_skc_RawTypeAssignSafe

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2775.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2775


commit 5084c9756a4d792c4ac3a2a64e0d0ec2da8530e5
Author: Hugo Louro 
Date:   2018-07-25T01:42:23Z

MINOR - Make raw type assignment type safe




---


[GitHub] storm pull request #2667: STORM-3063: Fix minor pom issues

2018-05-09 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2667#discussion_r187100160
  
--- Diff: pom.xml ---
@@ -1275,6 +1270,25 @@
 
true
 
 
+<plugin>
+    <groupId>org.apache.maven.plugins</groupId>
+    <artifactId>maven-enforcer-plugin</artifactId>
+    <executions>
+        <execution>
+            <id>enforce-maven</id>
--- End diff --

Agree. Would 'enforce-maven-version' be more descriptive?


---


[GitHub] storm issue #2667: STORM-3063: Fix minor pom issues

2018-05-08 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2667
  
+1


---


[GitHub] storm pull request #2667: STORM-3063: Fix minor pom issues

2018-05-08 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2667#discussion_r186887493
  
--- Diff: pom.xml ---
@@ -1275,6 +1270,25 @@
 
true
 
 
+<plugin>
+    <groupId>org.apache.maven.plugins</groupId>
+    <artifactId>maven-enforcer-plugin</artifactId>
+    <executions>
+        <execution>
+            <id>enforce-maven</id>
--- End diff --

Since there is only one execution, is 'id' really necessary?


---


[GitHub] storm issue #2637: STORM-3060: Map of Spout configurations from storm-kafka ...

2018-05-08 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2637
  
+1
@srishtyagrawal thank you for your nice and helpful contribution. It will 
benefit a lot of users.


---


[GitHub] storm issue #2637: Map of Spout configurations from `storm-kafka` to `storm-...

2018-05-05 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2637
  
@srishtyagrawal Thank you for addressing the code review comments. It is much better now. Besides my two comments above, I still wonder if it would be better to point the links with the descriptions of the Kafka properties to the top of the [New Consumer Configs](http://kafka.apache.org/10/documentation.html#newconsumerconfigs) table. The Javadocs have no description of the properties and are basically the property names written in capital letters and underscores, which won't be very helpful to the user. It's better to point to the table; the user will then know to search for the property name there.


---


[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-05-05 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r186253569
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,39 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
+
+# Translation from `storm-kafka` to `storm-kafka-client` spout properties
+
+This may not be an exhaustive list because the `storm-kafka` configs were taken from Storm 0.9.6
+[SpoutConfig](https://svn.apache.org/repos/asf/storm/site/releases/0.9.6/javadocs/storm/kafka/SpoutConfig.html) and
+[KafkaConfig](https://svn.apache.org/repos/asf/storm/site/releases/0.9.6/javadocs/storm/kafka/KafkaConfig.html).
+`storm-kafka-client` spout configurations were taken from Storm 1.0.6
+[KafkaSpoutConfig](https://storm.apache.org/releases/1.0.6/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.html)
+and Kafka 0.10.1.0 [ConsumerConfig](https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerConfig.html).
+
+| SpoutConfig   | KafkaSpoutConfig/ConsumerConfig Name | KafkaSpoutConfig Usage |
--- End diff --

I suggest removing 'Name', as 'ConsumerConfig' is self-explanatory.


---


[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-05-05 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r186253611
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,39 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
+
+# Translation from `storm-kafka` to `storm-kafka-client` spout properties
+
+This may not be an exhaustive list because the `storm-kafka` configs were taken from Storm 0.9.6
+[SpoutConfig](https://svn.apache.org/repos/asf/storm/site/releases/0.9.6/javadocs/storm/kafka/SpoutConfig.html) and
+[KafkaConfig](https://svn.apache.org/repos/asf/storm/site/releases/0.9.6/javadocs/storm/kafka/KafkaConfig.html).
+`storm-kafka-client` spout configurations were taken from Storm 1.0.6
+[KafkaSpoutConfig](https://storm.apache.org/releases/1.0.6/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.html)
+and Kafka 0.10.1.0 [ConsumerConfig](https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerConfig.html).
+
+| SpoutConfig   | KafkaSpoutConfig/ConsumerConfig Name | KafkaSpoutConfig Usage |
+| - |  | --- |
+| **Setting:** `startOffsetTime` **Default:** `EarliestTime`  **Setting:** `forceFromStart`  **Default:** `false`  `startOffsetTime` & `forceFromStart` together determine the starting offset. `forceFromStart` determines whether the Zookeeper offset is ignored. `startOffsetTime` sets the timestamp that determines the beginning offset, in case there is no offset in Zookeeper, or the Zookeeper offset is ignored | **Setting:** [`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html) **Default:** `UNCOMMITTED_EARLIEST`  [Refer to the helper table](#helper-table-for-setting-firstpolloffsetstrategy) for picking `FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` settings | [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)|
+| **Setting:** `scheme` The interface that specifies how a `ByteBuffer` from a Kafka topic is transformed into Storm tuple **Default:** `RawMultiScheme` | **Setting:** [`Deserializers`](https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html)| [`.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-) [`.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `fetchSizeBytes` Message fetch size -- the number of bytes to attempt to fetch in one request to a Kafka server  **Default:** `1MB` | **Setting:** [`max.partition.fetch.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#MAX_PARTITION_FETCH_BYTES_CONFIG) **Default:** `1MB`| [`.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `bufferSizeBytes` Buffer size (in bytes) for network requests. The buffer size which consumer has for pulling data from producer  **Default:** `1MB`| **Setting:** [`receive.buffer.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#RECEIVE_BUFFER_CONFIG)  The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used | [`.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `socketTimeoutMs` **Default:** `1` | **N/A** ||
+| **Setting:** `useStartOffsetTimeIfOffsetOutOfRange` **Default:** `true` | **Setting:** [`auto.offset.reset`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG) **Possible values:** `"latest"`, `"earliest"`, `"none"` **Default:** `latest`. Exception: `earliest` if [`ProcessingGuarantee`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.ProcessingGuarantee.ht

---

[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-05-05 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r186253597
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,39 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
+
+# Translation from `storm-kafka` to `storm-kafka-client` spout properties
+
+This may not be an exhaustive list because the `storm-kafka` configs were taken from Storm 0.9.6
+[SpoutConfig](https://svn.apache.org/repos/asf/storm/site/releases/0.9.6/javadocs/storm/kafka/SpoutConfig.html) and
+[KafkaConfig](https://svn.apache.org/repos/asf/storm/site/releases/0.9.6/javadocs/storm/kafka/KafkaConfig.html).
+`storm-kafka-client` spout configurations were taken from Storm 1.0.6
+[KafkaSpoutConfig](https://storm.apache.org/releases/1.0.6/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.html)
+and Kafka 0.10.1.0 [ConsumerConfig](https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerConfig.html).
+
+| SpoutConfig   | KafkaSpoutConfig/ConsumerConfig Name | KafkaSpoutConfig Usage |
+| - |  | --- |
+| **Setting:** `startOffsetTime` **Default:** `EarliestTime`  **Setting:** `forceFromStart`  **Default:** `false`  `startOffsetTime` & `forceFromStart` together determine the starting offset. `forceFromStart` determines whether the Zookeeper offset is ignored. `startOffsetTime` sets the timestamp that determines the beginning offset, in case there is no offset in Zookeeper, or the Zookeeper offset is ignored | **Setting:** [`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html) **Default:** `UNCOMMITTED_EARLIEST`  [Refer to the helper table](#helper-table-for-setting-firstpolloffsetstrategy) for picking `FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` settings | [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)|
+| **Setting:** `scheme` The interface that specifies how a `ByteBuffer` from a Kafka topic is transformed into Storm tuple **Default:** `RawMultiScheme` | **Setting:** [`Deserializers`](https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html)| [`.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-) [`.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `fetchSizeBytes` Message fetch size -- the number of bytes to attempt to fetch in one request to a Kafka server  **Default:** `1MB` | **Setting:** [`max.partition.fetch.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#MAX_PARTITION_FETCH_BYTES_CONFIG) **Default:** `1MB`| [`.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `bufferSizeBytes` Buffer size (in bytes) for network requests. The buffer size which consumer has for pulling data from producer  **Default:** `1MB`| **Setting:** [`receive.buffer.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#RECEIVE_BUFFER_CONFIG)  The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used | [`.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `socketTimeoutMs` **Default:** `1` | **N/A** ||
+| **Setting:** `useStartOffsetTimeIfOffsetOutOfRange` **Default:** `true` | **Setting:** [`auto.offset.reset`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG) **Possible values:** `"latest"`, `"earliest"`, `"none"` **Default:** `latest`. Exception: `earliest` if [`ProcessingGuarantee`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.ProcessingGuarantee.ht

---

[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-05-05 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r186253552
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,39 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
+
+# Translation from `storm-kafka` to `storm-kafka-client` spout properties
--- End diff --

NIT: Rename 'Translation' to 'Mapping'. This is not really a translation.


---


[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-04-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r185160807
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
+
+# Migrating a `storm-kafka` spout to use `storm-kafka-client`
+
+This may not be an exhaustive list because the `storm-kafka` configs were taken from Storm 0.9.6
+[SpoutConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java) and
+[KafkaConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java).
+`storm-kafka-client` spout configurations were taken from Storm 1.0.6
+[KafkaSpoutConfig](https://github.com/apache/storm/blob/v1.0.6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java).
+
+| Storm-0.9.6 SpoutConfig   | Storm-1.0.6 KafkaSpoutConfig name | KafkaSpoutConfig usage help |
+| - |  | --- |
+| **Setting:** `startOffsetTime` **Default:** `EarliestTime`  **Setting:** `forceFromStart`  **Default:** `false`  `startOffsetTime` & `forceFromStart` together determine the starting offset. `forceFromStart` determines whether the Zookeeper offset is ignored. `startOffsetTime` sets the timestamp that determines the beginning offset, in case there is no offset in Zookeeper, or the Zookeeper offset is ignored | **Setting:** [`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html) **Default:** `UNCOMMITTED_EARLIEST`  [Refer to the helper table](#helper-table-for-setting-firstpolloffsetstrategy) for picking `FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` settings | **Import package:** `org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.`  **Usage:** [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)|
+| **Setting:** `scheme` The interface that specifies how a `ByteBuffer` from a Kafka topic is transformed into Storm tuple **Default:** `RawMultiScheme` | [`Deserializers`](https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html)| **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;`   **Usage:** [`.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-) [`.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `fetchSizeBytes` Message fetch size -- the number of bytes to attempt to fetch in one request to a Kafka server  **Default:** `1MB` | **Kafka config:** [`max.partition.fetch.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#MAX_PARTITION_FETCH_BYTES_CONFIG) **Default:** `1MB`| **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;`   **Usage:** [`.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `bufferSizeBytes` Buffer size (in bytes) for network requests. The buffer size which consumer has for pulling data from producer  **Default:** `1MB`| **Kafka config:** [`receive.buffer.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#RECEIVE_BUFFER_CONFIG)  The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used | **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;`   **Usage:** [`.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `socketTimeoutMs` **Default:** `1` | Discontinued in `storm-kafka-client` ||
--- End diff --

Instead of 'Discontinued' I would put **N/A**.


---


[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-04-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r185168182
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
+
+# Migrating a `storm-kafka` spout to use `storm-kafka-client`
+
+This may not be an exhaustive list because the `storm-kafka` configs were taken from Storm 0.9.6
+[SpoutConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java) and
+[KafkaConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java).
+`storm-kafka-client` spout configurations were taken from Storm 1.0.6
+[KafkaSpoutConfig](https://github.com/apache/storm/blob/v1.0.6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java).
+
+| Storm-0.9.6 SpoutConfig   | Storm-1.0.6 KafkaSpoutConfig name | KafkaSpoutConfig usage help |
+| - |  | --- |
+| **Setting:** `startOffsetTime` **Default:** `EarliestTime`  **Setting:** `forceFromStart`  **Default:** `false`  `startOffsetTime` & `forceFromStart` together determine the starting offset. `forceFromStart` determines whether the Zookeeper offset is ignored. `startOffsetTime` sets the timestamp that determines the beginning offset, in case there is no offset in Zookeeper, or the Zookeeper offset is ignored | **Setting:** [`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html) **Default:** `UNCOMMITTED_EARLIEST`  [Refer to the helper table](#helper-table-for-setting-firstpolloffsetstrategy) for picking `FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` settings | **Import package:** `org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.`  **Usage:** [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)|
+| **Setting:** `scheme` The interface that specifies how a `ByteBuffer` from a Kafka topic is transformed into Storm tuple **Default:** `RawMultiScheme` | [`Deserializers`](https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html)| **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;`   **Usage:** [`.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-) [`.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `fetchSizeBytes` Message fetch size -- the number of bytes to attempt to fetch in one request to a Kafka server  **Default:** `1MB` | **Kafka config:** [`max.partition.fetch.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#MAX_PARTITION_FETCH_BYTES_CONFIG) **Default:** `1MB`| **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;`   **Usage:** [`.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
--- End diff --

the (**max?**) number of bytes to attempt ... 


---


[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-04-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r185168669
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
+
+# Migrating a `storm-kafka` spout to use `storm-kafka-client`
+
+This may not be an exhaustive list because the `storm-kafka` configs were taken from Storm 0.9.6
+[SpoutConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java) and
+[KafkaConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java).
+`storm-kafka-client` spout configurations were taken from Storm 1.0.6
+[KafkaSpoutConfig](https://github.com/apache/storm/blob/v1.0.6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java).
+
+| Storm-0.9.6 SpoutConfig   | Storm-1.0.6 KafkaSpoutConfig name | KafkaSpoutConfig usage help |
+| - |  | --- |
+| **Setting:** `startOffsetTime` **Default:** `EarliestTime`  **Setting:** `forceFromStart`  **Default:** `false`  `startOffsetTime` & `forceFromStart` together determine the starting offset. `forceFromStart` determines whether the Zookeeper offset is ignored. `startOffsetTime` sets the timestamp that determines the beginning offset, in case there is no offset in Zookeeper, or the Zookeeper offset is ignored | **Setting:** [`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html) **Default:** `UNCOMMITTED_EARLIEST`  [Refer to the helper table](#helper-table-for-setting-firstpolloffsetstrategy) for picking `FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` settings | **Import package:** `org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.`  **Usage:** [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)|
+| **Setting:** `scheme` The interface that specifies how a `ByteBuffer` from a Kafka topic is transformed into Storm tuple **Default:** `RawMultiScheme` | [`Deserializers`](https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html)| **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;`   **Usage:** [`.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-) [`.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `fetchSizeBytes` Message fetch size -- the number of bytes to attempt to fetch in one request to a Kafka server  **Default:** `1MB` | **Kafka config:** [`max.partition.fetch.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#MAX_PARTITION_FETCH_BYTES_CONFIG) **Default:** `1MB`| **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;`   **Usage:** [`.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `bufferSizeBytes` Buffer size (in bytes) for network requests. The buffer size which consumer has for pulling data from producer  **Default:** `1MB`| **Kafka config:** [`receive.buffer.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#RECEIVE_BUFFER_CONFIG)  The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used | **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;`   **Usage:** [`.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `socketTimeoutMs` **Default:** `1` | Discontinued in `storm-kafka-client` ||
+| **Setting:** `useStartOffsetTimeIfOffsetOutOfRange` **Default:** `true` | **Kafka config:** [`auto.offset.reset`](https://kafka.apache.org/11/javadoc/org/apache

---

[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-04-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r185167619
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
+
+# Migrating a `storm-kafka` spout to use `storm-kafka-client`
+
+This may not be an exhaustive list because the `storm-kafka` configs were taken from Storm 0.9.6
+[SpoutConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java) and
+[KafkaConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java).
+`storm-kafka-client` spout configurations were taken from Storm 1.0.6
+[KafkaSpoutConfig](https://github.com/apache/storm/blob/v1.0.6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java).
+
+| Storm-0.9.6 SpoutConfig   | Storm-1.0.6 KafkaSpoutConfig name | KafkaSpoutConfig usage help |
--- End diff --

The Storm version is already specified above and could be omitted in the table. If possible, I would suggest presenting the table as:
```
| SpoutConfig   | KafkaSpoutConfig   |  
---
| prop | desc | default | prop  | desc | default |
```


---


[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-04-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r185163739
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
+
+# Migrating a `storm-kafka` spout to use `storm-kafka-client`
--- End diff --

Do you really mean migration, or rather some sort of parallel between the names and meanings of the properties in storm-kafka vs. storm-kafka-client? 'Migrating' may imply that there is a migration path, whereas this is not really a migration, but rather a way to specify the same behavior in the old and new spout.


---


[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-04-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r185168807
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
+
+# Migrating a `storm-kafka` spout to use `storm-kafka-client`
+
+This may not be an exhaustive list because the `storm-kafka` configs were taken from Storm 0.9.6
+[SpoutConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java) and
+[KafkaConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java).
+`storm-kafka-client` spout configurations were taken from Storm 1.0.6
+[KafkaSpoutConfig](https://github.com/apache/storm/blob/v1.0.6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java).
+
+| Storm-0.9.6 SpoutConfig   | Storm-1.0.6 KafkaSpoutConfig name | KafkaSpoutConfig usage help |
+| - |  | --- |
+| **Setting:** `startOffsetTime` **Default:** `EarliestTime`  **Setting:** `forceFromStart`  **Default:** `false`  `startOffsetTime` & `forceFromStart` together determine the starting offset. `forceFromStart` determines whether the Zookeeper offset is ignored. `startOffsetTime` sets the timestamp that determines the beginning offset, in case there is no offset in Zookeeper, or the Zookeeper offset is ignored | **Setting:** [`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html) **Default:** `UNCOMMITTED_EARLIEST`  [Refer to the helper table](#helper-table-for-setting-firstpolloffsetstrategy) for picking `FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` settings | **Import package:** `org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.`  **Usage:** [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)|
+| **Setting:** `scheme` The interface that specifies how a `ByteBuffer` from a Kafka topic is transformed into Storm tuple **Default:** `RawMultiScheme` | [`Deserializers`](https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html)| **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;`   **Usage:** [`.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-) [`.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `fetchSizeBytes` Message fetch size -- the number of bytes to attempt to fetch in one request to a Kafka server  **Default:** `1MB` | **Kafka config:** [`max.partition.fetch.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#MAX_PARTITION_FETCH_BYTES_CONFIG) **Default:** `1MB`| **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;`   **Usage:** [`.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `bufferSizeBytes` Buffer size (in bytes) for network requests. The buffer size which consumer has for pulling data from producer  **Default:** `1MB`| **Kafka config:** [`receive.buffer.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#RECEIVE_BUFFER_CONFIG)  The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used | **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;`   **Usage:** [`.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `socketTimeoutMs` **Default:** `1` | Discontinued in `storm-kafka-client` ||
+| **Setting:** `useStartOffsetTimeIfOffsetOutOfRange` **Default:** `true` | **Kafka config:** [`auto.offset.reset`](https://kafka.apache.org/11/javadoc/org/apache

---

[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-04-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r185161496
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
+
+# Migrating a `storm-kafka` spout to use `storm-kafka-client`
+
+This may not be an exhaustive list because the `storm-kafka` configs were taken from Storm 0.9.6
+[SpoutConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java) and
+[KafkaConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java).
+`storm-kafka-client` spout configurations were taken from Storm 1.0.6
+[KafkaSpoutConfig](https://github.com/apache/storm/blob/v1.0.6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java).
+
+| Storm-0.9.6 SpoutConfig   | Storm-1.0.6 KafkaSpoutConfig name | KafkaSpoutConfig usage help |
+| - |  | --- |
+| **Setting:** `startOffsetTime` **Default:** `EarliestTime`  **Setting:** `forceFromStart`  **Default:** `false`  `startOffsetTime` & `forceFromStart` together determine the starting offset. `forceFromStart` determines whether the Zookeeper offset is ignored. `startOffsetTime` sets the timestamp that determines the beginning offset, in case there is no offset in Zookeeper, or the Zookeeper offset is ignored | **Setting:** [`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html) **Default:** `UNCOMMITTED_EARLIEST`  [Refer to the helper table](#helper-table-for-setting-firstpolloffsetstrategy) for picking `FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` settings | **Import package:** `org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.`  **Usage:** [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)|
--- End diff --

Are the "Import package:" entries throughout necessary? The ConsumerConfig strings all come from the same Kafka package, and the KafkaSpoutConfig configurations already need the package imported when the set* method is declared in the code.

It seems most "Usage:" web links are broken. Wouldn't it be better to simply paste the method signature and put the link on that same signature?
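
For illustration, a sketch of how these builder calls typically read in code (broker address, topic, and values are placeholders, not from the PR); note the ConsumerConfig import appears once regardless of how many properties are set:

```
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

public class SpoutConfigExample {
    public static KafkaSpoutConfig<String, String> buildConfig() {
        // "localhost:9092" and "my-topic" are illustrative placeholders.
        return KafkaSpoutConfig.builder("localhost:9092", "my-topic")
            // All ConsumerConfig constants come from the same Kafka package.
            .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
            .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
            .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576)
            .build();
    }
}
```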


---


[GitHub] storm pull request #2637: Map of Spout configurations from `storm-kafka` to ...

2018-04-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2637#discussion_r185168952
  
--- Diff: docs/storm-kafka-client.md ---
@@ -313,4 +313,37 @@ KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
   .setTupleTrackingEnforced(true)
 ```
 
-Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
\ No newline at end of file
+Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.
+
+# Migrating a `storm-kafka` spout to use `storm-kafka-client`
+
+This may not be an exhaustive list because the `storm-kafka` configs were taken from Storm 0.9.6
+[SpoutConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java) and
+[KafkaConfig](https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java).
+`storm-kafka-client` spout configurations were taken from Storm 1.0.6
+[KafkaSpoutConfig](https://github.com/apache/storm/blob/v1.0.6/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java).
+
+| Storm-0.9.6 SpoutConfig   | Storm-1.0.6 KafkaSpoutConfig name | KafkaSpoutConfig usage help |
+| - |  | --- |
+| **Setting:** `startOffsetTime` **Default:** `EarliestTime`  **Setting:** `forceFromStart`  **Default:** `false`  `startOffsetTime` & `forceFromStart` together determine the starting offset. `forceFromStart` determines whether the Zookeeper offset is ignored. `startOffsetTime` sets the timestamp that determines the beginning offset, in case there is no offset in Zookeeper, or the Zookeeper offset is ignored | **Setting:** [`FirstPollOffsetStrategy`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html) **Default:** `UNCOMMITTED_EARLIEST`  [Refer to the helper table](#helper-table-for-setting-firstpolloffsetstrategy) for picking `FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` settings | **Import package:** `org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.`  **Usage:** [`.setFirstPollOffsetStrategy()`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-)|
+| **Setting:** `scheme` The interface that specifies how a `ByteBuffer` from a Kafka topic is transformed into Storm tuple **Default:** `RawMultiScheme` | [`Deserializers`](https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html)| **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;`   **Usage:** [`.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-) [`.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
+| **Setting:** `fetchSizeBytes` Message fetch size -- the number of bytes to attempt to fetch in one request to a Kafka server  **Default:** `1MB` | **Kafka config:** [`max.partition.fetch.bytes`](https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html#MAX_PARTITION_FETCH_BYTES_CONFIG) **Default:** `1MB`| **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;`   **Usage:** [`.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, )`](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-)|
--- End diff --

I suggest omitting "Kafka config:". The link is self-explanatory.


---


[GitHub] storm issue #2637: Map of Spout configurations from `storm-kafka` to `storm-...

2018-04-30 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2637
  
@erikdw I am reviewing this now. Sorry, I was away for the last few days.


---


[GitHub] storm issue #2584: STORM-2985: Explicitly add jackson-annotations dependency...

2018-03-16 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2584
  
+1


---


[GitHub] storm issue #2550: STORM-2937: Overwrite latest storm-kafka-client 1.x-branc...

2018-02-07 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2550
  
+1.

Thanks @erikdw, it's really good. Thanks also for the discussion. Looking forward to continuing it on the dev list, and hopefully agreeing on a good solution.


---


[GitHub] storm issue #2550: STORM-2937: Overwrite latest storm-kafka-client 1.x-branc...

2018-02-07 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2550
  
@erikdw I am not sure I follow when you wrote:

> All commits of a PR are bunched together. They can all be referenced as a 
group in git revert or git cherry-pick. 

In the first git log example I gave in my [earlier comment](https://github.com/apache/storm/pull/2550#issuecomment-363955298), how can one find out from git log (i.e. the commit history) that SHAs ad7360507, 0ddc41154, 5d9ad1eaa, f758123b5, e42ae4de8, 533ef7ad2 all belong to JIRA STORM-2937?

Perhaps you mean it is possible to do it from GitHub?


---


[GitHub] storm issue #2550: STORM-2937: Overwrite latest storm-kafka-client 1.x-branc...

2018-02-07 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2550
  
@HeartSaVioR I understand your point about not putting the burden on the contributors. I didn't really think it would be a big hurdle for contributors. However, taking that into consideration, it makes sense that the person merging the code should also squash the commits. In that scenario, though, the merger would also be responsible for the commit message, which in my opinion is the responsibility of the contributor.


---


[GitHub] storm issue #2550: STORM-2937: Overwrite latest storm-kafka-client 1.x-branc...

2018-02-07 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2550
  
@ptgoetz, @srdo and I have been using the following approach for squashing patches in storm-kafka-client, which I believe works well:

- It is reasonable to submit a PR for review that has several commits, typically created during development.
- During the PR review, changes addressing code review comments go in a new commit, such that it is easy to verify that the code review comments are properly addressed.
- Once the PR has a +1 from the reviewers and the changes are accepted for merging, the creator of the PR squashes the commits.
- I would vote for having only one commit, with the commit message starting either with STORM-1234: or MINOR:. I would favor not having commit messages starting with arbitrary text. However, this is a very debatable subject, and I am OK if we don't want to enforce that small commits have to be prefixed by a token such as MINOR. I would say that significant patches should be required to have the JIRA number at the top.
- As I have discussed previously, I believe it is not beneficial to the community to merge into master PRs that have several commits following a pattern along the lines of:

```
STORM-1234: Some storm feature
fix compilation issue
address code review
fix checkstyle
fix unit tests
...
```

I think the entire community would benefit from a more structured way of merging code into the main code line. Perhaps we could try again proposing some PR and commit guidelines for voting. What do you think?


---


[GitHub] storm issue #2550: STORM-2937: Overwrite latest storm-kafka-client 1.x-branc...

2018-02-07 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2550
  
@erikdw I agree that in this case, where we are cherry-picking several commits, it's very hard to avoid multiple commits, and we probably shouldn't even try to squash them because that would completely erase the historical context and where things came from. Nevertheless, I would like to suggest that all commit messages for this PR follow the convention[1]

```
STORM-2937: Overwrite storm-kafka-client 1.x-branch into 1.0.x-branch
 - Message which ideally should contain the message and SHA of the commit that was cherry-picked. IntelliJ and git cherry-pick typically do that by default
```

Currently, the git log for this PR (truncated for readability) looks like this:

```
ad7360507 add storm-kafka-client doc from 1.x-branch, and link to it from index.md
0ddc41154 copy Time.java from 1.x-branch to allow use of nanoTime() in storm-kafka-client
5d9ad1eaa backport changes to Fields to allow it to be checked for equality in storm-kafka-client
f758123b5 backport trident interface changes
e42ae4de8 update storm-kafka-client pom.xml and base pom.xml as necessary for using storm-
533ef7ad2 copied external/storm-kafka-client from 1.x-branch at SHA 74ca795 (this doesn't
9c8930036 Merge branch 'STORM-2918-1.0.x-merge' into 1.0.x-branch
4df41444b STORM-2918 Update Netty version
5b507eaf0 Merge branch 'STORM-2853-1.0.x-merge' into 1.0.x-branch
4841475a7 STORM-2853 Initialize tick tuple after initializing spouts/bolts
```

I think that it would be much easier to trace back the bulk change if the 
git log was something like:

```
ad7360507 STORM-2937: Overwrite storm-kafka-client 1.x-branch into 1.0.x-branch
0ddc41154 STORM-2937: Overwrite storm-kafka-client 1.x-branch into 1.0.x-branch
5d9ad1eaa STORM-2937: Overwrite storm-kafka-client 1.x-branch into 1.0.x-branch
f758123b5 STORM-2937: Overwrite storm-kafka-client 1.x-branch into 1.0.x-branch
e42ae4de8 STORM-2937: Overwrite storm-kafka-client 1.x-branch into 1.0.x-branch
533ef7ad2 STORM-2937: Overwrite storm-kafka-client 1.x-branch into 1.0.x-branch
9c8930036 Merge branch 'STORM-2918-1.0.x-merge' into 1.0.x-branch
4df41444b STORM-2918 Update Netty version
5b507eaf0 Merge branch 'STORM-2853-1.0.x-merge' into 1.0.x-branch
4841475a7 STORM-2853 Initialize tick tuple after initializing spouts/bolts
```

And, for instance, for SHA ad7360507 the message would be:

```
STORM-2937: Overwrite storm-kafka-client 1.x-branch into 1.0.x-branch
 - add storm-kafka-client doc from 1.x-branch, and link to it from index.md
  (cherry picked from commit SHA) - this would be optional if it's a big pain, but I think git does it for you
``` 

Similarly for the other SHAs.

[1] - I don't necessarily mean Storm convention


---


[GitHub] storm issue #2549: STORM-2936 Overwrite latest storm-kafka-client 1.x-branch...

2018-02-06 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2549
  
+1

Thanks @HeartSaVioR for working on this right away.


---


[GitHub] storm issue #2549: STORM-2936 Overwrite latest storm-kafka-client 1.x-branch...

2018-02-06 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2549
  
+1

@HeartSaVioR I meant to say that you should also add to the commit log the SHA of the 1.x-branch commit that you checked out and from which you copied the entire storm-kafka-client directory and the examples. Otherwise, how do we know when in the 1.x-branch history this back-porting was done?


---


[GitHub] storm issue #2549: STORM-2936 Overwrite latest storm-kafka-client 1.x-branch...

2018-02-06 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2549
  
@HeartSaVioR I agree with your approach. Can you please add to the commit message the SHA from the 1.x-branch that was the base for this overwrite, so that one can trace it back if necessary? Thanks.

I am not sure I am following when you mention compilation issues. I ran the following git command, and there seem to be no code changes.

```
git diff upstream/1.x-branch external/storm-kafka-client/
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index 51bb79771..0ba7d4d00 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -22,7 +22,7 @@
 
 <artifactId>storm</artifactId>
 <groupId>org.apache.storm</groupId>
-<version>1.2.0-SNAPSHOT</version>
+<version>1.1.2-SNAPSHOT</version>
 <relativePath>../../pom.xml</relativePath>
 
``` 


---


[GitHub] storm issue #2547: Storm 2913 2914 1.x

2018-02-05 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2547
  
+1


---


[GitHub] storm issue #2538: STORM-2913: Add metadata to at-most-once and at-least-onc...

2018-02-04 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2538
  
+1. 

Once squashed, it is good to merge as far as I am concerned. Thanks a lot @srdo.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857516
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
--- End diff --

The only question mark is performance. I am OK with it staying like this; I just wanted to bring it up. If not for performance, in the extreme case there would be no harm in creating one ObjectMapper in a utility class and using it across the entire codebase.
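
For illustration, a minimal sketch of that extreme alternative (a hypothetical utility class, not existing Storm code); Jackson's ObjectMapper is thread-safe once configured, so a single shared instance can be reused:

```
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical utility class; not part of the Storm codebase.
public final class JsonUtil {
    // ObjectMapper is thread-safe after configuration, so one shared instance
    // avoids the cost of repeated instantiation across the codebase.
    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

    private JsonUtil() {
    }

    public static String toJson(Object value) throws JsonProcessingException {
        return JSON_MAPPER.writeValueAsString(value);
    }
}
```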


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857829
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -453,37 +460,37 @@ public Builder(String bootstrapServers, Subscription 
subscription) {
 return builder;
 }
 
-private static void setAutoCommitMode(Builder builder) {
+private static void setKafkaPropsForProcessingGuarantee(Builder 
builder) {
 if 
(builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-throw new IllegalArgumentException("Do not set " + 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
-+ " Instead use 
KafkaSpoutConfig.Builder.setProcessingGuarantee");
+throw new IllegalStateException("The KafkaConsumer " + 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
++ " setting is not supported. You can configure similar 
behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
 }
-if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
-
builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
-} else {
-String autoOffsetResetPolicy = 
(String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-if (builder.processingGuarantee == 
ProcessingGuarantee.AT_LEAST_ONCE) {
-if (autoOffsetResetPolicy == null) {
-/*
-If the user wants to explicitly set an auto offset 
reset policy, we should respect it, but when the spout is configured
-for at-least-once processing we should default to 
seeking to the earliest offset in case there's an offset out of range
-error, rather than seeking to the latest (Kafka's 
default). This type of error will typically happen when the consumer 
-requests an offset that was deleted.
- */
-
builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-} else if (!autoOffsetResetPolicy.equals("earliest") && 
!autoOffsetResetPolicy.equals("none")) {
-LOG.warn("Cannot guarantee at-least-once processing 
with auto.offset.reset.policy other than 'earliest' or 'none'."
-+ " Some messages may be skipped.");
-}
-} else if (builder.processingGuarantee == 
ProcessingGuarantee.AT_MOST_ONCE) {
-if (autoOffsetResetPolicy != null
-&& (!autoOffsetResetPolicy.equals("latest") && 
!autoOffsetResetPolicy.equals("none"))) {
-LOG.warn("Cannot guarantee at-most-once processing 
with auto.offset.reset.policy other than 'latest' or 'none'."
-+ " Some messages may be processed more than 
once.");
-}
+String autoOffsetResetPolicy = (String) 
builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+if (builder.processingGuarantee == 
ProcessingGuarantee.AT_LEAST_ONCE) {
+if (autoOffsetResetPolicy == null) {
+/*
+ * If the user wants to explicitly set an auto offset 
reset policy, we should respect it, but when the spout is configured
+ * for at-least-once processing we should default to 
seeking to the earliest offset in case there's an offset out of range
+ * error, rather than seeking to the latest (Kafka's 
default). This type of error will typically happen when the consumer
+ * requests an offset that was deleted.
+ */
+LOG.info("Setting consumer property '{}' to 'earliest' to 
ensure at-least-once processing",
+ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+
builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+} else if (!autoOffsetResetPolicy.equals("earliest") && 
!autoOffsetResetPolicy.equals("none")) {
+LOG.warn("Cannot guarantee at-least-once processing with 
auto.offset.reset.policy other than 'earliest' or 'none'."
++ " Some messages may be skipped.");
+}
+} else if (builder.processingGuarantee == 
ProcessingGuarantee.AT_MOST_ONCE) {
+if (autoOffsetResetPolicy != null
+&& (!autoOffsetResetPolicy.equals("latest") && 
!autoOffsetResetPolicy.equals("none"))) {
+LOG.warn("Cannot guarantee at-most-once processing with 
auto.offset.reset.policy other than 'latest' or 'none'."
++ " Some messages may be processed more than once.");


---

[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857620
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+private static final Logger LOG = 
LoggerFactory.getLogger(CommitMetadata.class);
+// Metadata information to commit to Kafka. It is unique per spout 
instance.
+private final String commitMetadata;
+private final ProcessingGuarantee processingGuarantee;
+private final TopologyContext context;
+
+/**
+ * Create a manager with the given context.
+ */
+public CommitMetadataManager(TopologyContext context, 
ProcessingGuarantee processingGuarantee) {
+this.context = context;
+try {
+commitMetadata = JSON_MAPPER.writeValueAsString(new 
CommitMetadata(
--- End diff --

It's bad practice to create objects in the constructor. I simply don't do 
it. However, it's done in a lot of places in Storm already, so I am OK if you 
want to leave it as is. 

Another example of why this could potentially be bad is if someone wants to 
subclass this class. If we leave it like this, perhaps the class should be 
final then.

These are just some suggestions. You can leave it as is or go with either 
of the suggestions. (A small illustration of the subclassing concern follows 
below.)
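
As a hedged illustration of that concern (hypothetical classes, not from this 
PR): a constructor that does real work, such as calling an overridable method, 
can observe a subclass before its fields are initialized.

```java
// Base's constructor calls an overridable method, which runs before
// Sub's field initializers have executed.
class Base {
    Base() {
        init();
    }
    void init() { }
}

class Sub extends Base {
    private String name = "sub"; // assigned only after Base's constructor returns
    @Override
    void init() {
        System.out.println("name = " + name); // prints "name = null"
    }
}

public class ConstructorHazardDemo {
    public static void main(String[] args) {
        new Sub();
    }
}
```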


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857949
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+private static final Logger LOG = 
LoggerFactory.getLogger(CommitMetadata.class);
+// Metadata information to commit to Kafka. It is unique per spout 
instance.
--- End diff --

I disagree. The name of the logger instance should be the class that is 
logging the message or one of its super classes. CommitMetadata is not the 
class doing the logging here, so why should we have a logger called 
CommitMetadata?
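
i.e., presumably the intended declaration here is:

```java
private static final Logger LOG = LoggerFactory.getLogger(CommitMetadataManager.class);
```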


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857479
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -142,7 +139,7 @@ public void open(Map<String, Object> conf, 
TopologyContext context, SpoutOutputC
 offsetManagers = new HashMap<>();
 emitted = new HashSet<>();
 waitingToEmit = new HashMap<>();
-setCommitMetadata(context);
+commitMetadataManager = new CommitMetadataManager(context, 
kafkaSpoutConfig.getProcessingGuarantee());
--- End diff --

ok


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857745
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
--- End diff --

ok


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857816
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -453,37 +460,37 @@ public Builder(String bootstrapServers, Subscription 
subscription) {
 return builder;
 }
 
-private static void setAutoCommitMode(Builder builder) {
+private static void setKafkaPropsForProcessingGuarantee(Builder 
builder) {
 if 
(builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-throw new IllegalArgumentException("Do not set " + 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
-+ " Instead use 
KafkaSpoutConfig.Builder.setProcessingGuarantee");
+throw new IllegalStateException("The KafkaConsumer " + 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
++ " setting is not supported. You can configure similar 
behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
 }
-if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
-
builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
-} else {
-String autoOffsetResetPolicy = 
(String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-if (builder.processingGuarantee == 
ProcessingGuarantee.AT_LEAST_ONCE) {
-if (autoOffsetResetPolicy == null) {
-/*
-If the user wants to explicitly set an auto offset 
reset policy, we should respect it, but when the spout is configured
-for at-least-once processing we should default to 
seeking to the earliest offset in case there's an offset out of range
-error, rather than seeking to the latest (Kafka's 
default). This type of error will typically happen when the consumer 
-requests an offset that was deleted.
- */
-
builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-} else if (!autoOffsetResetPolicy.equals("earliest") && 
!autoOffsetResetPolicy.equals("none")) {
-LOG.warn("Cannot guarantee at-least-once processing 
with auto.offset.reset.policy other than 'earliest' or 'none'."
-+ " Some messages may be skipped.");
-}
-} else if (builder.processingGuarantee == 
ProcessingGuarantee.AT_MOST_ONCE) {
-if (autoOffsetResetPolicy != null
-&& (!autoOffsetResetPolicy.equals("latest") && 
!autoOffsetResetPolicy.equals("none"))) {
-LOG.warn("Cannot guarantee at-most-once processing 
with auto.offset.reset.policy other than 'latest' or 'none'."
-+ " Some messages may be processed more than 
once.");
-}
+String autoOffsetResetPolicy = (String) 
builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+if (builder.processingGuarantee == 
ProcessingGuarantee.AT_LEAST_ONCE) {
+if (autoOffsetResetPolicy == null) {
+/*
+ * If the user wants to explicitly set an auto offset 
reset policy, we should respect it, but when the spout is configured
+ * for at-least-once processing we should default to 
seeking to the earliest offset in case there's an offset out of range
+ * error, rather than seeking to the latest (Kafka's 
default). This type of error will typically happen when the consumer
+ * requests an offset that was deleted.
+ */
+LOG.info("Setting consumer property '{}' to 'earliest' to 
ensure at-least-once processing",
--- End diff --

NIT: I would write Kafka consumer, but not a deal breaker.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165857469
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -75,8 +75,9 @@ public boolean 
isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetad
 final CommitMetadata committedMetadata = 
JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
 return 
committedMetadata.getTopologyId().equals(context.getStormId());
 } catch (IOException e) {
-LOG.warn("Failed to deserialize [{}]. Error likely occurred 
because the last commit "
-+ "for this topic-partition was done using an earlier 
version of Storm. "
+LOG.warn("Failed to deserialize expected commit metadata [{}]."
++ " This error is expected to occur once per partition, if 
the last commit to each partition"
++ " was by an earlier version of the KafkaSpout, or by 
something other than the KafkaSpout. "
--- End diff --

was done by an earlier version ... or by a process other than the KafkaSpout




---


[GitHub] storm issue #2537: STORM-2914: Implement ProcessingGuarantee.NONE in the spo...

2018-02-04 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2537
  
+1. Let's squash and as far as I am concerned it is good to merge. Once 
this is squashed, can you please rebase STORM-2913? Thanks.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165853341
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
--- End diff --

All the methods and constructors in this class should be package protected


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165852737
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -469,11 +437,14 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V> 
record) {
 LOG.trace("Tuple for record [{}] has already been emitted. 
Skipping", record);
 } else {
 final OffsetAndMetadata committedOffset = 
kafkaConsumer.committed(tp);
-if (committedOffset != null && 
isOffsetCommittedByThisTopology(tp, committedOffset)
+if (isAtLeastOnceProcessing()
+&& committedOffset != null 
+&& 
commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, 
offsetManagers)
 && committedOffset.offset() > record.offset()) {
 // Ensures that after a topology with this id is started, 
the consumer fetch
 // position never falls behind the committed offset 
(STORM-2844)
-throw new IllegalStateException("Attempting to emit a 
message that has already been committed.");
+throw new IllegalStateException("Attempting to emit a 
message that has already been committed."
++ " This should never occur in at-least-once mode.");
--- End diff --

for at-least-once semantics.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165852835
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
--- End diff --

Do we want one ObjectMapper for all the KafkaSpout instances (executors), 
or one per executor? This will share it across all the instances. Perhaps we 
should have one per instance.
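
A minimal sketch of the per-executor alternative suggested here (mapper as an 
instance field instead of a static):

```java
import com.fasterxml.jackson.databind.ObjectMapper;

public class CommitMetadataManager {
    // One mapper per spout executor, built when the manager is constructed,
    // instead of a single static instance shared by all executors in the worker.
    private final ObjectMapper jsonMapper = new ObjectMapper();
}
```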


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165852864
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+private static final Logger LOG = 
LoggerFactory.getLogger(CommitMetadata.class);
+// Metadata information to commit to Kafka. It is unique per spout 
instance.
--- End diff --

Do you mean CommitMetadataManager.class?


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165853160
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+private static final Logger LOG = 
LoggerFactory.getLogger(CommitMetadata.class);
+// Metadata information to commit to Kafka. It is unique per spout 
instance.
+private final String commitMetadata;
+private final ProcessingGuarantee processingGuarantee;
+private final TopologyContext context;
+
+/**
+ * Create a manager with the given context.
+ */
+public CommitMetadataManager(TopologyContext context, 
ProcessingGuarantee processingGuarantee) {
+this.context = context;
+try {
+commitMetadata = JSON_MAPPER.writeValueAsString(new 
CommitMetadata(
+context.getStormId(), context.getThisTaskId(), 
Thread.currentThread().getName()));
+this.processingGuarantee = processingGuarantee;
+} catch (JsonProcessingException e) {
+LOG.error("Failed to create Kafka commit metadata due to JSON 
serialization error", e);
+throw new RuntimeException(e);
+}
+}
+
+/**
+ * Checks if {@link OffsetAndMetadata} was committed by a {@link 
KafkaSpout} instance in this topology.
+ *
+ * @param tp The topic partition the commit metadata belongs to.
+ * @param committedOffset {@link OffsetAndMetadata} info committed to 
Kafka
+ * @param offsetManagers The offset managers.
+ * @return true if this topology committed this {@link 
OffsetAndMetadata}, false otherwise
+ */
+public boolean isOffsetCommittedByThisTopology(TopicPartition tp, 
OffsetAndMetadata committedOffset,
+Map<TopicPartition, OffsetManager> offsetManagers) {
+try {
+if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE
+&& offsetManagers.containsKey(tp)
+&& offsetManagers.get(tp).hasCommitted()) {
+return true;
+}
+
+final CommitMetadata committedMetadata = 
JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
+return 
committedMetadata.getTopologyId().equals(context.getStormId());
+} catch (IOException e) {
+LOG.warn("Failed to deserialize [{}]. Error likely occurred 
because the last commit "
--- End diff --

We should state, either in the README or as part of this message, that this 
WARN is expected the first time a user starts this version of the spout 
against commits made to Kafka by an older version of the spout.


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165852696
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -396,7 +361,10 @@ private void setWaitingToEmit(ConsumerRecords<K, V> 
consumerRecords) {
 numPolledRecords);
 if (kafkaSpoutConfig.getProcessingGuarantee() == 
KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
 //Commit polled records immediately to ensure delivery is 
at-most-once.
-kafkaConsumer.commitSync();
+Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
+
createOffsetsToCommitForConsumedOffsets(kafkaConsumer.assignment());
+kafkaConsumer.commitSync(offsetsToCommit);
+LOG.debug("Committed offsets {} to Kafka", 
offsetsToCommit);
--- End diff --

Committed offsets {} synchronously to Kafka


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165853240
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -311,7 +273,10 @@ public void nextTuple() {
 if (isAtLeastOnceProcessing()) {
 
commitOffsetsForAckedTuples(kafkaConsumer.assignment());
 } else if (kafkaSpoutConfig.getProcessingGuarantee() == 
ProcessingGuarantee.NONE) {
-commitConsumedOffsets(kafkaConsumer.assignment());
+Map<TopicPartition, OffsetAndMetadata> offsetsToCommit 
= 
+
createOffsetsToCommitForConsumedOffsets(kafkaConsumer.assignment());
+kafkaConsumer.commitAsync(offsetsToCommit, null);
--- End diff --

createFetchedOffsetsMetadata


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165852272
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -142,7 +139,7 @@ public void open(Map<String, Object> conf, 
TopologyContext context, SpoutOutputC
 offsetManagers = new HashMap<>();
 emitted = new HashSet<>();
 waitingToEmit = new HashMap<>();
-setCommitMetadata(context);
+commitMetadataManager = new CommitMetadataManager(context, 
kafkaSpoutConfig.getProcessingGuarantee());
--- End diff --

I wonder if this should become available to the KafkaSpout through 
KafkaSpoutConfig, perhaps using a factory such that we could make it pluggable, 
in case there is a need to support different behavior in the future (rough 
sketch below).

We can also wait to do that until we need it. Just wanted to get your 
thoughts on it.
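
The interface and method names below are hypothetical, not existing 
storm-kafka-client API:

```java
import java.io.Serializable;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
import org.apache.storm.task.TopologyContext;

// Hypothetical factory: Serializable because KafkaSpoutConfig ships with the
// topology, so any pluggable piece must survive serialization.
public interface CommitMetadataManagerFactory extends Serializable {
    CommitMetadataManager create(TopologyContext context, ProcessingGuarantee processingGuarantee);
}
```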


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165852670
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -396,7 +361,10 @@ private void setWaitingToEmit(ConsumerRecords<K, V> 
consumerRecords) {
 numPolledRecords);
 if (kafkaSpoutConfig.getProcessingGuarantee() == 
KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
 //Commit polled records immediately to ensure delivery is 
at-most-once.
-kafkaConsumer.commitSync();
+Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
+
createOffsetsToCommitForConsumedOffsets(kafkaConsumer.assignment());
+kafkaConsumer.commitSync(offsetsToCommit);
--- End diff --

createFetchedOffsetsMetadata


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165853046
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -469,11 +437,14 @@ private boolean emitOrRetryTuple(ConsumerRecord<K, V> 
record) {
 LOG.trace("Tuple for record [{}] has already been emitted. 
Skipping", record);
 } else {
 final OffsetAndMetadata committedOffset = 
kafkaConsumer.committed(tp);
-if (committedOffset != null && 
isOffsetCommittedByThisTopology(tp, committedOffset)
+if (isAtLeastOnceProcessing()
+&& committedOffset != null 
+&& 
commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, 
offsetManagers)
--- End diff --

Collections.unmodifiableMap(offsetManagers)
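
i.e., applied to the call quoted above (assuming java.util.Collections is 
imported), the sketch would be:

```java
commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset,
        Collections.unmodifiableMap(offsetManagers));
```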


---


[GitHub] storm pull request #2538: STORM-2913: Add metadata to at-most-once and at-le...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2538#discussion_r165852989
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public class CommitMetadataManager {
+
+private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+private static final Logger LOG = 
LoggerFactory.getLogger(CommitMetadata.class);
+// Metadata information to commit to Kafka. It is unique per spout 
instance.
+private final String commitMetadata;
+private final ProcessingGuarantee processingGuarantee;
+private final TopologyContext context;
+
+/**
+ * Create a manager with the given context.
+ */
+public CommitMetadataManager(TopologyContext context, 
ProcessingGuarantee processingGuarantee) {
+this.context = context;
+try {
+commitMetadata = JSON_MAPPER.writeValueAsString(new 
CommitMetadata(
--- End diff --

Ideally commitMetadata would be passed in the constructor to facilitate 
unit testing. We could have a factory method in this class itself along these 
lines:

```java
public CommitMetadataManager(TopologyContext context, ProcessingGuarantee 
processingGuarantee, String commitMetadata)
```

```java
public static CommitMetadataManager newInstance(TopologyContext context, 
        ProcessingGuarantee processingGuarantee) {
    return new CommitMetadataManager(context, processingGuarantee,
        JSON_MAPPER.writeValueAsString(new CommitMetadata(
            context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName())));
}
```

handling the JsonProcessingException in the factory method, since 
writeValueAsString declares it.


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165852132
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -519,6 +519,15 @@ private boolean isEmitTuple(List<Object> tuple) {
 return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
 }
 
+private void commitConsumedOffsets(Set<TopicPartition> assignedPartitions) {
+Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new 
HashMap<>();
+for (TopicPartition tp : assignedPartitions) {
+offsetsToCommit.put(tp, new 
OffsetAndMetadata(kafkaConsumer.position(tp)));
+}
+kafkaConsumer.commitAsync(offsetsToCommit, null);
--- End diff --

agree


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165852072
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -453,37 +459,33 @@ public Builder(String bootstrapServers, Subscription 
subscription) {
 return builder;
 }
 
-private static void setAutoCommitMode(Builder builder) {
+private static void setKafkaPropsForProcessingGuarantee(Builder 
builder) {
 if 
(builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-throw new IllegalArgumentException("Do not set " + 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
-+ " Instead use 
KafkaSpoutConfig.Builder.setProcessingGuarantee");
+throw new IllegalStateException("The KafkaConsumer " + 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
++ " setting is not supported. You can configure similar 
behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
 }
-if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
-
builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
-} else {
-String autoOffsetResetPolicy = 
(String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-if (builder.processingGuarantee == 
ProcessingGuarantee.AT_LEAST_ONCE) {
-if (autoOffsetResetPolicy == null) {
-/*
-If the user wants to explicitly set an auto offset 
reset policy, we should respect it, but when the spout is configured
-for at-least-once processing we should default to 
seeking to the earliest offset in case there's an offset out of range
-error, rather than seeking to the latest (Kafka's 
default). This type of error will typically happen when the consumer 
-requests an offset that was deleted.
- */
-
builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-} else if (!autoOffsetResetPolicy.equals("earliest") && 
!autoOffsetResetPolicy.equals("none")) {
-LOG.warn("Cannot guarantee at-least-once processing 
with auto.offset.reset.policy other than 'earliest' or 'none'."
-+ " Some messages may be skipped.");
-}
-} else if (builder.processingGuarantee == 
ProcessingGuarantee.AT_MOST_ONCE) {
-if (autoOffsetResetPolicy != null
-&& (!autoOffsetResetPolicy.equals("latest") && 
!autoOffsetResetPolicy.equals("none"))) {
-LOG.warn("Cannot guarantee at-most-once processing 
with auto.offset.reset.policy other than 'latest' or 'none'."
-+ " Some messages may be processed more than 
once.");
-}
+String autoOffsetResetPolicy = (String) 
builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+if (builder.processingGuarantee == 
ProcessingGuarantee.AT_LEAST_ONCE) {
+if (autoOffsetResetPolicy == null) {
+/*
+ * If the user wants to explicitly set an auto offset 
reset policy, we should respect it, but when the spout is configured
+ * for at-least-once processing we should default to 
seeking to the earliest offset in case there's an offset out of range
+ * error, rather than seeking to the latest (Kafka's 
default). This type of error will typically happen when the consumer
+ * requests an offset that was deleted.
+ */
+
builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+} else if (!autoOffsetResetPolicy.equals("earliest") && 
!autoOffsetResetPolicy.equals("none")) {
+LOG.warn("Cannot guarantee at-least-once processing with 
auto.offset.reset.policy other than 'earliest' or 'none'."
++ " Some messages may be skipped.");
+}
+} else if (builder.processingGuarantee == 
ProcessingGuarantee.AT_MOST_ONCE) {
+if (autoOffsetResetPolicy != null
+&& (!autoOffsetResetPolicy.equals("latest") && 
!autoOffsetResetPolicy.equals("none"))) {
+LOG.warn("Cannot guarantee at-most-once processing with 
auto.offset.reset.policy other than 'latest' or 'none'."
++ " Some messages may be processed more than once.");


---

[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165851895
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -143,28 +146,28 @@ public String toString() {
 
 /**
  * This enum controls when the tuple with the {@link ConsumerRecord} 
for an offset is marked as processed,
- * i.e. when the offset can be committed to Kafka.
+ * i.e. when the offset can be committed to Kafka. The default value 
is AT_LEAST_ONCE.
  * The commit interval is controlled by {@link 
KafkaSpoutConfig#getOffsetsCommitPeriodMs() }, if the mode commits on an 
interval.
  * NO_GUARANTEE may be removed in a later release without warning, 
we're still evaluating whether it makes sense to keep.
- *
- * 
- * AT_LEAST_ONCE - an offset is ready to commit only after the 
corresponding tuple has been processed and acked (at least once). If
- * a tuple fails or times out it will be re-emitted, as controlled by 
the {@link KafkaSpoutRetryService}. Commits on the defined
- * interval.
- * 
- * AT_MOST_ONCE - every offset will be committed to Kafka right 
after being polled but before being emitted to the downstream
- * components of the topology. The commit interval is ignored. This 
mode guarantees that the offset is processed at most once by
- * ensuring the spout won't retry tuples that fail or time out after 
the commit to Kafka has been done.
- * 
- * NO_GUARANTEE - the polled offsets are ready to commit 
immediately after being polled. The offsets are committed periodically,
- * i.e. a message may be processed 0, 1 or more times. This behavior 
is similar to setting enable.auto.commit=true in the consumer, but
- * allows the spout to control when commits occur. Commits on the 
defined interval. 
- * 
  */
 @InterfaceStability.Unstable
 public enum ProcessingGuarantee {
+/**
+ * An offset is ready to commit only after the corresponding tuple 
has been processed and acked (at least once). If a tuple fails or
+ * times out it will be re-emitted, as controlled by the {@link 
KafkaSpoutRetryService}. Commits on the defined interval.
+ */
 AT_LEAST_ONCE,
+/**
+ * Every offset will be committed to Kafka right after being 
polled but before being emitted to the downstream components of the
+ * topology. The commit interval is ignored. This mode guarantees 
that the offset is processed at most once by ensuring the spout
+ * won't retry tuples that fail or time out after the commit to 
Kafka has been done
+ */
 AT_MOST_ONCE,
+/**
+ * The polled offsets are ready to commit immediately after being 
polled. The offsets are committed periodically, i.e. a message may
+ * be processed 0, 1 or more times. This behavior is similar to 
setting enable.auto.commit=true in the consumer, but allows the
+ * spout to control when commits occur. Commits on the defined 
interval
--- End diff --

Commits are made asynchronously on the defined interval.

Should we also say specifically that for A_L_O and A_M_O the commits are 
done synchronously?
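
For reference, a small sketch of the distinction under discussion, assuming a 
plain KafkaConsumer; the commit helper below is illustrative, not spout code:

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class CommitStyles {
    static <K, V> void commit(KafkaConsumer<K, V> consumer,
                              Map<TopicPartition, OffsetAndMetadata> offsets,
                              boolean sync) {
        if (sync) {
            consumer.commitSync(offsets);        // blocks until the broker acknowledges
        } else {
            consumer.commitAsync(offsets, null); // returns immediately; null callback discards the result
        }
    }
}
```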


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-04 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165852060
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -453,37 +459,33 @@ public Builder(String bootstrapServers, Subscription 
subscription) {
 return builder;
 }
 
-private static void setAutoCommitMode(Builder builder) {
+private static void setKafkaPropsForProcessingGuarantee(Builder 
builder) {
 if 
(builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-throw new IllegalArgumentException("Do not set " + 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
-+ " Instead use 
KafkaSpoutConfig.Builder.setProcessingGuarantee");
+throw new IllegalStateException("The KafkaConsumer " + 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
++ " setting is not supported. You can configure similar 
behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
 }
-if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
-
builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
-} else {
-String autoOffsetResetPolicy = 
(String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-if (builder.processingGuarantee == 
ProcessingGuarantee.AT_LEAST_ONCE) {
-if (autoOffsetResetPolicy == null) {
-/*
-If the user wants to explicitly set an auto offset 
reset policy, we should respect it, but when the spout is configured
-for at-least-once processing we should default to 
seeking to the earliest offset in case there's an offset out of range
-error, rather than seeking to the latest (Kafka's 
default). This type of error will typically happen when the consumer 
-requests an offset that was deleted.
- */
-
builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-} else if (!autoOffsetResetPolicy.equals("earliest") && 
!autoOffsetResetPolicy.equals("none")) {
-LOG.warn("Cannot guarantee at-least-once processing 
with auto.offset.reset.policy other than 'earliest' or 'none'."
-+ " Some messages may be skipped.");
-}
-} else if (builder.processingGuarantee == 
ProcessingGuarantee.AT_MOST_ONCE) {
-if (autoOffsetResetPolicy != null
-&& (!autoOffsetResetPolicy.equals("latest") && 
!autoOffsetResetPolicy.equals("none"))) {
-LOG.warn("Cannot guarantee at-most-once processing 
with auto.offset.reset.policy other than 'latest' or 'none'."
-+ " Some messages may be processed more than 
once.");
-}
+String autoOffsetResetPolicy = (String) 
builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+if (builder.processingGuarantee == 
ProcessingGuarantee.AT_LEAST_ONCE) {
+if (autoOffsetResetPolicy == null) {
+/*
+ * If the user wants to explicitly set an auto offset 
reset policy, we should respect it, but when the spout is configured
+ * for at-least-once processing we should default to 
seeking to the earliest offset in case there's an offset out of range
+ * error, rather than seeking to the latest (Kafka's 
default). This type of error will typically happen when the consumer
+ * requests an offset that was deleted.
+ */
+
builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
--- End diff --

We should print an INFO-level log here saying:

LOG.info("Set Kafka property {} to {}", 
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-03 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165830316
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
 ---
@@ -196,13 +206,37 @@ public void 
testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception {
 }
 
 @Test
-public void testAnyTimesModeDoesNotCommitAckedTuples() throws 
Exception {
-//When tuple tracking is enabled, the spout must not commit acked 
tuples in any-times mode because committing is managed by the consumer
+public void testNoneModeCommitsPolledTuples() throws Exception {
+//When using the none guarantee, the spout must commit tuples 
periodically, regardless of whether they've been acked
--- End diff --

no-guarantee


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-03 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165829893
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -210,23 +215,26 @@ public Builder(String bootstrapServers, Subscription 
subscription) {
 }
 
 /**
- * Set a {@link KafkaConsumer} property.
+ * Set a {@link KafkaConsumer} property. Please don't set 
enable.auto.commit, instead set the {@link ProcessingGuarantee}
--- End diff --

Should we leave this info here, or add it to the [s-k-c 
documentation](https://github.com/apache/storm/blob/master/docs/storm-kafka-client.md)?
 I would say it belongs in s-k-c documentation. However, if we find it is too 
important, we can leave it here. Regardless of the location, I would write 
something along these lines: 

"The Kafka property enable.auto.commit is not supported, and if set it will 
throw an exception. All other Kafka properties that control the Kafka 
auto-commit mechanism will be ignored if set."


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-03 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165830237
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -453,37 +451,33 @@ public Builder(String bootstrapServers, Subscription 
subscription) {
 return builder;
 }
 
-private static void setAutoCommitMode(Builder builder) {
+private static void setPropsToFitProcessingGuarantee(Builder 
builder) {
 if 
(builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-throw new IllegalArgumentException("Do not set " + 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
-+ " Instead use 
KafkaSpoutConfig.Builder.setProcessingGuarantee");
+throw new IllegalArgumentException("Do not set " + 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
--- End diff --

IllegalStateException


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-03 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165830142
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -519,6 +519,15 @@ private boolean isEmitTuple(List tuple) {
 return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
 }
 
+private void commitConsumedOffsets(Set<TopicPartition> assignedPartitions) {
--- End diff --

perhaps the name of this method should be "commitFetchedOffsetsAsync" based 
on the javadoc for 
[kafkaConsumer.position(tp)](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1396)


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-03 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165830128
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -307,8 +307,12 @@ public void nextTuple() {
 kafkaSpoutConfig.getSubscription().refreshAssignment();
 }
 
-if (shouldCommit()) {
-commitOffsetsForAckedTuples(kafkaConsumer.assignment());
+if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) 
{ //commit timer is null for AT_MOST_ONCE mode
+if (isAtLeastOnceProcessing()) {
+
commitOffsetsForAckedTuples(kafkaConsumer.assignment());
+} else if (kafkaSpoutConfig.getProcessingGuarantee() == 
ProcessingGuarantee.NONE) {
--- End diff --

we can delete the if condition and leave only the else, because when the 
timer is non-null the processing guarantee is either at-least-once or 
no-guarantee. If we want to make it clear we can put a line comment, as in the 
sketch below.
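
A sketch of the simplified branch, assuming the code quoted above:

```java
if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
    if (isAtLeastOnceProcessing()) {
        commitOffsetsForAckedTuples(kafkaConsumer.assignment());
    } else {
        // the timer is only created for at-least-once and no-guarantee,
        // so this branch can only be ProcessingGuarantee.NONE
        commitConsumedOffsets(kafkaConsumer.assignment());
    }
}
```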


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-03 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165830302
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
 ---
@@ -153,8 +163,8 @@ public void testAtMostOnceModeCannotReplayTuples() 
throws Exception {
 }
 
 @Test
-public void testAnyTimesModeCannotReplayTuples() throws Exception {
-//When tuple tracking is enabled, the spout must not replay tuples 
in any-times mode
+public void testNoneModeCannotReplayTuples() throws Exception {
+//When tuple tracking is enabled, the spout must not replay tuples 
in none mode
--- End diff --

no-guarantee


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-03 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165830301
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
 ---
@@ -153,8 +163,8 @@ public void testAtMostOnceModeCannotReplayTuples() 
throws Exception {
 }
 
 @Test
-public void testAnyTimesModeCannotReplayTuples() throws Exception {
-//When tuple tracking is enabled, the spout must not replay tuples 
in any-times mode
+public void testNoneModeCannotReplayTuples() throws Exception {
--- End diff --

testProcessingGuaranteeNoGuaranteeCannotReplayTuples


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-03 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165830313
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
 ---
@@ -196,13 +206,37 @@ public void 
testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception {
 }
 
 @Test
-public void testAnyTimesModeDoesNotCommitAckedTuples() throws 
Exception {
-//When tuple tracking is enabled, the spout must not commit acked 
tuples in any-times mode because committing is managed by the consumer
+public void testNoneModeCommitsPolledTuples() throws Exception {
--- End diff --

testProcessingGuaranteeNoGuaranteeCommitsPolledTuples


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-03 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165830248
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -453,37 +451,33 @@ public Builder(String bootstrapServers, Subscription 
subscription) {
 return builder;
 }
 
-private static void setAutoCommitMode(Builder builder) {
+private static void setPropsToFitProcessingGuarantee(Builder 
builder) {
--- End diff --

setKafkaPropsForProcessingGuarantee


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-03 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165830275
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -453,37 +451,33 @@ public Builder(String bootstrapServers, Subscription 
subscription) {
 return builder;
 }
 
-private static void setAutoCommitMode(Builder builder) {
+private static void setPropsToFitProcessingGuarantee(Builder 
builder) {
 if 
(builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-throw new IllegalArgumentException("Do not set " + 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
-+ " Instead use 
KafkaSpoutConfig.Builder.setProcessingGuarantee");
+throw new IllegalArgumentException("Do not set " + 
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
--- End diff --

message should be: 

Kafka enable.auto.commit is not supported. Please set the desired 
ProcessingGuarantee using {@link setProcessingGuarantee}


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-03 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165830219
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -519,6 +519,15 @@ private boolean isEmitTuple(List<Object> tuple) {
 return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
 }
 
+private void commitConsumedOffsets(Set<TopicPartition> assignedPartitions) {
+Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new 
HashMap<>();
+for (TopicPartition tp : assignedPartitions) {
+offsetsToCommit.put(tp, new 
OffsetAndMetadata(kafkaConsumer.position(tp)));
+}
+kafkaConsumer.commitAsync(offsetsToCommit, null);
--- End diff --

I would create a constant called NO_CALLBACK to improve readability
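
i.e., something along these lines (NO_CALLBACK is the name proposed here, not 
existing code):

```java
// OffsetCommitCallback is org.apache.kafka.clients.consumer.OffsetCommitCallback;
// the named constant makes the intentionally absent callback explicit.
private static final OffsetCommitCallback NO_CALLBACK = null;

// the call site then reads:
kafkaConsumer.commitAsync(offsetsToCommit, NO_CALLBACK);
```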


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-03 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165830294
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
 ---
@@ -109,7 +119,7 @@ public void 
testAtMostOnceModeDisregardsMaxUncommittedOffsets() throws Exception
 }
 
 @Test
-public void testAnyTimesModeDisregardsMaxUncommittedOffsets() throws 
Exception {
+public void testNoneModeDisregardsMaxUncommittedOffsets() throws 
Exception {
--- End diff --

testProcessingGuaranteeNoGuaranteeDisregardsMaxUncommittedOffsets


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-03 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165830061
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -133,8 +133,8 @@ public void open(Map<String, Object> conf, 
TopologyContext context, SpoutOutputC
 
 tupleListener = kafkaSpoutConfig.getTupleListener();
 
-if (isAtLeastOnceProcessing()) {
-// Only used if the spout should commit an offset to Kafka 
only after the corresponding tuple has been acked.
+if (kafkaSpoutConfig.getProcessingGuarantee() != 
KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
+// In at-most-once mode the offsets are committed after every 
poll, so the timer is not used
--- End diff --

In at-most-once mode the offsets are committed after every poll and not 
periodically as controlled by the timer.


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-03 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165830031
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -89,7 +89,7 @@
 private transient KafkaSpoutRetryService retryService;
 // Handles tuple events (emit, ack etc.)
 private transient KafkaTupleListener tupleListener;
-// timer == null if processing guarantee is none or at-most-once
+// timer == null if processing guarantee is at-most-once
--- End diff --

only if the


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-03 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165830279
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
 ---
@@ -85,4 +90,12 @@ public void testMetricsTimeBucketSizeInSecs() {
 
 assertEquals(100, conf.getMetricsTimeBucketSizeInSecs());
 }
+
+@Test
+public void testEnableAutoCommitIsBanned() {
--- End diff --

testEnableAutoCommitNotSupported


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-03 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165830085
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -307,8 +307,12 @@ public void nextTuple() {
 kafkaSpoutConfig.getSubscription().refreshAssignment();
 }
 
-if (shouldCommit()) {
-commitOffsetsForAckedTuples(kafkaConsumer.assignment());
+if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) 
{ //commit timer is null for AT_MOST_ONCE mode
--- End diff --

I would say this comment is redundant here. We already say that on the 
field declaration, so I suggest that we just remove it.


---


[GitHub] storm pull request #2537: STORM-2914: Implement ProcessingGuarantee.NONE in ...

2018-02-03 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2537#discussion_r165830283
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
 ---
@@ -85,4 +90,12 @@ public void testMetricsTimeBucketSizeInSecs() {
 
 assertEquals(100, conf.getMetricsTimeBucketSizeInSecs());
 }
+
+@Test
+public void testEnableAutoCommitIsBanned() {
+expectedException.expect(IllegalArgumentException.class);
--- End diff --

IllegalStateException.class
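That is, a hedged sketch of the corrected test, assuming the builder rejects the property at build() time and that setProp exists as in storm-kafka-client:

```java
@Test
public void testEnableAutoCommitNotSupported() {
    expectedException.expect(IllegalStateException.class);
    KafkaSpoutConfig.builder("localhost:9092", "topic")
        .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
        .build();
}
```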


---


[GitHub] storm issue #2538: STORM-2913: Add metadata to at-most-once and at-least-onc...

2018-01-30 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2538
  
@srdo there is a discrepancy between the title of this pull request and the 
title of the associated JIRA. What problem are you trying to solve in this 
patch? Add metadata, or remove warnings?

If the goal is to fix the recurring WARN messages that get printed, in my 
opinion the obvious thing to do is to simply not log the message unless the 
processing guarantee is AT_LEAST_ONCE. This would be a one-line change.

Can you please also clarify the need to add metadata when running the spout 
in AT_MOST_ONCE mode? Also, I think it is quite dangerous to try to mimic 
Kafka's auto.commit behavior by simply calling commitAsync(...). Besides the 
WARN messages, is there any other problem associated with letting Kafka 
handle everything in auto.commit mode?
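For reference, a sketch of what letting Kafka handle everything in auto.commit mode amounts to on the consumer side (property values are illustrative only):

```java
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// Kafka then commits the consumer position on its own cadence:
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
// ...versus the patch mimicking this by calling consumer.commitAsync(...) after each poll
```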






---


[GitHub] storm issue #2537: STORM-2914: Implement ProcessingGuarantee.NONE in the spo...

2018-01-30 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2537
  
@srdo I left the 
[comment](https://issues.apache.org/jira/browse/STORM-2914?focusedCommentId=16346102&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16346102)
 on the JIRA.


---


[GitHub] storm issue #2530: STORM-2907: Exclude curator dependencies from storm-core ...

2018-01-23 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2530
  
OK, thanks.
+1


---


[GitHub] storm issue #2530: STORM-2907: Exclude curator dependencies from storm-core ...

2018-01-23 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2530
  
+1, however I don't understand why this module is using the appassembler 
Maven plugin to copy all the jars under target/app-assembler/repo, nor why 
all of these jars are subsequently copied by binary.xml into 
external/storm-autocreds.

It seems to me that the root cause of this problem is not the transitive 
dependency, but rather the way these scripts copy jars around.

---


[GitHub] storm issue #2512: [STORM-2894] fix some random typos

2018-01-15 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2512
  
@erikdw @srdo Thanks for checking in with me. I was off for the last few 
days and hence just getting back to this, although a bit too late because 
the patch got merged before I had the chance to respond. 

@erikdw the fundamental reason behind squashing all the commits is sanity. 
The main challenge when the branches diverge is tracking down what happened. 
The same applies when cherry-picking changes across branches. In Storm this 
is very hard because Storm's Git log does not have a well-defined structure, 
does not have one commit per JIRA, and contributors don't always squash 
their commits. As an illustration I am adding the output of "git log" for 
Storm, Kafka, Spark, HBase, Flink, and Apex. The difference is striking. 
Every other project has a clean, consistent Git log, which makes it very 
easy to understand and track down what happened. Furthermore, it also makes 
it very easy to write git hooks to automate all sorts of tasks to merge and 
track down what happens across branches, for example, helping users who have 
their own private distribution that gets synced with Apache from time to 
time (I face such tasks periodically and understand how much easier it would 
be if I had a clean Git log).

One could argue one way or another. However, the fact is that all of the 
aforementioned mainstream Apache projects follow one commit per JIRA (with 
the possible exception of a consistent token such as MINOR), and commits are 
always squashed. In my opinion we should do so as well, with absolutely no 
exceptions. The extra effort to do this is minimal, and the benefits are 
immense. I have proposed this a few times before, but for some reason it has 
never been agreed upon or even warranted a lengthy discussion (which I 
honestly find hard to understand).


![image](https://user-images.githubusercontent.com/10284328/34959386-299e7800-f9eb-11e7-99a5-9b18022e6665.png)



---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r159120269
  
--- Diff: external/storm-solr/README.md ---
@@ -97,6 +97,29 @@ field separates each value with the token % instead of 
the default | . To use th
 .setMultiValueFieldToken("%").build();
 ```
 
+## Working with Kerberized Solr
+If your topology is going to interact with kerberized Solr, your 
bolts/states need to be authenticated by the Solr server. We can enable
+authentication by distributing keytabs for the solr user on all worker 
hosts. We can configure the Solr bolt to use the keytabs by setting
+the SolrConfig.enableKerberos config property.
+
+On worker hosts the bolt/trident-state code will use the keytab file with 
the principal provided in the JAAS config to authenticate with
+Solr. You need to specify a Kerberos principal for the client and a 
corresponding keytab in the JAAS client configuration file.
+Also make sure the provided principal is configured with the required 
permissions to access Solr collections.
+
+Here’s an example JAAS config:
+
+`SolrJClient {
+  com.sun.security.auth.module.Krb5LoginModule required
+  useKeyTab=true
+  keyTab="/keytabs/foo.keytab"
--- End diff --

/keytabs/solr.keytab? Perhaps we could put an entry here that matches 
what Ambari typically creates.
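For context, a hedged sketch of how a topology would typically point its workers at such a JAAS file (the path is illustrative):

```java
Config conf = new Config();
// assumes the JAAS file and keytab have been distributed to all worker hosts
conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS,
        "-Djava.security.auth.login.config=/path/to/jaas.conf");
```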


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r159121022
  
--- Diff: 
external/storm-solr/src/main/java/org/apache/storm/solr/schema/builder/RestJsonSchemaBuilderV2.java
 ---
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.solr.schema.builder;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
+import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer;
+import org.apache.solr.client.solrj.request.schema.FieldTypeDefinition;
+import org.apache.solr.client.solrj.request.schema.SchemaRequest;
+import org.apache.solr.client.solrj.response.schema.SchemaRepresentation;
+import org.apache.solr.client.solrj.response.schema.SchemaResponse;
+import org.apache.storm.solr.config.SolrConfig;
+import org.apache.storm.solr.schema.CopyField;
+import org.apache.storm.solr.schema.Field;
+import org.apache.storm.solr.schema.FieldType;
+import org.apache.storm.solr.schema.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Class that builds the {@link Schema} object from the schema returned by 
the SchemaRequest
+ */
+public class RestJsonSchemaBuilderV2 implements SchemaBuilder {
+private static final Logger logger = 
LoggerFactory.getLogger(RestJsonSchemaBuilderV2.class);
+private Schema schema = new Schema();
+private SolrConfig solrConfig;
+private String collection;
+
+public RestJsonSchemaBuilderV2(SolrConfig solrConfig, String 
collection) {
+this.solrConfig = solrConfig;
+this.collection = collection;
+}
+
+@Override
+public void buildSchema() throws IOException {
+SolrClient solrClient = null;
+try {
+if (solrConfig.enableKerberos())
+HttpClientUtil.setConfigurer(new 
Krb5HttpClientConfigurer());
+
+solrClient = new CloudSolrClient(solrConfig.getZkHostString());
--- End diff --

The initial code was building the schema from the JSON representation. The 
other class has been deprecated, which means that JSON is no longer 
supported. Is there a reason to support both? If so, there should probably 
be a factory that, depending on a configuration choice (e.g. Kerberos), 
builds one or the other.

The goal of using JSON was to avoid all of this programmatic setup. 
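Something along these lines, as a sketch (the RestJsonSchemaBuilder constructor shown here is hypothetical):

```java
public static SchemaBuilder newSchemaBuilder(SolrConfig solrConfig, String collection, String schemaJsonUrl) {
    if (solrConfig.enableKerberos()) {
        // programmatic SolrJ schema API path, which supports Kerberos
        return new RestJsonSchemaBuilderV2(solrConfig, collection);
    }
    // original JSON-over-REST path
    return new RestJsonSchemaBuilder(schemaJsonUrl); // hypothetical ctor, for illustration
}
```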


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r159120398
  
--- Diff: 
external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
 ---
@@ -153,4 +160,15 @@ private void failQueuedTuples(List<Tuple> failedTuples) {
 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) { }
 
+@Override
+public void cleanup() {
+if (solrClient != null) {
+try {
+solrClient.close();
+} catch (IOException e) {
+LOG.debug("Error while closing solrClient", e);
--- End diff --

should it be error level?


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r159120315
  
--- Diff: 
external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
 ---
@@ -88,9 +92,11 @@ private int capacity() {
 @Override
 protected void process(Tuple tuple) {
 try {
+LOG.debug("Processing Tuple: {}", tuple);
--- End diff --

Storm provides these debug log messages when the Config has setDebug(true). 
Therefore we don't usually add this type of logging.
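That is, the framework-level switch (a sketch; the topology wiring is assumed):

```java
Config conf = new Config();
conf.setDebug(true); // Storm itself then logs every emitted/acked tuple
StormSubmitter.submitTopology("solr-topology", conf, builder.createTopology());
```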


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r159120430
  
--- Diff: 
external/storm-solr/src/main/java/org/apache/storm/solr/schema/SolrFieldTypeFinder.java
 ---
@@ -70,13 +73,22 @@ public String toString() {
 /**
  * Initiates class containing all the information relating fields with 
their types.
  * This information is parsed from the schema
- * @param schema SolrSchema containing the information about fields 
and types
+ * @param schemaBuilder schemaBuilder to build the information about 
fields and types
  * */
-public SolrFieldTypeFinder(Schema schema) {
-if (schema == null) {
-throw new IllegalArgumentException("Schema object is null");
+public SolrFieldTypeFinder(SchemaBuilder schemaBuilder) {
+this.schemaBuilder = schemaBuilder;
+}
+
+public void initialize() {
+if (schemaBuilder == null) {
+throw new IllegalArgumentException("schemaBuilder object is 
null");
--- End diff --

Should this IllegalArgumentException validation be done in the constructor, 
to prevent the exception from occurring at runtime? If it is intended to be 
done here, perhaps it should be IllegalStateException.
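A minimal sketch of validating in the constructor instead:

```java
public SolrFieldTypeFinder(SchemaBuilder schemaBuilder) {
    if (schemaBuilder == null) {
        // fail fast at construction time rather than later at runtime
        throw new IllegalArgumentException("schemaBuilder object is null");
    }
    this.schemaBuilder = schemaBuilder;
}
```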


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r159120320
  
--- Diff: 
external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
 ---
@@ -128,6 +134,7 @@ private void fail(Tuple tuple, Exception e) {
 List failedTuples = getQueuedTuples();
 failQueuedTuples(failedTuples);
 }
+LOG.debug("Failed Tuple: {}", tuple, e);
--- End diff --

Same as above


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r159120287
  
--- Diff: external/storm-solr/README.md ---
@@ -171,7 +194,7 @@ Querying  Solr for these patterns, you will see the 
values that have been indexe
 
 curl -X GET -H "Content-type:application/json" -H 
"Accept:application/json" 
http://localhost:8983/solr/gettingstarted_shard1_replica2/select?q=*id_fields_test_val*=json=true
 
-curl -X GET -H "Content-type: application/json" -H "Accept: 
application/json" 
http://localhost:8983/solr/gettingstarted_shard1_replica2/select?q=*id_fields_test_val*=json=true
+curl -X GET -H "Content-type:application/json" -H 
"Accept:application/json" 
http://localhost:8983/solr/gettingstarted_shard1_replica2/select?q=*json_test_val*=json=true
--- End diff --

Was this a bug?


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r159120311
  
--- Diff: 
external/storm-solr/src/main/java/org/apache/storm/solr/bolt/SolrUpdateBolt.java
 ---
@@ -88,9 +92,11 @@ private int capacity() {
 @Override
 protected void process(Tuple tuple) {
 try {
+LOG.debug("Processing Tuple: {}", tuple);
 SolrRequest request = solrMapper.toSolrRequest(tuple);
 solrClient.request(request, solrMapper.getCollection());
 ack(tuple);
+LOG.debug("Acked Tuple: {}", tuple);
--- End diff --

Same as above


---


[GitHub] storm pull request #2467: STORM-2860: Add Kerberos support to Solr bolt

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2467#discussion_r159120406
  
--- Diff: 
external/storm-solr/src/main/java/org/apache/storm/solr/config/SolrConfig.java 
---
@@ -54,4 +65,7 @@ public int getTickTupleInterval() {
 return tickTupleInterval;
 }
 
+public boolean enableKerberos() {
--- End diff --

NIT: isKerberosEnabled() or isEnableKerberos()?


---


[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159119938
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.metrics;
+
+import com.google.common.base.Supplier;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class KafkaOffsetMetric implements IMetric {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaOffsetMetric.class);
+private final Supplier<Map<TopicPartition, OffsetManager>> 
offsetManagerSupplier;
+private final Supplier<KafkaConsumer> consumerSupplier;
+
+public KafkaOffsetMetric(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier, Supplier<KafkaConsumer> consumerSupplier) {
--- End diff --

What's the reasoning behind passing Supplier rather than the actual object?
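One plausible reason, sketched here as an assumption rather than as the patch's stated intent: the KafkaConsumer is created (and may be recreated) in open(), possibly after the metric has been registered, so the metric has to look it up lazily on every tick:

```java
// Guava's Supplier has a single method, so lambdas work here
kafkaOffsetMetric = new KafkaOffsetMetric(
        () -> offsetManagers,  // always reads the spout's current map
        () -> kafkaConsumer);  // always reads the spout's current consumer
```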


---


[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159119896
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.metrics;
+
+import com.google.common.base.Supplier;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class KafkaOffsetMetric implements IMetric {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaOffsetMetric.class);
+private final Supplier<Map<TopicPartition, OffsetManager>> 
offsetManagerSupplier;
+private final Supplier<KafkaConsumer> consumerSupplier;
+
+public KafkaOffsetMetric(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier, Supplier<KafkaConsumer> consumerSupplier) {
+this.offsetManagerSupplier = offsetManagerSupplier;
+this.consumerSupplier = consumerSupplier;
+}
+
+@Override
+public Object getValueAndReset() {
+
+Map<TopicPartition, OffsetManager> offsetManagers = 
offsetManagerSupplier.get();
+KafkaConsumer kafkaConsumer = consumerSupplier.get();
+
+if (offsetManagers == null || offsetManagers.isEmpty() || 
kafkaConsumer == null) {
+LOG.info("Metrics Tick: offsetManagers or kafkaConsumer is 
null.");
+return null;
+}
+
+Map<String,TopicMetrics> topicMetricsMap = new HashMap<>();
+Set<TopicPartition> topicPartitions = offsetManagers.keySet();
+
+Map<TopicPartition, Long> beginningOffsets= 
kafkaConsumer.beginningOffsets(topicPartitions);
+Map<TopicPartition, Long> endOffsets= 
kafkaConsumer.endOffsets(topicPartitions);
+Map<String, Long> result = new HashMap<>();
--- End diff --

it would be useful to have a comment saying what is in this result map
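For example (hedged; the exact metric key names are assumptions modeled on this diff):

```java
// result maps a metric name to its value, e.g.:
//   "<topic>/totalSpoutLag"           -> sum over partitions of (endOffset - committedOffset)
//   "<topic>/totalEarliestTimeOffset" -> sum of the partitions' beginning offsets
Map<String, Long> result = new HashMap<>();
```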


---


[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159119878
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.metrics;
+
+import com.google.common.base.Supplier;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class KafkaOffsetMetric implements IMetric {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaOffsetMetric.class);
+private final Supplier<Map<TopicPartition, OffsetManager>> 
offsetManagerSupplier;
+private final Supplier<KafkaConsumer> consumerSupplier;
+
+public KafkaOffsetMetric(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier, Supplier<KafkaConsumer> consumerSupplier) {
+this.offsetManagerSupplier = offsetManagerSupplier;
+this.consumerSupplier = consumerSupplier;
+}
+
+@Override
+public Object getValueAndReset() {
+
+Map<TopicPartition, OffsetManager> offsetManagers = 
offsetManagerSupplier.get();
+KafkaConsumer kafkaConsumer = consumerSupplier.get();
+
+if (offsetManagers == null || offsetManagers.isEmpty() || 
kafkaConsumer == null) {
+LOG.info("Metrics Tick: offsetManagers or kafkaConsumer is 
null.");
--- End diff --

Should this be INFO level? Is this going to print this message periodically?


---


[GitHub] storm pull request #2480: STORM-2867: Add consumer lag metrics to KafkaSpout

2017-12-30 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r159119706
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -739,4 +764,9 @@ public boolean shouldPoll() {
 return !this.pollablePartitions.isEmpty();
 }
 }
+
+@VisibleForTesting
+KafkaOffsetMetric getKafkaOffsetMetric() {
--- End diff --

If we start adding a lot of these test methods, we would be better off 
creating a class in the test package called KafkaSpoutTest that extends 
KafkaSpout, and using that one in the tests. All of these methods should go 
in that class. We don't want this class to get very bloated.
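A sketch of that test-package subclass (the class name and accessor visibility are assumptions):

```java
// lives in the test package; assumes the accessors it needs are made
// protected (or package-private within the same package) in KafkaSpout
public class KafkaSpoutTest<K, V> extends KafkaSpout<K, V> {
    public KafkaSpoutTest(KafkaSpoutConfig<K, V> config) {
        super(config);
    }

    KafkaOffsetMetric exposeKafkaOffsetMetric() {
        return getKafkaOffsetMetric();
    }
}
```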


---


[GitHub] storm issue #2466: STORM-2844: KafkaSpout Throws IllegalStateException After...

2017-12-25 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2466
  
@srdo Thanks for the review. I have addressed the last few things and 
squashed the commits right away. It is ready to merge.


---


[GitHub] storm issue #2465: STORM-2844: KafkaSpout Throws IllegalStateException After...

2017-12-25 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2465
  
@srdo Thanks for the review. I have squashed the commits and it is ready to 
merge.


---


[GitHub] storm pull request #2480: [WIP] STORM-2867: Add consumer lag metrics to Kafk...

2017-12-25 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/2480#discussion_r158656447
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/KafkaUtils.java
 ---
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+public class KafkaUtils {
--- End diff --

Why the need for the KafkaUtils wrapper class? Why not just have the 
KafkaOffsetMetric class?


---


[GitHub] storm issue #2465: STORM-2844: KafkaSpout Throws IllegalStateException After...

2017-12-24 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2465
  
@srdo I would like to merge this patch ASAP. Can you please take a quick 
look so that I can squash and merge it? Thanks.


---


[GitHub] storm issue #2466: STORM-2844: KafkaSpout Throws IllegalStateException After...

2017-12-23 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2466
  
@srdo these are all the changes in 1.x-branch already squashed. It is ready 
to merge.


---


[GitHub] storm issue #2465: STORM-2844: KafkaSpout Throws IllegalStateException After...

2017-12-23 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/2465
  
@srdo done. Please check, and I will squash the commits right away. I would 
like to merge this in today. I will update the master PR with everything 
already squashed. Thanks.


---

