[GitHub] storm issue #1815: STORM-2235 Introduce new option: 'add remote repositories...

2016-12-07 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/1815
  
+1 (non-binding)




[GitHub] storm issue #1817: STORM-2237: Nimbus reports bad supervisor heartbeat - unk...

2016-12-07 Thread knusbaum
Github user knusbaum commented on the issue:

https://github.com/apache/storm/pull/1817
  
I'll have a version up for 2.x as well.




[GitHub] storm pull request #1817: STORM-2237: Nimbus reports bad supervisor heartbeat...

2016-12-07 Thread knusbaum
GitHub user knusbaum opened a pull request:

https://github.com/apache/storm/pull/1817

STORM-2237: Nimbus reports bad supervisor heartbeat - unknown version or thrift exception



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/knusbaum/incubator-storm STORM-2237

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1817.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1817


commit 6a2241d9795e0582bb5b1fb9bb469665247f3a72
Author: Kyle Nusbaum 
Date:   2016-12-07T23:41:40Z

Fixing bad supervisor bug and adding list of bad supervisors to the UI.






[GitHub] storm issue #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1808
  
@srdo for me, backwards compatibility for 1.x is more a question of violating our versioning than anything else.




[GitHub] storm issue #1406: [STORM-433] [WIP] Executor queue backlog metric

2016-12-07 Thread erikdw
Github user erikdw commented on the issue:

https://github.com/apache/storm/pull/1406
  
@abhishekagarwal87 & @HeartSaVioR: any update on this? Getting visibility into queue depth in Storm is a major reason for us to even consider upgrading off the storm-0.9.x branch. Do you need help? We can potentially dedicate time in our next quarter towards helping.




[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

2016-12-07 Thread csivaguru
Github user csivaguru commented on a diff in the pull request:

https://github.com/apache/storm/pull/1816#discussion_r91385214
  
--- Diff: 
external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/jpmml/JpmmlFactory.java
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.pmml.runner.jpmml;
+
+import org.apache.storm.pmml.runner.ModelRunnerFactory;
+import org.dmg.pmml.IOUtil;
+import org.dmg.pmml.PMML;
+import org.jpmml.evaluator.Evaluator;
+import org.jpmml.evaluator.ModelEvaluator;
+import org.jpmml.evaluator.ModelEvaluatorFactory;
+import org.jpmml.manager.PMMLManager;
+import org.xml.sax.SAXException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+
+import javax.xml.bind.JAXBException;
+
+/*
+ * This class consists exclusively of static factory methods that create
+ * object instances that are essential to work with the Jpmml library
+ */
+public class JpmmlFactory {
+/**
+ * Creates a new {@link PMML} object representing the PMML model 
defined in the XML {@link File} specified as argument
+ */
+public static PMML newPmml(File file) throws JAXBException, 
SAXException, IOException {
--- End diff --

Nit: Consider adding support for creating the PMML object from an input stream as well.
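
For reference, such an overload might look roughly like this (a sketch only; it reuses the imports already in the file plus `java.io.InputStream`, and it assumes `IOUtil.unmarshal` can also read from an `InputStream`, mirroring the `File`-based factory above):

```java
// Hypothetical overload of JpmmlFactory.newPmml for streams.
// Assumption: IOUtil.unmarshal accepts an InputStream in this JPMML version.
public static PMML newPmml(InputStream stream) throws JAXBException, SAXException, IOException {
    Objects.requireNonNull(stream, "stream");
    return IOUtil.unmarshal(stream);
}
```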




[GitHub] storm issue #1800: STORM-2217: Make DRPC pure java

2016-12-07 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/1800
  
@revans2 Okay. Let me review the packaging changes.




[GitHub] storm issue #1816: STORM-2223: PMMLBolt

2016-12-07 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/1816
  
Also, what about unit tests?




[GitHub] storm issue #1816: STORM-2223: PMMLBolt

2016-12-07 Thread ptgoetz
Github user ptgoetz commented on the issue:

https://github.com/apache/storm/pull/1816
  
Looks like a good start, but it really needs some documentation. It would also be helpful to include a sample model + CSV data; without that, it's not very clear how to run the example.




[GitHub] storm issue #1807: fix NullPointException with acked.get(rtp)

2016-12-07 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/1807
  
@cutd Great. Could you update the PR so it changes `fail` instead of 
`doSeekRetriableTopicPartitions`? You might also want to create an issue at 
https://issues.apache.org/jira/browse/STORM so this fix can be tracked :)
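
For illustration only, the suggested shape might be roughly the following (a sketch, not the actual PR code; the `emitted` set and `retryService` field names are assumptions about the spout's internals):

```java
// Rough sketch: guard in fail() itself, so doSeekRetriableTopicPartitions
// never has to deal with message ids the spout no longer tracks.
@Override
public void fail(Object messageId) {
    final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    if (!emitted.contains(msgId)) {
        LOG.debug("Ignoring fail for tuple that is no longer tracked: {}", msgId);
        return;
    }
    emitted.remove(msgId);
    retryService.schedule(msgId);   // mark for retry instead of seeking here
}
```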




[GitHub] storm issue #1800: STORM-2217: Make DRPC pure java

2016-12-07 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1800
  
@ptgoetz I have updated the packaging to have a separate directory for the DRPC server dependencies. I have run manual tests and everything works. The big difference is that {{apache-storm-2.0.0-SNAPSHOT.tar.gz}} and {{apache-storm-2.0.0-SNAPSHOT.zip}} are now under {{./final-package/target/}} instead of {{./target}}. I will look to see if there are any docs that I need to update around this, but I wanted to give everyone a chance to look and give feedback on it.




[GitHub] storm issue #1816: STORM-2223: PMMLBolt

2016-12-07 Thread hmcl
Github user hmcl commented on the issue:

https://github.com/apache/storm/pull/1816
  
@vesense I addressed your serialization comment. I had to do some refactoring of the code, because enforcing that `ModelRunner` be serializable turned out to be too strong a requirement. For instance, the JPMML implementation has several non-serializable objects that made this approach impossible. Instead, I extract the output fields from the PMML model and pass them to the bolt directly. The runner is then created in the prepare method. I will upload the README in a bit.
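
For illustration, the resulting pattern is roughly the following (a sketch with stand-in names, not the exact PR code; `ModelRunnerFactory` does appear in the PR's imports, but the interfaces and method names below are assumptions):

```java
import java.io.Serializable;
import java.util.List;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

// Sketch: keep only serializable state in the bolt; build the
// non-serializable JPMML runner per worker in prepare().
public class PMMLPredictorBolt extends BaseRichBolt {

    public interface ModelRunner {                // stand-in for the PR's interface
        List<Object> score(Tuple input);          // hypothetical method
    }

    public interface ModelRunnerFactory extends Serializable {
        ModelRunner newModelRunner();             // hypothetical method name
    }

    private final ModelRunnerFactory factory;     // serializable
    private final List<String> outputFields;      // extracted from the PMML model up front
    private transient ModelRunner runner;         // non-serializable, created in prepare()
    private transient OutputCollector collector;

    public PMMLPredictorBolt(ModelRunnerFactory factory, List<String> outputFields) {
        this.factory = factory;
        this.outputFields = outputFields;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.runner = factory.newModelRunner();   // JPMML objects built here, after deserialization
    }

    @Override
    public void execute(Tuple input) {
        collector.emit(runner.score(input));
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(outputFields));
    }
}
```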

@harshach @HeartSaVioR can you please take a look as well?




[GitHub] storm issue #1696: STORM-2104: More graceful handling of acked/failed tuples...

2016-12-07 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/1696
  
@revans2 ping for review if you have time. I'd like to get this in before 
too long if possible.




[GitHub] storm issue #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/1808
  
I like the new configuration design. Backward compatibility is not an issue 
for us. I'm wondering if enough people have even switched to the new spout yet 
to make backward compatibility for 1.x a must.




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91354645
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,222 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Kafka integration using the kafka-client jar
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatibility
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version 
Compatibility] (#compatibility)). 
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology. If you
+are using Trident you can use org.apache.storm.kafka.trident.TridentState, org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces:
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. For backward compatibility reasons, when you use its default constructor,
+FieldNameBasedTupleToKafkaMapper always looks in the KafkaBolt for fields named "key" and "message".
+Alternatively, you can specify different key and message fields by using the non-default constructor.
+In the TridentKafkaState you must specify the field names for key and message, as there is no default constructor.
+These should be specified while constructing an instance of FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published.
+You can return null, and the message will be ignored. If you have one static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` can be used to decide which topic a tuple's message should be published to.
+The user can specify a field name or field index in the tuple, and the selector will use that field's value as the topic name.
+When the topic name is not found, `KafkaBolt` will write messages to the default topic.
--- End diff --

Nit: I think it would be clearer if this referred to Field*TopicSelector instead of KafkaBolt; I misunderstood it on first read.
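
For reference, the selector contract is small; a minimal field-name selector along the lines the docs describe might look like this (a sketch; the shipped `FieldNameTopicSelector` may differ in details):

```java
import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
import org.apache.storm.tuple.Tuple;

// Sketch: pick the topic from a tuple field, falling back to a default topic
// when the field is absent (the behavior the doc paragraph above describes).
public class SimpleFieldNameTopicSelector implements KafkaTopicSelector {
    private final String fieldName;
    private final String defaultTopic;

    public SimpleFieldNameTopicSelector(String fieldName, String defaultTopic) {
        this.fieldName = fieldName;
        this.defaultTopic = defaultTopic;
    }

    @Override
    public String getTopics(Tuple tuple) {
        return tuple.contains(fieldName) ? tuple.getStringByField(fieldName) : defaultTopic;
    }
}
```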




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91365059
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+
+/**
+ * Translates a Kafka ConsumerRecord into a tuple
+ * use {@link KafkaTuple} if you want to route your tuples
--- End diff --

Nit: I think it's a little clearer to say "Translate to KafkaTuple if..." 
or "Return a KafkaTuple if..."




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91350461
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,222 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Kafka integration using the kafka-client jar
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatibility
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version 
Compatibility] (#compatibility)). 
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology. If you
+are using Trident you can use org.apache.storm.kafka.trident.TridentState, org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces:
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. For backward compatibility reasons, when you use its default constructor,
+FieldNameBasedTupleToKafkaMapper always looks in the KafkaBolt for fields named "key" and "message".
+Alternatively, you can specify different key and message fields by using the non-default constructor.
+In the TridentKafkaState you must specify the field names for key and message, as there is no default constructor.
+These should be specified while constructing an instance of FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published.
+You can return null, and the message will be ignored. If you have one static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` can be used to decide which topic a tuple's message should be published to.
+The user can specify a field name or field index in the tuple, and the selector will use that field's value as the topic name.
+When the topic name is not found, `KafkaBolt` will write messages to the default topic.
+Please make sure the default topic has been created.
+
+### Specifying Kafka producer properties
+You can provide all the producer properties in your Storm topology by calling `KafkaBolt.withProducerProperties()` and
+`TridentKafkaStateFactory.withProducerProperties()`. Please see the section "Important configuration properties
+for the producer" at http://kafka.apache.org/documentation.html#newproducerconfigs for more details.
+
+###Using wildcard kafka topic match
+You can do a wildcard topic match by adding the following config
+```
+ Config config = new Config();
+ config.put("kafka.topic.wildcard.match",true);
 
-# Usage Examples
+```
 
-### Create a Kafka Spout
+After this you can specify a wildcard topic 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91367220
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.Testing;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.MkTupleParam;
+import org.apache.storm.tuple.Tuple;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaBoltTest {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBoltTest.class);
+
+    @SuppressWarnings({ "unchecked", "serial" })
+    @Test
+    public void testSimple() {
+        final KafkaProducer<String, String> producer = mock(KafkaProducer.class);
+        when(producer.send(any(), any())).thenAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                Callback c = (Callback) invocation.getArguments()[1];
+                c.onCompletion(null, null);
+                return null;
+            }
+        });
+        KafkaBolt<String, String> bolt = new KafkaBolt<String, String>() {
--- End diff --

Nit: Can leave off the contents of `<>` on the right hand side in a few 
places here 




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91355497
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ * 
+ * This bolt uses 0.8.2 Kafka Producer API.
+ * 
+ * It works for sending tuples to older Kafka version (0.8.1).
+ */
+public class KafkaBolt extends BaseRichBolt {
+private static final long serialVersionUID = -5205886631877033478L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBolt.class);
+
+public static final String TOPIC = "topic";
+
+private KafkaProducer producer;
+private OutputCollector collector;
+private TupleToKafkaMapper mapper;
+private KafkaTopicSelector topicSelector;
+private Properties boltSpecfiedProperties = new Properties();
+/**
+ * With default setting for fireAndForget and async, the callback is 
called when the sending succeeds.
+ * By setting fireAndForget true, the send will not wait at all for 
kafka to ack.
+ * "acks" setting in 0.8.2 Producer API config doesn't matter if 
fireAndForget is set.
+ * By setting async false, synchronous sending is used. 
+ */
+private boolean fireAndForget = false;
+private boolean async = true;
+
+public KafkaBolt() {}
+
+public KafkaBolt withTupleToKafkaMapper(TupleToKafkaMapper 
mapper) {
+this.mapper = mapper;
+return this;
+}
+
+public KafkaBolt withTopicSelector(String topic) {
+return withTopicSelector(new DefaultTopicSelector(topic));
+}
+
+public KafkaBolt withTopicSelector(KafkaTopicSelector selector) {
+this.topicSelector = selector;
+return this;
+}
+
+public KafkaBolt withProducerProperties(Properties 
producerProperties) {
+this.boltSpecfiedProperties = producerProperties;
+return this;
+}
+
+@Override
+public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+//for backward compatibility.
+if(mapper == null) {
+this.mapper = new FieldNameBasedTupleToKafkaMapper();
+}
+
+//for backward compatibility.
+if(topicSelector == null) {
+if(stormConf.containsKey(TOPIC)) {
+this.topicSelector = new DefaultTopicSelector((String) 
stormConf.get(TOPIC));
+} else {
+throw 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91364222
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+
+/**
+ * Translates a Kafka ConsumerRecord into a tuple
--- End diff --

Consider adding a period at the end here; when shown as javadoc this is going to look weird otherwise.




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91351206
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,222 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Kafka integration using the kafka-client jar
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatibility
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version 
Compatibility] (#compatibility)). 
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology. If you
+are using Trident you can use org.apache.storm.kafka.trident.TridentState, org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces:
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. For backward compatibility reasons, when you use its default constructor,
+FieldNameBasedTupleToKafkaMapper always looks in the KafkaBolt for fields named "key" and "message".
+Alternatively, you can specify different key and message fields by using the non-default constructor.
+In the TridentKafkaState you must specify the field names for key and message, as there is no default constructor.
+These should be specified while constructing an instance of FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published.
+You can return null, and the message will be ignored. If you have one static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` can be used to decide which topic a tuple's message should be published to.
+The user can specify a field name or field index in the tuple, and the selector will use that field's value as the topic name.
+When the topic name is not found, `KafkaBolt` will write messages to the default topic.
+Please make sure the default topic has been created.
+
+### Specifying Kafka producer properties
+You can provide all the producer properties in your Storm topology by calling `KafkaBolt.withProducerProperties()` and
+`TridentKafkaStateFactory.withProducerProperties()`. Please see the section "Important configuration properties
+for the producer" at http://kafka.apache.org/documentation.html#newproducerconfigs for more details.
+
+###Using wildcard kafka topic match
+You can do a wildcard topic match by adding the following config
+```
+ Config config = new Config();
+ config.put("kafka.topic.wildcard.match",true);
 
-# Usage Examples
+```
 
-### Create a Kafka Spout
+After this you can specify a wildcard topic 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91362663
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -60,123 +68,197 @@
  * If no offset has been committed, it behaves as LATEST.
  * 
  * */
-public enum FirstPollOffsetStrategy {
+public static enum FirstPollOffsetStrategy {
 EARLIEST,
 LATEST,
 UNCOMMITTED_EARLIEST,
 UNCOMMITTED_LATEST }
-
-// Kafka consumer configuration
-private final Map kafkaProps;
-private final Deserializer keyDeserializer;
-private final Deserializer valueDeserializer;
-private final long pollTimeoutMs;
-
-// Kafka spout configuration
-private final long offsetCommitPeriodMs;
-private final int maxRetries;
-private final int maxUncommittedOffsets;
-private final FirstPollOffsetStrategy firstPollOffsetStrategy;
-private final KafkaSpoutStreams kafkaSpoutStreams;
-private final KafkaSpoutTuplesBuilder tuplesBuilder;
-private final KafkaSpoutRetryService retryService;
-
-private KafkaSpoutConfig(Builder builder) {
-this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
-this.keyDeserializer = builder.keyDeserializer;
-this.valueDeserializer = builder.valueDeserializer;
-this.pollTimeoutMs = builder.pollTimeoutMs;
-this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
-this.maxRetries = builder.maxRetries;
-this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
-this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
-this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
-this.tuplesBuilder = builder.tuplesBuilder;
-this.retryService = builder.retryService;
+
+public static Builder builder(String bootstrapServers, 
String ... topics) {
+return new Builder<>(bootstrapServers, new StringDeserializer(), 
new StringDeserializer(), topics);
 }
-
-private Map setDefaultsAndGetKafkaProps(Map kafkaProps) {
+
+public static Builder builder(String bootstrapServers, 
Collection topics) {
+return new Builder<>(bootstrapServers, new StringDeserializer(), 
new StringDeserializer(), topics);
+}
+
+public static Builder builder(String bootstrapServers, 
Pattern topics) {
+return new Builder<>(bootstrapServers, new StringDeserializer(), 
new StringDeserializer(), topics);
+}
+
+private static Map 
setDefaultsAndGetKafkaProps(Map kafkaProps) {
 // set defaults for properties not specified
-if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
-kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
+if (!kafkaProps.containsKey(ENABLE_AUTO_COMMIT_CONF)) {
+kafkaProps.put(ENABLE_AUTO_COMMIT_CONF, "false");
 }
 return kafkaProps;
 }
-
+
 public static class Builder {
 private final Map kafkaProps;
-private Deserializer keyDeserializer;
-private Deserializer valueDeserializer;
+private Subscription subscription;
+private final Deserializer keyDes;
+private final Deserializer valueDes;
+private RecordTranslator translator;
 private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
 private long offsetCommitPeriodMs = 
DEFAULT_OFFSET_COMMIT_PERIOD_MS;
 private int maxRetries = DEFAULT_MAX_RETRIES;
 private FirstPollOffsetStrategy firstPollOffsetStrategy = 
FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-private final KafkaSpoutStreams kafkaSpoutStreams;
 private int maxUncommittedOffsets = 
DEFAULT_MAX_UNCOMMITTED_OFFSETS;
-private final KafkaSpoutTuplesBuilder tuplesBuilder;
-private final KafkaSpoutRetryService retryService;
-
-/**
- * Please refer to javadoc in {@link #Builder(Map, 
KafkaSpoutStreams, KafkaSpoutTuplesBuilder, KafkaSpoutRetryService)}.
- * This constructor uses by the default the following 
implementation for {@link KafkaSpoutRetryService}:
- * {@code new 
KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), 
TimeInterval.milliSeconds(2),
- *   DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)))}
- */
-public Builder(Map kafkaProps, KafkaSpoutStreams 
kafkaSpoutStreams,
   

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91360488
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -18,34 +18,42 @@
 
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.Deserializer;
-import 
org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
-
 import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+import java.util.function.Function;
 import java.util.regex.Pattern;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import 
org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.tuple.Fields;
+
 /**
  * KafkaSpoutConfig defines the required configuration to connect a 
consumer to a consumer group, as well as the subscribing topics
  */
 public class KafkaSpoutConfig implements Serializable {
+private static final long serialVersionUID = 141902646130682494L;
+private static final String BOOTSTRAP_SERVERS_CONF = 
"bootstrap.servers";
--- End diff --

Nit: These constants are already defined in 
`org.apache.kafka.clients.consumer.ConsumerConfig`
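
For example:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;

// kafka-clients already defines these keys:
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
```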




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91365926
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -185,7 +178,9 @@ private long doSeek(TopicPartition tp, 
OffsetAndMetadata committedOffset) {
 }
 
    private Collection<TopicPartition> toArrayList(final TopicPartition tp) {
-        return new ArrayList<TopicPartition>(1){{add(tp);}};
+        ArrayList<TopicPartition> ret = new ArrayList<>(1);
--- End diff --

Nit: Unless you need to modify the return value later, you can just return `Collections.singleton(tp)`.
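
For example:

```java
// Collections.singleton allocates a single immutable element
// instead of a mutable ArrayList.
private Collection<TopicPartition> toArrayList(final TopicPartition tp) {
    return Collections.singleton(tp);
}
```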




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91357255
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
 ---
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses field index to select topic name from tuple .
--- End diff --

Nit: Maybe a little clearer to say "Uses the field with index..."




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91356935
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
 ---
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.mapper;
+
+import org.apache.storm.tuple.Tuple;
+
+import java.io.Serializable;
+
+/**
+ * as the really verbose name suggests this interface mapps a storm tuple 
to kafka key and message.
--- End diff --

mapps -> maps
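
For reference, implementing the interface takes only a few lines; a sketch modeled on the `FieldNameBasedTupleToKafkaMapper` behavior described in the docs (illustrative, not from the PR):

```java
import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
import org.apache.storm.tuple.Tuple;

// Sketch: map the "key" and "message" tuple fields to the Kafka key/value.
public class SimpleTupleToKafkaMapper implements TupleToKafkaMapper<String, String> {
    @Override
    public String getKeyFromTuple(Tuple tuple) {
        return tuple.getStringByField("key");
    }

    @Override
    public String getMessageFromTuple(Tuple tuple) {
        return tuple.getStringByField("message");
    }
}
```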




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91367555
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.HashSet;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.junit.Test;
+
+public class ByTopicRecordTranslatorTest {
+
+@Test
+public void testBasic() {
+ByTopicRecordTranslator trans = 
+new ByTopicRecordTranslator<>((r) -> new Values(r.key()), 
new Fields("key"));
+trans.forTopic("TOPIC 1", (r) -> new Values(r.value()), new 
Fields("value"), "value-stream");
+trans.forTopic("TOPIC 2", (r) -> new Values(r.key(), r.value()), 
new Fields("key", "value"), "key-value-stream");
+HashSet expectedStreams = new HashSet<>();
+expectedStreams.add("default");
+expectedStreams.add("value-stream");
+expectedStreams.add("key-value-stream");
+assertEquals(expectedStreams, new HashSet<>(trans.streams()));
+
+ConsumerRecord cr1 = new ConsumerRecord<>("TOPIC 
OTHER", 100, 100, "THE KEY", "THE VALUE");
+assertEquals(new Fields("key"), trans.getFieldsFor("default"));
+assertEquals(Arrays.asList("THE KEY"), trans.apply(cr1));
+
+ConsumerRecord cr2 = new ConsumerRecord<>("TOPIC 
1", 100, 100, "THE KEY", "THE VALUE");
+assertEquals(new Fields("value"), 
trans.getFieldsFor("value-stream"));
+assertEquals(Arrays.asList("THE VALUE"), trans.apply(cr2));
+
+ConsumerRecord cr3 = new ConsumerRecord<>("TOPIC 
2", 100, 100, "THE KEY", "THE VALUE");
+assertEquals(new Fields("key", "value"), 
trans.getFieldsFor("key-value-stream"));
+assertEquals(Arrays.asList("THE KEY", "THE VALUE"), 
trans.apply(cr3));
+}
+
+@Test(expected = IllegalArgumentException.class)
+public void testFiledCollision() {
--- End diff --

filed -> field




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91362733
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -60,123 +68,197 @@
  * If no offset has been committed, it behaves as LATEST.
  * 
  * */
-public enum FirstPollOffsetStrategy {
+public static enum FirstPollOffsetStrategy {
 EARLIEST,
 LATEST,
 UNCOMMITTED_EARLIEST,
 UNCOMMITTED_LATEST }
-
-// Kafka consumer configuration
-private final Map kafkaProps;
-private final Deserializer keyDeserializer;
-private final Deserializer valueDeserializer;
-private final long pollTimeoutMs;
-
-// Kafka spout configuration
-private final long offsetCommitPeriodMs;
-private final int maxRetries;
-private final int maxUncommittedOffsets;
-private final FirstPollOffsetStrategy firstPollOffsetStrategy;
-private final KafkaSpoutStreams kafkaSpoutStreams;
-private final KafkaSpoutTuplesBuilder tuplesBuilder;
-private final KafkaSpoutRetryService retryService;
-
-private KafkaSpoutConfig(Builder builder) {
-this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
-this.keyDeserializer = builder.keyDeserializer;
-this.valueDeserializer = builder.valueDeserializer;
-this.pollTimeoutMs = builder.pollTimeoutMs;
-this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
-this.maxRetries = builder.maxRetries;
-this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
-this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
-this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
-this.tuplesBuilder = builder.tuplesBuilder;
-this.retryService = builder.retryService;
+
+public static Builder builder(String bootstrapServers, 
String ... topics) {
+return new Builder<>(bootstrapServers, new StringDeserializer(), 
new StringDeserializer(), topics);
 }
-
-private Map setDefaultsAndGetKafkaProps(Map kafkaProps) {
+
+public static Builder builder(String bootstrapServers, 
Collection topics) {
+return new Builder<>(bootstrapServers, new StringDeserializer(), 
new StringDeserializer(), topics);
+}
+
+public static Builder builder(String bootstrapServers, 
Pattern topics) {
+return new Builder<>(bootstrapServers, new StringDeserializer(), 
new StringDeserializer(), topics);
+}
+
+private static Map 
setDefaultsAndGetKafkaProps(Map kafkaProps) {
 // set defaults for properties not specified
-if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
-kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
+if (!kafkaProps.containsKey(ENABLE_AUTO_COMMIT_CONF)) {
+kafkaProps.put(ENABLE_AUTO_COMMIT_CONF, "false");
 }
 return kafkaProps;
 }
-
+
 public static class Builder {
 private final Map kafkaProps;
-private Deserializer keyDeserializer;
-private Deserializer valueDeserializer;
+private Subscription subscription;
+private final Deserializer keyDes;
+private final Deserializer valueDes;
+private RecordTranslator translator;
 private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
 private long offsetCommitPeriodMs = 
DEFAULT_OFFSET_COMMIT_PERIOD_MS;
 private int maxRetries = DEFAULT_MAX_RETRIES;
 private FirstPollOffsetStrategy firstPollOffsetStrategy = 
FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-private final KafkaSpoutStreams kafkaSpoutStreams;
 private int maxUncommittedOffsets = 
DEFAULT_MAX_UNCOMMITTED_OFFSETS;
-private final KafkaSpoutTuplesBuilder tuplesBuilder;
-private final KafkaSpoutRetryService retryService;
-
-/**
- * Please refer to javadoc in {@link #Builder(Map, 
KafkaSpoutStreams, KafkaSpoutTuplesBuilder, KafkaSpoutRetryService)}.
- * This constructor uses by the default the following 
implementation for {@link KafkaSpoutRetryService}:
- * {@code new 
KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), 
TimeInterval.milliSeconds(2),
- *   DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)))}
- */
-public Builder(Map kafkaProps, KafkaSpoutStreams 
kafkaSpoutStreams,
   

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91362618
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -60,123 +68,197 @@
  * If no offset has been committed, it behaves as LATEST.
  * 
  * */
-public enum FirstPollOffsetStrategy {
+public static enum FirstPollOffsetStrategy {
 EARLIEST,
 LATEST,
 UNCOMMITTED_EARLIEST,
 UNCOMMITTED_LATEST }
-
-// Kafka consumer configuration
-private final Map<String, Object> kafkaProps;
-private final Deserializer<K> keyDeserializer;
-private final Deserializer<V> valueDeserializer;
-private final long pollTimeoutMs;
-
-// Kafka spout configuration
-private final long offsetCommitPeriodMs;
-private final int maxRetries;
-private final int maxUncommittedOffsets;
-private final FirstPollOffsetStrategy firstPollOffsetStrategy;
-private final KafkaSpoutStreams kafkaSpoutStreams;
-private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
-private final KafkaSpoutRetryService retryService;
-
-private KafkaSpoutConfig(Builder<K, V> builder) {
-    this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
-    this.keyDeserializer = builder.keyDeserializer;
-    this.valueDeserializer = builder.valueDeserializer;
-    this.pollTimeoutMs = builder.pollTimeoutMs;
-    this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
-    this.maxRetries = builder.maxRetries;
-    this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
-    this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
-    this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
-    this.tuplesBuilder = builder.tuplesBuilder;
-    this.retryService = builder.retryService;
+
+public static Builder<String, String> builder(String bootstrapServers, String ... topics) {
+    return new Builder<>(bootstrapServers, new StringDeserializer(), new StringDeserializer(), topics);
 }
-
-private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
+
+public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) {
+    return new Builder<>(bootstrapServers, new StringDeserializer(), new StringDeserializer(), topics);
+}
+
+public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
+    return new Builder<>(bootstrapServers, new StringDeserializer(), new StringDeserializer(), topics);
+}
+
+private static Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
     // set defaults for properties not specified
-    if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
-        kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
+    if (!kafkaProps.containsKey(ENABLE_AUTO_COMMIT_CONF)) {
+        kafkaProps.put(ENABLE_AUTO_COMMIT_CONF, "false");
     }
     return kafkaProps;
 }
-
+
 public static class Builder<K, V> {
     private final Map<String, Object> kafkaProps;
-    private Deserializer<K> keyDeserializer;
-    private Deserializer<V> valueDeserializer;
+    private Subscription subscription;
+    private final Deserializer<K> keyDes;
+    private final Deserializer<V> valueDes;
+    private RecordTranslator<K, V> translator;
     private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
     private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
     private int maxRetries = DEFAULT_MAX_RETRIES;
     private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-    private final KafkaSpoutStreams kafkaSpoutStreams;
    private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
-    private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
-    private final KafkaSpoutRetryService retryService;
-
-    /**
-     * Please refer to javadoc in {@link #Builder(Map, KafkaSpoutStreams, KafkaSpoutTuplesBuilder, KafkaSpoutRetryService)}.
-     * This constructor uses by the default the following implementation for {@link KafkaSpoutRetryService}:
-     * {@code new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
-     *        DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)))}
-     */
-    public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91351118
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,222 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Kafka integration using the kafka-client jar
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatability
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version 
Compatibility] (#compatibility)). 
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and 
attach it as a component to your topology or if you
+are using trident you can use org.apache.storm.kafka.trident.TridentState, 
org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a 
field with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper 
for backward compatibility
+reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
+In the TridentKafkaState you must specify what is the field name for key 
and message as there is no default constructor.
+These should be specified while constructing and instance of 
FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published
+You can return a null and the message will be ignored. If you have one 
static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` use to support 
decided which topic should to push message from tuple.
+User could specify the field name or field index in tuple ,selector will 
use this value as topic name which to publish message.
+When the topic name not found , `KafkaBolt` will write messages into 
default topic .
+Please make sure the default topic have created .
+
+### Specifying Kafka producer properties
+You can provide all the produce properties in your Storm topology by 
calling `KafkaBolt.withProducerProperties()` and 
`TridentKafkaStateFactory.withProducerProperties()`. Please see  
http://kafka.apache.org/documentation.html#newproducerconfigs
+Section "Important configuration properties for the producer" for more 
details.
+
+###Using wildcard kafka topic match
+You can do a wildcard topic match by adding the following config
+```
+ Config config = new Config();
+ config.put("kafka.topic.wildcard.match",true);
 
-# Usage Examples
+```
 
-### Create a Kafka Spout
+After this you can specify a wildcard topic 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91366483
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
 ---
@@ -44,29 +40,30 @@
 // Bookkeeping
 private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
 // Declare some KafkaSpoutConfig references for convenience
-private KafkaSpoutStreams kafkaSpoutStreams;            // Object that wraps all the logic to declare output fields and emit tuples
-private KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;    // Object that contains the logic to build tuples for each ConsumerRecord
+private final Fields fields;
 
 public KafkaTridentSpoutManager(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
     this.kafkaSpoutConfig = kafkaSpoutConfig;
-    kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
-    tuplesBuilder = kafkaSpoutConfig.getTuplesBuilder();
+    RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
+    Fields fields = null;
+    for (String stream: translator.streams()) {
+        if (fields == null) {
+            fields = translator.getFieldsFor(stream);
+        } else {
+            if (!fields.equals(translator.getFieldsFor(stream))) {
+                throw new IllegalArgumentException("Trident Spouts do nut support multiple output Fields");
--- End diff --

Nit: nut -> not. Also maybe rephrase as "must have the same fields for all 
streams"




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91366137
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
 ---
@@ -152,7 +152,9 @@ private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBa
 }
 
 private Collection<TopicPartition> toArrayList(final TopicPartition tp) {
-    return new ArrayList<TopicPartition>(1){{add(tp);}};
+    ArrayList<TopicPartition> ret = new ArrayList<>(1);
--- End diff --

Nit: Same as in KafkaSpout
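
The earlier KafkaSpout comment is not quoted here, but assuming it was the same singleton-list simplification suggested elsewhere in this review, a sketch would be:

```java
private Collection<TopicPartition> toArrayList(final TopicPartition tp) {
    // Collections.singletonList builds an immutable one-element list, avoiding
    // both the double-brace anonymous subclass and the mutable ArrayList
    return Collections.singletonList(tp);
}
```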




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91361471
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -60,123 +68,197 @@
  * If no offset has been committed, it behaves as LATEST.
  * 
  * */
-public enum FirstPollOffsetStrategy {
+public static enum FirstPollOffsetStrategy {
 EARLIEST,
 LATEST,
 UNCOMMITTED_EARLIEST,
 UNCOMMITTED_LATEST }
-
-// Kafka consumer configuration
-private final Map<String, Object> kafkaProps;
-private final Deserializer<K> keyDeserializer;
-private final Deserializer<V> valueDeserializer;
-private final long pollTimeoutMs;
-
-// Kafka spout configuration
-private final long offsetCommitPeriodMs;
-private final int maxRetries;
-private final int maxUncommittedOffsets;
-private final FirstPollOffsetStrategy firstPollOffsetStrategy;
-private final KafkaSpoutStreams kafkaSpoutStreams;
-private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
-private final KafkaSpoutRetryService retryService;
-
-private KafkaSpoutConfig(Builder<K, V> builder) {
-    this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
-    this.keyDeserializer = builder.keyDeserializer;
-    this.valueDeserializer = builder.valueDeserializer;
-    this.pollTimeoutMs = builder.pollTimeoutMs;
-    this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
-    this.maxRetries = builder.maxRetries;
-    this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
-    this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
-    this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
-    this.tuplesBuilder = builder.tuplesBuilder;
-    this.retryService = builder.retryService;
+
+public static Builder<String, String> builder(String bootstrapServers, String ... topics) {
+    return new Builder<>(bootstrapServers, new StringDeserializer(), new StringDeserializer(), topics);
 }
-
-private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
+
+public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) {
+    return new Builder<>(bootstrapServers, new StringDeserializer(), new StringDeserializer(), topics);
--- End diff --

I don't think this works when the topology is serialized. See 
https://github.com/apache/storm/pull/1696/files#diff-d2c28214a421176fceab415ffe4c95c0R24
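
The underlying concern: the spout config is shipped to workers via Java serialization, and kafka-clients' StringDeserializer does not implement java.io.Serializable, so holding an instance in the config can fail when the topology is submitted. A minimal sketch of one common workaround, as an illustration and not necessarily the fix the PR adopted, is to name the deserializer classes in the consumer properties and let the KafkaConsumer instantiate them:

```java
// Naming the deserializer classes in the consumer properties keeps the config
// serializable, because only class references are stored, not live instances.
Map<String, Object> kafkaProps = new HashMap<>();
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
```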




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91353321
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ * 
+ * This bolt uses 0.8.2 Kafka Producer API.
+ * 
+ * It works for sending tuples to older Kafka version (0.8.1).
+ */
+public class KafkaBolt<K, V> extends BaseRichBolt {
+    private static final long serialVersionUID = -5205886631877033478L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
+
+    public static final String TOPIC = "topic";
+
+    private KafkaProducer<K, V> producer;
+    private OutputCollector collector;
+    private TupleToKafkaMapper<K, V> mapper;
+    private KafkaTopicSelector topicSelector;
+    private Properties boltSpecfiedProperties = new Properties();
+    /**
+     * With default setting for fireAndForget and async, the callback is called when the sending succeeds.
--- End diff --

This seems a little hard to parse unless you also have the code open. I 
think it's clearer to describe this in terms of when tuples are acked/failed, 
rather than when the callback is called.
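
To make the suggestion concrete, a hedged sketch of the same contract restated in ack/fail terms; this mirrors the usual producer-callback pattern rather than quoting the PR's code:

```java
// With fireAndForget=false and async=true: the tuple is acked or failed from
// the producer callback once Kafka reports the outcome of the send.
// 'record', 'tuple', and 'collector' are assumed from the surrounding bolt code.
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e == null) {
            collector.ack(tuple);
        } else {
            collector.reportError(e);
            collector.fail(tuple);
        }
    }
});
```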




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91364347
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+
+/**
+ * Translates a Kafka ConsumerRecord into a tuple
+ * use {@link KafkaTuple} if you want to route your tuples
+ * to different streams.
+ */
+public interface RecordTranslator<K, V> extends Serializable, Function<ConsumerRecord<K, V>, List<Object>> {
+    public static final List<String> DEFAULT_STREAM = Collections.unmodifiableList(Arrays.asList("default"));
--- End diff --

Collections.singletonList is an easier way to do this
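
A sketch of the suggested one-liner, which also drops the unmodifiableList wrapper because Collections.singletonList is already immutable:

```java
public static final List<String> DEFAULT_STREAM = Collections.singletonList("default");
```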




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91359755
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -290,8 +285,13 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
 } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
     LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
 } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
-    final List<Object> tuple = tuplesBuilder.buildTuple(record);
-    kafkaSpoutStreams.emit(collector, tuple, msgId);
+//TODO cache translator
--- End diff --

Can't we get rid of this TODO now by putting the translator in a field?
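
A minimal sketch of that change, with field and method names assumed from the surrounding diff:

```java
// Cached once in open(), so emitTupleIfNotEmitted() no longer has to ask
// the config for the translator on every record.
private RecordTranslator<K, V> translator;

@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    // ... existing initialization ...
    this.translator = kafkaSpoutConfig.getTranslator();
}
```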




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91363994
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Subscribe to all topics that follow a given list of values
+ */
+public class NamedSubscription extends Subscription {
+private static final Logger LOG = LoggerFactory.getLogger(NamedSubscription.class);
+private static final long serialVersionUID = 3438543305215813839L;
+private final Collection<String> topics;
+
+public NamedSubscription(Collection<String> topics) {
+    super();
+    this.topics = Collections.unmodifiableCollection(new ArrayList<>(topics));
+}
+
+public NamedSubscription(String ... topics) {
+    this(Arrays.asList(topics));
+}
+
+@Override
+public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener) {
+    consumer.subscribe(topics, listener);
+    LOG.info("Kafka consumer subscribed topics {}", topics);
+}
+
+@Override
+public String getTopicsString() {
+    StringBuilder ret = new StringBuilder();
--- End diff --

Why not use 
https://docs.oracle.com/javase/8/docs/api/java/lang/String.html#join-java.lang.CharSequence-java.lang.Iterable-?
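
For comparison, a sketch of the String.join version (assuming the loop was producing a comma-separated list):

```java
@Override
public String getTopicsString() {
    // String.join accepts any Iterable of CharSequence, so the Collection<String>
    // of topics can be joined without a manual StringBuilder loop
    return String.join(",", topics);
}
```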




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91352188
  
--- Diff: external/storm-kafka-client/README.md ---
@@ -1,90 +1,222 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Kafka integration using the kafka-client jar
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatability
--- End diff --

Why is this file both in docs/ and here?




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91349569
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,222 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Kafka integration using the kafka-client jar
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatability
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version 
Compatibility] (#compatibility)). 
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and 
attach it as a component to your topology or if you
+are using trident you can use org.apache.storm.kafka.trident.TridentState, 
org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a 
field with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper 
for backward compatibility
+reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
+In the TridentKafkaState you must specify what is the field name for key 
and message as there is no default constructor.
+These should be specified while constructing and instance of 
FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published
+You can return a null and the message will be ignored. If you have one 
static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` use to support 
decided which topic should to push message from tuple.
+User could specify the field name or field index in tuple ,selector will 
use this value as topic name which to publish message.
+When the topic name not found , `KafkaBolt` will write messages into 
default topic .
+Please make sure the default topic have created .
+
+### Specifying Kafka producer properties
+You can provide all the produce properties in your Storm topology by 
calling `KafkaBolt.withProducerProperties()` and 
`TridentKafkaStateFactory.withProducerProperties()`. Please see  
http://kafka.apache.org/documentation.html#newproducerconfigs
+Section "Important configuration properties for the producer" for more 
details.
+
+###Using wildcard kafka topic match
+You can do a wildcard topic match by adding the following config
+```
+ Config config = new Config();
+ config.put("kafka.topic.wildcard.match",true);
 
-# Usage Examples
+```
 
-### Create a Kafka Spout
+After this you can specify a wildcard topic 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91348211
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,222 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Kafka integration using the kafka-client jar
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatability
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version 
Compatibility] (#compatibility)). 
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and 
attach it as a component to your topology or if you
+are using trident you can use org.apache.storm.kafka.trident.TridentState, 
org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a 
field with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper 
for backward compatibility
+reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
+In the TridentKafkaState you must specify what is the field name for key 
and message as there is no default constructor.
+These should be specified while constructing and instance of 
FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published
+You can return a null and the message will be ignored. If you have one 
static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` use to support 
decided which topic should to push message from tuple.
+User could specify the field name or field index in tuple ,selector will 
use this value as topic name which to publish message.
+When the topic name not found , `KafkaBolt` will write messages into 
default topic .
+Please make sure the default topic have created .
+
+### Specifying Kafka producer properties
+You can provide all the produce properties in your Storm topology by 
calling `KafkaBolt.withProducerProperties()` and 
`TridentKafkaStateFactory.withProducerProperties()`. Please see  
http://kafka.apache.org/documentation.html#newproducerconfigs
--- End diff --

produce -> producer
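
As a usage note for the section being reviewed, a hedged sketch of wiring producer properties into the bolt; the broker address, topic name, and property values below are placeholders, not values from the PR:

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaBolt<String, String> bolt = new KafkaBolt<String, String>()
        .withProducerProperties(props)
        .withTopicSelector(new DefaultTopicSelector("my-topic"))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>());
```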




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91348040
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,222 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Kafka integration using the kafka-client jar
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatability
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version 
Compatibility] (#compatibility)). 
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and 
attach it as a component to your topology or if you
+are using trident you can use org.apache.storm.kafka.trident.TridentState, 
org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a 
field with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper 
for backward compatibility
+reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
+In the TridentKafkaState you must specify what is the field name for key 
and message as there is no default constructor.
+These should be specified while constructing and instance of 
FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published
+You can return a null and the message will be ignored. If you have one 
static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` use to support 
decided which topic should to push message from tuple.
+User could specify the field name or field index in tuple ,selector will 
use this value as topic name which to publish message.
+When the topic name not found , `KafkaBolt` will write messages into 
default topic .
--- End diff --

"not found" -> "is not found"




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91350132
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,222 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Kafka integration using the kafka-client jar
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatability
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version 
Compatibility] (#compatibility)). 
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and 
attach it as a component to your topology or if you
+are using trident you can use org.apache.storm.kafka.trident.TridentState, 
org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a 
field with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper 
for backward compatibility
+reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
+In the TridentKafkaState you must specify what is the field name for key 
and message as there is no default constructor.
+These should be specified while constructing and instance of 
FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published
+You can return a null and the message will be ignored. If you have one 
static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` use to support 
decided which topic should to push message from tuple.
+User could specify the field name or field index in tuple ,selector will 
use this value as topic name which to publish message.
+When the topic name not found , `KafkaBolt` will write messages into 
default topic .
+Please make sure the default topic have created .
+
+### Specifying Kafka producer properties
+You can provide all the produce properties in your Storm topology by 
calling `KafkaBolt.withProducerProperties()` and 
`TridentKafkaStateFactory.withProducerProperties()`. Please see  
http://kafka.apache.org/documentation.html#newproducerconfigs
+Section "Important configuration properties for the producer" for more 
details.
+
+###Using wildcard kafka topic match
+You can do a wildcard topic match by adding the following config
+```
+ Config config = new Config();
+ config.put("kafka.topic.wildcard.match",true);
 
-# Usage Examples
+```
 
-### Create a Kafka Spout
+After this you can specify a wildcard topic 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91351636
  
--- Diff: 
examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
 ---
@@ -18,87 +18,48 @@
 
 package org.apache.storm.kafka.trident;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.AlreadyAliveException;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.InvalidTopologyException;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
+import 
org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
-import org.apache.storm.kafka.spout.KafkaSpoutStreams;
-import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderNamedTopics;
-import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager;
 import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-
 public class TridentKafkaClientWordCountNamedTopics {
 private static final String TOPIC_1 = "test-trident";
 private static final String TOPIC_2 = "test-trident-1";
 private static final String KAFKA_LOCAL_BROKER = "localhost:9092";
 
 private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque() {
-    return new KafkaTridentSpoutOpaque<>(new KafkaTridentSpoutManager<>(
-            newKafkaSpoutConfig(
-            newKafkaSpoutStreams())));
+    return new KafkaTridentSpoutOpaque<>(newKafkaSpoutConfig());
 }
 
-private KafkaSpoutConfig<String, String> newKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
-    return new KafkaSpoutConfig.Builder<>(newKafkaConsumerProps(),
-            kafkaSpoutStreams, newTuplesBuilder(), newRetryService())
+protected KafkaSpoutConfig<String, String> newKafkaSpoutConfig() {
+    return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPIC_1, TOPIC_2)
--- End diff --

Consider using KAFKA_LOCAL_BROKER here
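
That is, a sketch like:

```java
// KAFKA_LOCAL_BROKER is already defined as "localhost:9092" in this class;
// the rest of the builder chain would be unchanged.
return KafkaSpoutConfig.builder(KAFKA_LOCAL_BROKER, TOPIC_1, TOPIC_2)
```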




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91345785
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,222 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Kafka integration using the kafka-client jar
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatability
--- End diff --

Nitpick: Compatibility is misspelled




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91353717
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ * 
+ * This bolt uses 0.8.2 Kafka Producer API.
+ * 
+ * It works for sending tuples to older Kafka version (0.8.1).
+ */
+public class KafkaBolt<K, V> extends BaseRichBolt {
+    private static final long serialVersionUID = -5205886631877033478L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
+
+    public static final String TOPIC = "topic";
+
+    private KafkaProducer<K, V> producer;
+    private OutputCollector collector;
+    private TupleToKafkaMapper<K, V> mapper;
+    private KafkaTopicSelector topicSelector;
+    private Properties boltSpecfiedProperties = new Properties();
--- End diff --

Specfied -> Specified




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91348095
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,222 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Kafka integration using the kafka-client jar
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatability
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version 
Compatibility] (#compatibility)). 
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and 
attach it as a component to your topology or if you
+are using trident you can use org.apache.storm.kafka.trident.TridentState, 
org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a 
field with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper 
for backward compatibility
+reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
+In the TridentKafkaState you must specify what is the field name for key 
and message as there is no default constructor.
+These should be specified while constructing and instance of 
FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published
+You can return a null and the message will be ignored. If you have one 
static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` use to support 
decided which topic should to push message from tuple.
+User could specify the field name or field index in tuple ,selector will 
use this value as topic name which to publish message.
+When the topic name not found , `KafkaBolt` will write messages into 
default topic .
+Please make sure the default topic have created .
--- End diff --

have -> has been




[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91347833
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,222 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Kafka integration using the kafka-client jar
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatability
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version 
Compatibility] (#compatibility)). 
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and 
attach it as a component to your topology or if you
+are using trident you can use org.apache.storm.kafka.trident.TridentState, 
org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a 
field with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper 
for backward compatibility
+reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
+In the TridentKafkaState you must specify what is the field name for key 
and message as there is no default constructor.
+These should be specified while constructing and instance of 
FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published
+You can return a null and the message will be ignored. If you have one 
static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` use to support 
decided which topic should to push message from tuple.
--- End diff --

Phrasing is a little awkward here. Maybe something like "... can be used to 
decide which topic a message should go to based on a field in the tuple" 
instead?
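
Under that suggested reading, a short sketch of the two provided selectors; the constructor shapes (a tuple field name or index, plus a fallback default topic) are assumptions based on the class names, and the field/topic names are placeholders.

```java
import org.apache.storm.kafka.bolt.selector.FieldIndexTopicSelector;
import org.apache.storm.kafka.bolt.selector.FieldNameTopicSelector;
import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;

public class TopicSelectorExamples {
    // Topic taken from the tuple's "topic" field; falls back to
    // "fallback-topic" when the field does not yield a usable topic.
    static final KafkaTopicSelector BY_NAME =
            new FieldNameTopicSelector("topic", "fallback-topic");

    // Same idea, but the topic name is taken from the tuple value at index 0.
    static final KafkaTopicSelector BY_INDEX =
            new FieldIndexTopicSelector(0, "fallback-topic");
}
```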


---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91346392
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,222 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Kafka integration using the kafka-client jar
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatability
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version 
Compatibility] (#compatibility)). 
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and 
attach it as a component to your topology or if you
+are using trident you can use org.apache.storm.kafka.trident.TridentState, 
org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a 
field with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper 
for backward compatibility
+reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
+In the TridentKafkaState you must specify what is the field name for key 
and message as there is no default constructor.
+These should be specified while constructing and instance of 
FieldNameBasedTupleToKafkaMapper.
--- End diff --

Nitpick: and -> an
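
Since this paragraph is where the Trident-side construction is spelled out, a sketch may help; the factory and selector class names and builder methods are assumed from the trident classes mentioned earlier in the doc and may differ in this module, and all field and topic names are placeholders.

```java
import java.util.Properties;

import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
import org.apache.storm.kafka.trident.TridentKafkaUpdater;
import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
import org.apache.storm.trident.Stream;
import org.apache.storm.tuple.Fields;

public class TridentKafkaWiring {
    public static void attach(Stream stream, Properties producerProps) {
        // No default constructor on the Trident mapper: key and message
        // field names must be passed explicitly (placeholder names here).
        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
                .withProducerProperties(producerProps)
                .withKafkaTopicSelector(new DefaultTopicSelector("events"))
                .withTridentTupleToKafkaMapper(
                        new FieldNameBasedTupleToKafkaMapper("userId", "payload"));

        // Persist each Trident batch to Kafka through the updater.
        stream.partitionPersist(stateFactory, new Fields("userId", "payload"),
                new TridentKafkaUpdater(), new Fields());
    }
}
```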


---


[GitHub] storm issue #1800: STORM-2217: Make DRPC pure java

2016-12-07 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1800
  
One more option, still ugly, but with a lot less impact.  I can do 
something similar to what Hadoop does.  They use several separate invocations 
of the maven assembly plugin to assemble into directories (this predates 
moduleSets), and then a separate script produces the final release.


---


[GitHub] storm issue #1800: STORM-2217: Make DRPC pure java

2016-12-07 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1800
  
So shading is not really an option for Jersey.
I was able to split the DRPC server off into its own package with tests, 
but packaging it with the assembly plugin is proving difficult.  If I 
upgrade to 3.0.0 of the assembly plugin I can use moduleSets and make it work, 
but I have to change {{storm-dist/binary}} to a multi-module build and move the 
code that actually packages the final release to a sub-module under it.  We 
also leak two empty/useless jar files into the release package that should be 
ignored, since moduleSets do not package pom modules; they require an artifact 
that is a file.

I really dislike all of these options.  I see a few cleaner options, but 
they will require a lot more work.

1. Move to Gradle.
2. Change how we do shading so that the assembly plugin's moduleSet support 
works the way it was intended.
This would involve essentially having a separate package/build for what 
we want shaded (i.e. a storm-shaded-deps.jar).

If someone has a cleaner solution I am happy to do it, but I think option 2 
is the best so far, although I don't like it all that much.


---


[GitHub] storm issue #1815: STORM-2235 Introduce new option: 'add remote repositories...

2016-12-07 Thread harshach
Github user harshach commented on the issue:

https://github.com/apache/storm/pull/1815
  
+1


---


[GitHub] storm issue #1816: STORM-2223: PMMLBolt

2016-12-07 Thread vesense
Github user vesense commented on the issue:

https://github.com/apache/storm/pull/1816
  
@hmcl Code looks good. Left one comment. I think it is also necessary to 
add a README file to let people know how to use it.


---


[GitHub] storm pull request #1816: STORM-2223: PMMLBolt

2016-12-07 Thread vesense
Github user vesense commented on a diff in the pull request:

https://github.com/apache/storm/pull/1816#discussion_r91297902
  
--- Diff: 
external/storm-pmml/src/main/java/org/apache/storm/pmml/runner/ModelRunner.java 
---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.pmml.runner;
+
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.List;
+
+public interface ModelRunner {
--- End diff --

Since the bolt can be executed in a distributed environment, the 
`ModelRunner` interface should extend `Serializable`.
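
Sketched out, the suggested change is a one-line bound on the interface; the methods elided from the diff above are left as a comment.

```java
import java.io.Serializable;

// The runner is instantiated when the topology is built and shipped to the
// worker JVMs inside the serialized bolt, hence the Serializable bound.
public interface ModelRunner extends Serializable {
    // ... existing methods from the diff above, unchanged ...
}
```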


---


[GitHub] storm issue #1816: STORM-2223: PMMLBolt

2016-12-07 Thread vesense
Github user vesense commented on the issue:

https://github.com/apache/storm/pull/1816
  
@harshach Thanks for your detailed explanations. Yes, this is a good start. 
I'm +1 for adding this in. This might attract many people who are interested in NLP and ML.


---