[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-02-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/1808


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-23 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r97367473
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
 ---
@@ -1,16 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.common.TopicPartition;
-
 import java.util.Comparator;
 
+import org.apache.kafka.common.TopicPartition;
+
 public class TopicPartitionComparator implements 
Comparator {
-@Override
-public int compare(TopicPartition o1, TopicPartition o2) {
-if (!o1.topic().equals(o2.topic())) {
-return o1.topic().compareTo(o2.topic());
-} else {
-return o1.partition() - o2.partition();
-}
-}
+   public static final TopicPartitionComparator INSTANCE = new 
TopicPartitionComparator();
--- End diff --

@hmcl Neat, hadn't considered those arguments for using enum.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-23 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r97334512
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -447,7 +440,10 @@ private void shutdown() {
 
 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
-kafkaSpoutStreams.declareOutputFields(declarer);
+RecordTranslator translator = 
kafkaSpoutConfig.getTranslator();
--- End diff --

I agree I think we need a fairly standard way to Tuple->X and X->Tuple.  
But really it would just end up looking like

```java
public interface  TupleToX {
public X fromTuple(Tuple t);
}

public interface X XToTuple {
public Tuple toTuple(X record);
}
```

But I don't know how much code reuse there could be, because X is so 
different for each library we are going to interface with.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-23 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r97333183
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
 ---
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses field name to select topic name from tuple .
+ */
+public class FieldNameTopicSelector implements KafkaTopicSelector {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FieldNameTopicSelector.class);
+
+private final String fieldName;
+private final String defaultTopicName;
+
+
+public FieldNameTopicSelector(String fieldName, String 
defaultTopicName) {
+this.fieldName = fieldName;
+this.defaultTopicName = defaultTopicName;
+}
+
+@Override
+public String getTopic(Tuple tuple) {
--- End diff --

I filed https://issues.apache.org/jira/browse/STORM-2318 for this


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-23 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r97331194
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
 ---
@@ -1,16 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.common.TopicPartition;
-
 import java.util.Comparator;
 
+import org.apache.kafka.common.TopicPartition;
+
 public class TopicPartitionComparator implements 
Comparator {
-@Override
-public int compare(TopicPartition o1, TopicPartition o2) {
-if (!o1.topic().equals(o2.topic())) {
-return o1.topic().compareTo(o2.topic());
-} else {
-return o1.partition() - o2.partition();
-}
-}
+   public static final TopicPartitionComparator INSTANCE = new 
TopicPartitionComparator();
--- End diff --

@hmcl I agree I went half way to making it a true singleton and I should 
finish the job.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-23 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r97332427
  
--- Diff: 
external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java ---
@@ -51,7 +51,9 @@
  * This bolt uses 0.8.2 Kafka Producer API.
  * 
  * It works for sending tuples to older Kafka version (0.8.1).
+ * @deprecated Please use the KafkaBolt in storm-kafka-client
  */
+@Deprecated
--- End diff --

I filed https://issues.apache.org/jira/browse/STORM-2317 to do this and I 
assigned it to myself.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-20 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r97187057
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
 ---
@@ -1,16 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.common.TopicPartition;
-
 import java.util.Comparator;
 
+import org.apache.kafka.common.TopicPartition;
+
 public class TopicPartitionComparator implements 
Comparator {
-@Override
-public int compare(TopicPartition o1, TopicPartition o2) {
-if (!o1.topic().equals(o2.topic())) {
-return o1.topic().compareTo(o2.topic());
-} else {
-return o1.partition() - o2.partition();
-}
-}
+   public static final TopicPartitionComparator INSTANCE = new 
TopicPartitionComparator();
--- End diff --

@srdo that is not true. See Effective Java Item 3. Yes, private constructor 
would work.

@revans2 either way is fine. My comment was motivated because I believe 
that either we should make it a singleton or not. If not, let the users simple 
instantiate it with new and that's it. I am a believer in consistency.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96995077
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
 ---
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+
+import java.io.Serializable;
+
+public interface KafkaTopicSelector extends Serializable {
+String getTopic(Tuple tuple);
--- End diff --

IMHO we need to try avoiding JDK 8 feature for public API unless we decided 
to release minor version only for 2.0.0. But anyway I also think this is beyond 
of the scope.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96951710
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
 ---
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.FailedException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class TridentKafkaState implements State {
+private static final Logger LOG = 
LoggerFactory.getLogger(TridentKafkaState.class);
+
+private KafkaProducer producer;
+private OutputCollector collector;
+
+private TridentTupleToKafkaMapper mapper;
+private KafkaTopicSelector topicSelector;
+
+public TridentKafkaState 
withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
+this.mapper = mapper;
+return this;
+}
+
+public TridentKafkaState withKafkaTopicSelector(KafkaTopicSelector 
selector) {
+this.topicSelector = selector;
+return this;
+}
+
+@Override
+public void beginCommit(Long txid) {
+LOG.debug("beginCommit is Noop.");
+}
+
+@Override
+public void commit(Long txid) {
+LOG.debug("commit is Noop.");
+}
+
+public void prepare(Properties options) {
+if (mapper == null) throw new NullPointerException("mapper can not 
be null");
--- End diff --

@srdo you are right.

```
maven-compiler-plugin

1.7
1.7

```
I'll update it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96926665
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
 ---
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.FailedException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class TridentKafkaState implements State {
+private static final Logger LOG = 
LoggerFactory.getLogger(TridentKafkaState.class);
+
+private KafkaProducer producer;
+private OutputCollector collector;
+
+private TridentTupleToKafkaMapper mapper;
+private KafkaTopicSelector topicSelector;
+
+public TridentKafkaState 
withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
+this.mapper = mapper;
+return this;
+}
+
+public TridentKafkaState withKafkaTopicSelector(KafkaTopicSelector 
selector) {
+this.topicSelector = selector;
+return this;
+}
+
+@Override
+public void beginCommit(Long txid) {
+LOG.debug("beginCommit is Noop.");
+}
+
+@Override
+public void commit(Long txid) {
+LOG.debug("commit is Noop.");
+}
+
+public void prepare(Properties options) {
+if (mapper == null) throw new NullPointerException("mapper can not 
be null");
--- End diff --

I thought 1.x was targeting at least Java 7?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96925318
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
 ---
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.mapper;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class FieldNameBasedTupleToKafkaMapper implements 
TridentTupleToKafkaMapper {
+
+public final String keyFieldName;
+public final String msgFieldName;
+
+public FieldNameBasedTupleToKafkaMapper(String keyFieldName, String 
msgFieldName) {
+this.keyFieldName = keyFieldName;
+this.msgFieldName = msgFieldName;
+}
+
+@Override
+public K getKeyFromTuple(TridentTuple tuple) {
+return (K) tuple.getValueByField(keyFieldName);
+}
+
+@Override
+public V getMessageFromTuple(TridentTuple tuple) {
--- End diff --

It is possible but that is the point of the generics. To try and reduce the 
likelihood of it happening.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96925159
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
 ---
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+
+import java.io.Serializable;
+
+public interface KafkaTopicSelector extends Serializable {
+String getTopic(Tuple tuple);
--- End diff --

File another JIRA and we can look into it.  I think it is beyond the scope 
of this one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96925279
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
 ---
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.mapper;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class FieldNameBasedTupleToKafkaMapper implements 
TridentTupleToKafkaMapper {
+
+public final String keyFieldName;
+public final String msgFieldName;
+
+public FieldNameBasedTupleToKafkaMapper(String keyFieldName, String 
msgFieldName) {
+this.keyFieldName = keyFieldName;
+this.msgFieldName = msgFieldName;
+}
+
+@Override
+public K getKeyFromTuple(TridentTuple tuple) {
+return (K) tuple.getValueByField(keyFieldName);
--- End diff --

It is possible but that is the point of the generics.  To try and reduce 
the likelihood of it happening.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96924990
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
 ---
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.FailedException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class TridentKafkaState implements State {
+private static final Logger LOG = 
LoggerFactory.getLogger(TridentKafkaState.class);
+
+private KafkaProducer producer;
+private OutputCollector collector;
+
+private TridentTupleToKafkaMapper mapper;
+private KafkaTopicSelector topicSelector;
+
+public TridentKafkaState 
withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
+this.mapper = mapper;
+return this;
+}
+
+public TridentKafkaState withKafkaTopicSelector(KafkaTopicSelector 
selector) {
+this.topicSelector = selector;
+return this;
+}
+
+@Override
+public void beginCommit(Long txid) {
+LOG.debug("beginCommit is Noop.");
+}
+
+@Override
+public void commit(Long txid) {
+LOG.debug("commit is Noop.");
+}
+
+public void prepare(Properties options) {
+if (mapper == null) throw new NullPointerException("mapper can not 
be null");
--- End diff --

Objects does not exist in java 6 and I would prefer to keep the code 
compatible as mush as possible to avoid extra rework when pulling these changes 
back.  If you insist I will do it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96923940
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
 ---
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.FailedException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class TridentKafkaState implements State {
+private static final Logger LOG = 
LoggerFactory.getLogger(TridentKafkaState.class);
+
+private KafkaProducer producer;
+private OutputCollector collector;
+
+private TridentTupleToKafkaMapper mapper;
+private KafkaTopicSelector topicSelector;
+
+public TridentKafkaState 
withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
--- End diff --

This is code that was "moved" like with the KafkaBolt for storm-kafka to 
storm-kafka-client.  If we really want to make it immutable we can, but I think 
that is beyond the scope of this JIRA  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96923481
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
 ---
@@ -44,29 +43,30 @@
 // Bookkeeping
 private final KafkaSpoutConfig kafkaSpoutConfig;
 // Declare some KafkaSpoutConfig references for convenience
-private KafkaSpoutStreams kafkaSpoutStreams;// Object 
that wraps all the logic to declare output fields and emit tuples
-private KafkaSpoutTuplesBuilder tuplesBuilder;// Object 
that contains the logic to build tuples for each ConsumerRecord
+private final Fields fields;
 
 public KafkaTridentSpoutManager(KafkaSpoutConfig 
kafkaSpoutConfig) {
 this.kafkaSpoutConfig = kafkaSpoutConfig;
-kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
-tuplesBuilder = kafkaSpoutConfig.getTuplesBuilder();
+RecordTranslator translator = 
kafkaSpoutConfig.getTranslator();
+Fields fields = null;
--- End diff --

I'm a bit confused about what benefit that would give.  The Fields come 
from the SpoutConfig, by way of the RecordTranslator.  Why would you want to 
override the Fields but not the RecordTranslator that must conform to those 
fields?  I just don't see value it separating two things that are highly 
coupled together.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96911529
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
 ---
@@ -15,14 +15,26 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
+package org.apache.storm.kafka.spout;
 
-package org.apache.storm.kafka.spout.internal.partition;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
 
-import java.util.List;
+public class RoundRobinManualPartitioner implements ManualPartitioner {
 
-public interface KafkaPartitionReader {
-List readPartitions(KafkaConsumer consumer);
+   @Override
+   public List partition(List 
allPartitions, TopologyContext context) {
+   int thisTaskIndex = context.getThisTaskIndex();
+   int totalTaskCount = 
context.getComponentTasks(context.getThisComponentId()).size();
+   Set myPartitions = new 
HashSet<>(allPartitions.size()/totalTaskCount+1);
+   for (int i = thisTaskIndex; i < allPartitions.size(); i += 
totalTaskCount) {
--- End diff --

No.  `i < allPartitions.size()` guarantees that we will never call get on 
allPartitions with an index that is out of bounds.  The Set is just setting the 
initial size to avoid more memory allocation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96909498
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java
 ---
@@ -15,22 +15,33 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
-
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.List;
-
-public class KafkaSpoutTuplesBuilderWildcardTopics implements 
KafkaSpoutTuplesBuilder {
-private KafkaSpoutTupleBuilder tupleBuilder;
+import org.apache.storm.tuple.Values;
 
-public KafkaSpoutTuplesBuilderWildcardTopics(KafkaSpoutTupleBuilder tupleBuilder) {
-this.tupleBuilder = tupleBuilder;
+/**
+ * A list of Values in a tuple that can be routed 
+ * to a given stream.
+ */
+public class KafkaTuple extends Values {
+private static final long serialVersionUID = 4803794470450587992L;
+private String stream = null;
+
+public KafkaTuple() {
+super();
+}
+
+public KafkaTuple(Object... vals) {
+super(vals);
+}
+
+public KafkaTuple routedTo(String stream) {
--- End diff --

This is because the constructor is varadic following the Values parent 
class. It is ambiguous to have

```
public KafkaTuple(Object... vals)
```

and any other constructor.  Java complains.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96908491
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Subscribe to all topics that follow a given list of values
+ */
+public class NamedSubscription extends Subscription {
+private static final Logger LOG = 
LoggerFactory.getLogger(NamedSubscription.class);
+private static final long serialVersionUID = 3438543305215813839L;
+protected final Collection topics;
+
+public NamedSubscription(Collection topics) {
+super();
+this.topics = Collections.unmodifiableCollection(new 
ArrayList<>(topics));
+}
+
+public NamedSubscription(String ... topics) {
+this(Arrays.asList(topics));
+}
+
+@Override
+public  void subscribe(KafkaConsumer consumer, 
ConsumerRebalanceListener listener, TopologyContext unused) {
+consumer.subscribe(topics, listener);
+LOG.info("Kafka consumer subscribed topics {}", topics);
+}
+
+@Override
+public String getTopicsString() {
--- End diff --

It is in the parent javadocs.  That is around the override


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96908178
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
 ---
@@ -1,16 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.common.TopicPartition;
-
 import java.util.Comparator;
 
+import org.apache.kafka.common.TopicPartition;
+
 public class TopicPartitionComparator implements 
Comparator {
-@Override
-public int compare(TopicPartition o1, TopicPartition o2) {
-if (!o1.topic().equals(o2.topic())) {
-return o1.topic().compareTo(o2.topic());
-} else {
-return o1.partition() - o2.partition();
-}
-}
+   public static final TopicPartitionComparator INSTANCE = new 
TopicPartitionComparator();
--- End diff --

I'm not sure we need to guarantee it.  The old code had the INSTANCE where 
it was used, I moved it here in hopes that others might use it.  I could make 
the constructor private if we really want it to be a singleton, but I don't 
think it is a requirement.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96907232
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -325,15 +310,19 @@ private boolean 
emitTupleIfNotEmitted(ConsumerRecord record) {
 } else {
 boolean isScheduled = retryService.isScheduled(msgId);
 if (!isScheduled || retryService.isReady(msgId)) {   // not 
scheduled <=> never failed (i.e. never emitted) or ready to be retried
-final List tuple = 
tuplesBuilder.buildTuple(record);
-kafkaSpoutStreams.emit(collector, tuple, msgId);
+final List tuple = 
kafkaSpoutConfig.getTranslator().apply(record);
+if (tuple instanceof KafkaTuple) {
--- End diff --

Yes, and it is documented in RecordTranslator.  I will add in more 
documentation on it though.

The reason specifically for this was because the spout is not able to keep 
track of a single message being emitted to multiple streams.  It would get 
confused and ack it before it was truly done.  This makes it impossible for 
that to happen.  What is more the built in record translators should cover 99% 
of the use cases, so the fact that it is not super well documented should be 
more of a corner case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96904191
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -61,129 +66,244 @@
  * If no offset has been committed, it behaves as LATEST.
  * 
  * */
-public enum FirstPollOffsetStrategy {
+public static enum FirstPollOffsetStrategy {
 EARLIEST,
 LATEST,
 UNCOMMITTED_EARLIEST,
 UNCOMMITTED_LATEST }
-
-// Kafka consumer configuration
-private final Map kafkaProps;
-private final Deserializer keyDeserializer;
-private final Deserializer valueDeserializer;
-private final long pollTimeoutMs;
-
-// Kafka spout configuration
-private final long offsetCommitPeriodMs;
-private final int maxRetries;
-private final int maxUncommittedOffsets;
-private final long partitionRefreshPeriodMs;
-private final boolean manualPartitionAssignment;
-private final FirstPollOffsetStrategy firstPollOffsetStrategy;
-private final KafkaSpoutStreams kafkaSpoutStreams;
-private final KafkaSpoutTuplesBuilder tuplesBuilder;
-private final KafkaSpoutRetryService retryService;
-
-private KafkaSpoutConfig(Builder builder) {
-this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
-this.keyDeserializer = builder.keyDeserializer;
-this.valueDeserializer = builder.valueDeserializer;
-this.pollTimeoutMs = builder.pollTimeoutMs;
-this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
-this.maxRetries = builder.maxRetries;
-this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
-this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
-this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
-this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
-this.manualPartitionAssignment = builder.manualPartitionAssignment;
-this.tuplesBuilder = builder.tuplesBuilder;
-this.retryService = builder.retryService;
+
+public static Builder builder(String bootstrapServers, 
String ... topics) {
+return new Builder<>(bootstrapServers, StringDeserializer.class, 
StringDeserializer.class, topics);
 }
-
-private Map setDefaultsAndGetKafkaProps(Map kafkaProps) {
+
+public static Builder builder(String bootstrapServers, 
Collection topics) {
+return new Builder<>(bootstrapServers, StringDeserializer.class, 
StringDeserializer.class, topics);
+}
+
+public static Builder builder(String bootstrapServers, 
Pattern topics) {
+return new Builder<>(bootstrapServers, StringDeserializer.class, 
StringDeserializer.class, topics);
+}
+
+private static Map 
setDefaultsAndGetKafkaProps(Map kafkaProps) {
 // set defaults for properties not specified
-if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
-kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
+if 
(!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");
 }
 return kafkaProps;
 }
-
+
 public static class Builder {
 private final Map kafkaProps;
-private SerializableDeserializer keyDeserializer;
-private SerializableDeserializer valueDeserializer;
+private Subscription subscription;
+private final SerializableDeserializer keyDes;
+private final Class> keyDesClazz;
+private final SerializableDeserializer valueDes;
+private final Class> valueDesClazz;
+private RecordTranslator translator;
 private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
 private long offsetCommitPeriodMs = 
DEFAULT_OFFSET_COMMIT_PERIOD_MS;
 private int maxRetries = DEFAULT_MAX_RETRIES;
 private FirstPollOffsetStrategy firstPollOffsetStrategy = 
FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-private final KafkaSpoutStreams kafkaSpoutStreams;
 private int maxUncommittedOffsets = 
DEFAULT_MAX_UNCOMMITTED_OFFSETS;
+private KafkaSpoutRetryService retryService = 
DEFAULT_RETRY_SERVICE;
 private long partitionRefreshPeriodMs = 
DEFAULT_PARTITION_REFRESH_PERIOD_MS;
-private boolean manualPartitionAssignment = false;
-private final 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96902715
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
 ---
@@ -1,16 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.common.TopicPartition;
-
 import java.util.Comparator;
 
+import org.apache.kafka.common.TopicPartition;
+
 public class TopicPartitionComparator implements 
Comparator {
-@Override
-public int compare(TopicPartition o1, TopicPartition o2) {
-if (!o1.topic().equals(o2.topic())) {
-return o1.topic().compareTo(o2.topic());
-} else {
-return o1.partition() - o2.partition();
-}
-}
+   public static final TopicPartitionComparator INSTANCE = new 
TopicPartitionComparator();
--- End diff --

Enums for things that are not an enumeration feels a little weird. Wouldn't 
you get the same result by just adding a private no-args constructor?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96903456
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
 ---
@@ -15,14 +15,26 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
+package org.apache.storm.kafka.spout;
 
-package org.apache.storm.kafka.spout.internal.partition;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
 
-import java.util.List;
+public class RoundRobinManualPartitioner implements ManualPartitioner {
 
-public interface KafkaPartitionReader {
-List readPartitions(KafkaConsumer consumer);
+   @Override
+   public List partition(List 
allPartitions, TopologyContext context) {
+   int thisTaskIndex = context.getThisTaskIndex();
+   int totalTaskCount = 
context.getComponentTasks(context.getThisComponentId()).size();
+   Set myPartitions = new 
HashSet<>(allPartitions.size()/totalTaskCount+1);
+   for (int i = thisTaskIndex; i < allPartitions.size(); i += 
totalTaskCount) {
--- End diff --

I don't think so. Either i is in [0, allPartitions.size()-1] or the loop is 
over.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96897291
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -325,15 +310,19 @@ private boolean 
emitTupleIfNotEmitted(ConsumerRecord record) {
 } else {
 boolean isScheduled = retryService.isScheduled(msgId);
 if (!isScheduled || retryService.isReady(msgId)) {   // not 
scheduled <=> never failed (i.e. never emitted) or ready to be retried
-final List tuple = 
tuplesBuilder.buildTuple(record);
-kafkaSpoutStreams.emit(collector, tuple, msgId);
+final List tuple = 
kafkaSpoutConfig.getTranslator().apply(record);
+if (tuple instanceof KafkaTuple) {
--- End diff --

The javadocs for RecordTranslator state.
```
 * @return the objects in the tuple.  Return a {@link KafkaTuple}
 * if you want to route the tuple to a non-default stream
```

All of the provided implementations support this.  Both Simple and ByTopic, 
by way of the SimpleRecordTranslator.

I will add a section in the docs to talk about this, but it only really 
matters if you are writing your own record translator from scratch, instead of 
using the built in ones that should cover the vast majority of the use cases.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96894587
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+
+public class ByTopicRecordTranslator implements RecordTranslator {
+private static final long serialVersionUID = -121699733778988688L;
+private final RecordTranslator defaultTranslator;
+private final Map> topicToTranslator = 
new HashMap<>();
+private final Map streamToFields = new HashMap<>();
+
+public ByTopicRecordTranslator(Func, 
List> func, Fields fields, String stream) {
+this(new SimpleRecordTranslator<>(func, fields, stream));
+}
+
+public ByTopicRecordTranslator(Func, 
List> func, Fields fields) {
+this(new SimpleRecordTranslator<>(func, fields));
+}
+
+public ByTopicRecordTranslator(RecordTranslator 
defaultTranslator) {
+this.defaultTranslator = defaultTranslator;
+cacheNCheckFields(defaultTranslator);
+}
+
+public ByTopicRecordTranslator forTopic(String topic, 
Func, List> func, Fields fields) {
+return forTopic(topic, new SimpleRecordTranslator<>(func, fields));
+}
+
+public ByTopicRecordTranslator forTopic(String topic, 
Func, List> func, Fields fields, String stream) {
+return forTopic(topic, new SimpleRecordTranslator<>(func, fields, 
stream));
+}
+
+public ByTopicRecordTranslator forTopic(String topic, 
RecordTranslator translator) {
+if (topicToTranslator.containsKey(topic)) {
+throw new IllegalStateException("Topic " + topic + " is 
already registered");
+}
+topicToTranslator.put(topic, translator);
+cacheNCheckFields(translator);
+return this;
+}
+
+private void cacheNCheckFields(RecordTranslator translator) {
+for (String stream : translator.streams()) {
+Fields fromTrans = translator.getFieldsFor(stream);
+Fields cached = streamToFields.get(stream);
+if (cached != null && !fromTrans.equals(cached)) {
+throw new IllegalArgumentException("Stream " + stream + " 
currently has Fields of " + cached + " which is not the same as those being 
added in " + fromTrans);
--- End diff --

In this case it is the argument being passed in that is bad and we are 
rejecting it.  They could come back and switch it to a new stream which would 
be fine.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96893867
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+
+public class ByTopicRecordTranslator implements RecordTranslator {
+private static final long serialVersionUID = -121699733778988688L;
+private final RecordTranslator defaultTranslator;
+private final Map> topicToTranslator = 
new HashMap<>();
+private final Map streamToFields = new HashMap<>();
+
+public ByTopicRecordTranslator(Func, 
List> func, Fields fields, String stream) {
+this(new SimpleRecordTranslator<>(func, fields, stream));
+}
+
+public ByTopicRecordTranslator(Func, 
List> func, Fields fields) {
+this(new SimpleRecordTranslator<>(func, fields));
+}
+
+public ByTopicRecordTranslator(RecordTranslator 
defaultTranslator) {
+this.defaultTranslator = defaultTranslator;
+cacheNCheckFields(defaultTranslator);
+}
+
+public ByTopicRecordTranslator forTopic(String topic, 
Func, List> func, Fields fields) {
+return forTopic(topic, new SimpleRecordTranslator<>(func, fields));
+}
+
+public ByTopicRecordTranslator forTopic(String topic, 
Func, List> func, Fields fields, String stream) {
+return forTopic(topic, new SimpleRecordTranslator<>(func, fields, 
stream));
+}
+
+public ByTopicRecordTranslator forTopic(String topic, 
RecordTranslator translator) {
+if (topicToTranslator.containsKey(topic)) {
+throw new IllegalStateException("Topic " + topic + " is 
already registered");
+}
+topicToTranslator.put(topic, translator);
+cacheNCheckFields(translator);
+return this;
+}
+
+private void cacheNCheckFields(RecordTranslator translator) {
+for (String stream : translator.streams()) {
+Fields fromTrans = translator.getFieldsFor(stream);
+Fields cached = streamToFields.get(stream);
+if (cached != null && !fromTrans.equals(cached)) {
--- End diff --

```
ByTopicRecordTranslator trans = new ByTopicRecordTranslator((rec) -> 
Arrays.asList(rec.offset()), new Fields("offset"), "default");
trans.forTopic("specialTopic", (rec) -> Arrays.asList(rec.offset(), 
rec.message()), new Fields("offset", "message"), "default");
```
At this point we have tried to declare that the "default" stream has Fields 
["offset"], and ["offset", "message"]  This is not supported by storm so we 
should not allow anyone to configure the spout to do this.

streamToFields is not yet updated for the new translator we are adding it 
yet.  We do it after we have verified that the Fields match for anything we 
have done already.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96891568
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
 ---
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses field with a given index to select the topic name from a tuple .
+ */
+public class FieldIndexTopicSelector implements KafkaTopicSelector {
+private static final long serialVersionUID = -3830575380208166367L;
--- End diff --

Because I missed them.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96891946
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
 ---
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses field name to select topic name from tuple .
+ */
+public class FieldNameTopicSelector implements KafkaTopicSelector {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FieldNameTopicSelector.class);
+
+private final String fieldName;
+private final String defaultTopicName;
+
+
+public FieldNameTopicSelector(String fieldName, String 
defaultTopicName) {
+this.fieldName = fieldName;
+this.defaultTopicName = defaultTopicName;
+}
+
+@Override
+public String getTopic(Tuple tuple) {
--- End diff --

I agree but both for backwards compatibility and to limit the scope of this 
JIRA I would rather see it done in a follow on JIRA


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96889563
  
--- Diff: 
external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java ---
@@ -51,7 +51,9 @@
  * This bolt uses 0.8.2 Kafka Producer API.
  * 
  * It works for sending tuples to older Kafka version (0.8.1).
+ * @deprecated Please use the KafkaBolt in storm-kafka-client
  */
+@Deprecated
--- End diff --

I would rather do that in a follow on JIRA.  Most of the code is the same 
between the two so getting the rework from here to make there is a lot simpler 
if I can do a cherry pick


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96889298
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * A subscription to kafka.
+ */
+public abstract class Subscription implements Serializable {
+private static final long serialVersionUID = -216136367240198716L;
+
+/**
+ * Subscribe the KafkaConsumer to the proper topics
+ * @param consumer the Consumer to get.
+ * @param listener the rebalance listener to include in the 
subscription
+ */
+public  void subscribe(KafkaConsumer consumer, 
ConsumerRebalanceListener listener, TopologyContext context) {
+   subscribe(consumer, listener);
+}
+
+/**
+ * Subscribe the KafkaConsumer to the proper topics
+ * @param consumer the Consumer to get.
+ * @param listener the rebalance listener to include in the 
subscription
+ * @deprecated please use the version with the TopologyContext in it
+ */
+public  void subscribe(KafkaConsumer consumer, 
ConsumerRebalanceListener listener) {
+   throw new IllegalStateException("At least one subscribe method 
must be overwritten");
+}
+
+/**
+ * @return a string representing the subscribed topics.
--- End diff --

I didn't know either, but the only place I can see it used is in logging so 
that is what I did.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96887342
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -61,129 +66,244 @@
  * If no offset has been committed, it behaves as LATEST.
  * 
  * */
-public enum FirstPollOffsetStrategy {
+public static enum FirstPollOffsetStrategy {
 EARLIEST,
 LATEST,
 UNCOMMITTED_EARLIEST,
 UNCOMMITTED_LATEST }
-
-// Kafka consumer configuration
-private final Map kafkaProps;
-private final Deserializer keyDeserializer;
-private final Deserializer valueDeserializer;
-private final long pollTimeoutMs;
-
-// Kafka spout configuration
-private final long offsetCommitPeriodMs;
-private final int maxRetries;
-private final int maxUncommittedOffsets;
-private final long partitionRefreshPeriodMs;
-private final boolean manualPartitionAssignment;
-private final FirstPollOffsetStrategy firstPollOffsetStrategy;
-private final KafkaSpoutStreams kafkaSpoutStreams;
-private final KafkaSpoutTuplesBuilder tuplesBuilder;
-private final KafkaSpoutRetryService retryService;
-
-private KafkaSpoutConfig(Builder builder) {
-this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
-this.keyDeserializer = builder.keyDeserializer;
-this.valueDeserializer = builder.valueDeserializer;
-this.pollTimeoutMs = builder.pollTimeoutMs;
-this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
-this.maxRetries = builder.maxRetries;
-this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
-this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
-this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
-this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
-this.manualPartitionAssignment = builder.manualPartitionAssignment;
-this.tuplesBuilder = builder.tuplesBuilder;
-this.retryService = builder.retryService;
+
+public static Builder builder(String bootstrapServers, 
String ... topics) {
+return new Builder<>(bootstrapServers, StringDeserializer.class, 
StringDeserializer.class, topics);
 }
-
-private Map setDefaultsAndGetKafkaProps(Map kafkaProps) {
+
+public static Builder builder(String bootstrapServers, 
Collection topics) {
+return new Builder<>(bootstrapServers, StringDeserializer.class, 
StringDeserializer.class, topics);
+}
+
+public static Builder builder(String bootstrapServers, 
Pattern topics) {
+return new Builder<>(bootstrapServers, StringDeserializer.class, 
StringDeserializer.class, topics);
+}
+
+private static Map 
setDefaultsAndGetKafkaProps(Map kafkaProps) {
 // set defaults for properties not specified
-if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
-kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
+if 
(!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");
 }
 return kafkaProps;
 }
-
+
 public static class Builder {
 private final Map kafkaProps;
-private SerializableDeserializer keyDeserializer;
-private SerializableDeserializer valueDeserializer;
+private Subscription subscription;
+private final SerializableDeserializer keyDes;
+private final Class> keyDesClazz;
+private final SerializableDeserializer valueDes;
+private final Class> valueDesClazz;
+private RecordTranslator translator;
 private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
 private long offsetCommitPeriodMs = 
DEFAULT_OFFSET_COMMIT_PERIOD_MS;
 private int maxRetries = DEFAULT_MAX_RETRIES;
 private FirstPollOffsetStrategy firstPollOffsetStrategy = 
FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-private final KafkaSpoutStreams kafkaSpoutStreams;
 private int maxUncommittedOffsets = 
DEFAULT_MAX_UNCOMMITTED_OFFSETS;
+private KafkaSpoutRetryService retryService = 
DEFAULT_RETRY_SERVICE;
 private long partitionRefreshPeriodMs = 
DEFAULT_PARTITION_REFRESH_PERIOD_MS;
-private boolean manualPartitionAssignment = false;
-private final 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96886556
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -61,129 +66,244 @@
  * If no offset has been committed, it behaves as LATEST.
  * 
  * */
-public enum FirstPollOffsetStrategy {
+public static enum FirstPollOffsetStrategy {
 EARLIEST,
 LATEST,
 UNCOMMITTED_EARLIEST,
 UNCOMMITTED_LATEST }
-
-// Kafka consumer configuration
-private final Map kafkaProps;
-private final Deserializer keyDeserializer;
-private final Deserializer valueDeserializer;
-private final long pollTimeoutMs;
-
-// Kafka spout configuration
-private final long offsetCommitPeriodMs;
-private final int maxRetries;
-private final int maxUncommittedOffsets;
-private final long partitionRefreshPeriodMs;
-private final boolean manualPartitionAssignment;
-private final FirstPollOffsetStrategy firstPollOffsetStrategy;
-private final KafkaSpoutStreams kafkaSpoutStreams;
-private final KafkaSpoutTuplesBuilder tuplesBuilder;
-private final KafkaSpoutRetryService retryService;
-
-private KafkaSpoutConfig(Builder builder) {
-this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
-this.keyDeserializer = builder.keyDeserializer;
-this.valueDeserializer = builder.valueDeserializer;
-this.pollTimeoutMs = builder.pollTimeoutMs;
-this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
-this.maxRetries = builder.maxRetries;
-this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
-this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
-this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
-this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
-this.manualPartitionAssignment = builder.manualPartitionAssignment;
-this.tuplesBuilder = builder.tuplesBuilder;
-this.retryService = builder.retryService;
+
+public static Builder builder(String bootstrapServers, 
String ... topics) {
+return new Builder<>(bootstrapServers, StringDeserializer.class, 
StringDeserializer.class, topics);
 }
-
-private Map setDefaultsAndGetKafkaProps(Map kafkaProps) {
+
+public static Builder builder(String bootstrapServers, 
Collection topics) {
+return new Builder<>(bootstrapServers, StringDeserializer.class, 
StringDeserializer.class, topics);
+}
+
+public static Builder builder(String bootstrapServers, 
Pattern topics) {
+return new Builder<>(bootstrapServers, StringDeserializer.class, 
StringDeserializer.class, topics);
+}
+
+private static Map 
setDefaultsAndGetKafkaProps(Map kafkaProps) {
 // set defaults for properties not specified
-if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
-kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
+if 
(!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");
 }
 return kafkaProps;
 }
-
+
 public static class Builder {
 private final Map kafkaProps;
-private SerializableDeserializer keyDeserializer;
-private SerializableDeserializer valueDeserializer;
+private Subscription subscription;
+private final SerializableDeserializer keyDes;
+private final Class> keyDesClazz;
+private final SerializableDeserializer valueDes;
+private final Class> valueDesClazz;
+private RecordTranslator translator;
 private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
 private long offsetCommitPeriodMs = 
DEFAULT_OFFSET_COMMIT_PERIOD_MS;
 private int maxRetries = DEFAULT_MAX_RETRIES;
 private FirstPollOffsetStrategy firstPollOffsetStrategy = 
FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-private final KafkaSpoutStreams kafkaSpoutStreams;
 private int maxUncommittedOffsets = 
DEFAULT_MAX_UNCOMMITTED_OFFSETS;
+private KafkaSpoutRetryService retryService = 
DEFAULT_RETRY_SERVICE;
 private long partitionRefreshPeriodMs = 
DEFAULT_PARTITION_REFRESH_PERIOD_MS;
-private boolean manualPartitionAssignment = false;
-private final 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96884451
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -225,11 +213,7 @@ public void nextTuple() {
 }
 
 if (poll()) {
-try {
-setWaitingToEmit(pollKafkaBroker());
-} catch (RetriableException e) {
-LOG.error("Failed to poll from kafka.", e);
-}
+setWaitingToEmit(pollKafkaBroker());
--- End diff --

That's right it was a merge error.  Great catch though.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96879498
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ */
+public class KafkaBolt extends BaseRichBolt {
+private static final long serialVersionUID = -5205886631877033478L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBolt.class);
+
+public static final String TOPIC = "topic";
+
+private KafkaProducer producer;
+private OutputCollector collector;
+private TupleToKafkaMapper mapper;
+private KafkaTopicSelector topicSelector;
+private Properties boltSpecfiedProperties = new Properties();
+private boolean fireAndForget = false;
+private boolean async = true;
+
+public KafkaBolt() {}
+
+public KafkaBolt withTupleToKafkaMapper(TupleToKafkaMapper 
mapper) {
+this.mapper = mapper;
+return this;
+}
+
+public KafkaBolt withTopicSelector(String topic) {
+return withTopicSelector(new DefaultTopicSelector(topic));
+}
+
+public KafkaBolt withTopicSelector(KafkaTopicSelector selector) {
+this.topicSelector = selector;
+return this;
+}
+
+public KafkaBolt withProducerProperties(Properties 
producerProperties) {
+this.boltSpecfiedProperties = producerProperties;
+return this;
+}
+
+@Override
+public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+//for backward compatibility.
+if(mapper == null) {
+this.mapper = new FieldNameBasedTupleToKafkaMapper();
+}
+
+//for backward compatibility.
+if(topicSelector == null) {
+if(stormConf.containsKey(TOPIC)) {
+this.topicSelector = new DefaultTopicSelector((String) 
stormConf.get(TOPIC));
+} else {
+throw new IllegalArgumentException("topic should be 
specified in bolt's configuration");
+}
+}
+
+producer = mkProducer(boltSpecfiedProperties);
+this.collector = collector;
+}
+
+/**
+ * Intended to be overridden for tests.  Make the producer from props
--- End diff --

No sorry wrong place for that comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96879307
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ */
+public class KafkaBolt extends BaseRichBolt {
+private static final long serialVersionUID = -5205886631877033478L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBolt.class);
+
+public static final String TOPIC = "topic";
+
+private KafkaProducer producer;
+private OutputCollector collector;
+private TupleToKafkaMapper mapper;
+private KafkaTopicSelector topicSelector;
+private Properties boltSpecfiedProperties = new Properties();
+private boolean fireAndForget = false;
+private boolean async = true;
+
+public KafkaBolt() {}
+
+public KafkaBolt withTupleToKafkaMapper(TupleToKafkaMapper 
mapper) {
+this.mapper = mapper;
+return this;
+}
+
+public KafkaBolt withTopicSelector(String topic) {
+return withTopicSelector(new DefaultTopicSelector(topic));
+}
+
+public KafkaBolt withTopicSelector(KafkaTopicSelector selector) {
+this.topicSelector = selector;
+return this;
+}
+
+public KafkaBolt withProducerProperties(Properties 
producerProperties) {
+this.boltSpecfiedProperties = producerProperties;
+return this;
+}
+
+@Override
+public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+//for backward compatibility.
+if(mapper == null) {
+this.mapper = new FieldNameBasedTupleToKafkaMapper();
+}
+
+//for backward compatibility.
+if(topicSelector == null) {
+if(stormConf.containsKey(TOPIC)) {
+this.topicSelector = new DefaultTopicSelector((String) 
stormConf.get(TOPIC));
+} else {
+throw new IllegalArgumentException("topic should be 
specified in bolt's configuration");
+}
+}
+
+producer = mkProducer(boltSpecfiedProperties);
+this.collector = collector;
+}
+
+/**
+ * Intended to be overridden for tests.  Make the producer from props
--- End diff --

backwards compatibility


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96879248
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ */
+public class KafkaBolt extends BaseRichBolt {
+private static final long serialVersionUID = -5205886631877033478L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBolt.class);
+
+public static final String TOPIC = "topic";
+
+private KafkaProducer producer;
+private OutputCollector collector;
+private TupleToKafkaMapper mapper;
+private KafkaTopicSelector topicSelector;
+private Properties boltSpecfiedProperties = new Properties();
+private boolean fireAndForget = false;
+private boolean async = true;
+
+public KafkaBolt() {}
+
+public KafkaBolt withTupleToKafkaMapper(TupleToKafkaMapper 
mapper) {
+this.mapper = mapper;
+return this;
+}
+
+public KafkaBolt withTopicSelector(String topic) {
+return withTopicSelector(new DefaultTopicSelector(topic));
+}
+
+public KafkaBolt withTopicSelector(KafkaTopicSelector selector) {
+this.topicSelector = selector;
+return this;
+}
+
+public KafkaBolt withProducerProperties(Properties 
producerProperties) {
+this.boltSpecfiedProperties = producerProperties;
+return this;
+}
+
+@Override
+public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+//for backward compatibility.
+if(mapper == null) {
+this.mapper = new FieldNameBasedTupleToKafkaMapper();
+}
+
+//for backward compatibility.
+if(topicSelector == null) {
+if(stormConf.containsKey(TOPIC)) {
+this.topicSelector = new DefaultTopicSelector((String) 
stormConf.get(TOPIC));
+} else {
+throw new IllegalArgumentException("topic should be 
specified in bolt's configuration");
+}
+}
+
+producer = mkProducer(boltSpecfiedProperties);
+this.collector = collector;
+}
+
+/**
+ * Intended to be overridden for tests.  Make the producer from props
+ */
+protected KafkaProducer mkProducer(Properties props) {
--- End diff --

backwards compatibility



---
If your project is set up for it, you can reply 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96877333
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ */
+public class KafkaBolt extends BaseRichBolt {
+private static final long serialVersionUID = -5205886631877033478L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBolt.class);
+
+public static final String TOPIC = "topic";
+
+private KafkaProducer producer;
+private OutputCollector collector;
+private TupleToKafkaMapper mapper;
+private KafkaTopicSelector topicSelector;
+private Properties boltSpecfiedProperties = new Properties();
+private boolean fireAndForget = false;
+private boolean async = true;
+
+public KafkaBolt() {}
+
+public KafkaBolt withTupleToKafkaMapper(TupleToKafkaMapper 
mapper) {
+this.mapper = mapper;
+return this;
+}
+
+public KafkaBolt withTopicSelector(String topic) {
--- End diff --

I don't want to rename public facing APIs right now, because this is a copy 
of what is in external/storm-kafka.  The other one is deprecated, but I want to 
maintain compatibility if possible.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96877744
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ */
+public class KafkaBolt extends BaseRichBolt {
+private static final long serialVersionUID = -5205886631877033478L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBolt.class);
+
+public static final String TOPIC = "topic";
+
+private KafkaProducer producer;
+private OutputCollector collector;
+private TupleToKafkaMapper mapper;
+private KafkaTopicSelector topicSelector;
+private Properties boltSpecfiedProperties = new Properties();
+private boolean fireAndForget = false;
+private boolean async = true;
+
+public KafkaBolt() {}
+
+public KafkaBolt withTupleToKafkaMapper(TupleToKafkaMapper 
mapper) {
+this.mapper = mapper;
+return this;
+}
+
+public KafkaBolt withTopicSelector(String topic) {
+return withTopicSelector(new DefaultTopicSelector(topic));
+}
+
+public KafkaBolt withTopicSelector(KafkaTopicSelector selector) {
+this.topicSelector = selector;
+return this;
+}
+
+public KafkaBolt withProducerProperties(Properties 
producerProperties) {
--- End diff --

This is "moving" existing code so I want to maintain compatibility if 
possible.  But we are doing a copy + deprecation because the two libraries are 
compiled with different versions of Kafka so combining the two libraries in a 
single topology is difficult in some cases.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-19 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96875424
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,232 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Apache Kafka integration using the kafka-client jar
+This includes the new Apache Kafka copnsumer API.
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatibility
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and 
attach it as a component to your topology or if you
+are using trident you can use org.apache.storm.kafka.trident.TridentState, 
org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a 
field with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper 
for backward compatibility
+reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
+In the TridentKafkaState you must specify what is the field name for key 
and message as there is no default constructor.
+These should be specified while constructing an instance of 
FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published
+You can return a null and the message will be ignored. If you have one 
static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` can be used to 
select the topic should to publish a tuple to.
+A user just needs to specify the field name or field index for the topic 
name in the tuple itself.
+When the topic is name not found , the `Field*TopicSelector` will write 
messages into default topic .
+Please make sure the default topic has been created .
+
+### Specifying Kafka producer properties
+You can provide all the producer properties in your Storm topology by 
calling `KafkaBolt.withProducerProperties()` and 
`TridentKafkaStateFactory.withProducerProperties()`. Please see  
http://kafka.apache.org/documentation.html#newproducerconfigs
+Section "Important configuration properties for the producer" for more 
details.
+These are also defined in 
`org.apache.kafka.clients.producer.ProducerConfig`
+
+###Using wildcard kafka topic match
+You can do a wildcard topic match by adding the following config
+```
+ Config config = new Config();
+ config.put("kafka.topic.wildcard.match",true);
 
-# Usage Examples
+```
 
-### Create a 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96763012
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -447,7 +440,10 @@ private void shutdown() {
 
 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
-kafkaSpoutStreams.declareOutputFields(declarer);
+RecordTranslator translator = 
kafkaSpoutConfig.getTranslator();
--- End diff --

I agree with this. I just think that this pattern is used many times in 
many Bolt implementations. I think that it would be useful to have an 
abstraction as part of stomr, similar to what `KafkaSpoutStreams` was doing, 
that would allow Bolts to declare the set of fields per stream in a pluggable 
way. This would make it uniform for every Bolt, and avoid having to do similar 
work for every Bolt that is writing to multiple streams.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96764322
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java
 ---
@@ -15,22 +15,33 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
-
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.List;
-
-public class KafkaSpoutTuplesBuilderWildcardTopics implements 
KafkaSpoutTuplesBuilder {
-private KafkaSpoutTupleBuilder tupleBuilder;
+import org.apache.storm.tuple.Values;
 
-public KafkaSpoutTuplesBuilderWildcardTopics(KafkaSpoutTupleBuilder tupleBuilder) {
-this.tupleBuilder = tupleBuilder;
+/**
+ * A list of Values in a tuple that can be routed 
+ * to a given stream.
+ */
+public class KafkaTuple extends Values {
+private static final long serialVersionUID = 4803794470450587992L;
+private String stream = null;
+
+public KafkaTuple() {
+super();
+}
+
+public KafkaTuple(Object... vals) {
+super(vals);
+}
+
+public KafkaTuple routedTo(String stream) {
--- End diff --

Why not make this object immutable and use two constructors one with and 
one without stream ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96743744
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
 ---
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses field name to select topic name from tuple .
+ */
+public class FieldNameTopicSelector implements KafkaTopicSelector {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FieldNameTopicSelector.class);
+
+private final String fieldName;
+private final String defaultTopicName;
+
+
+public FieldNameTopicSelector(String fieldName, String 
defaultTopicName) {
+this.fieldName = fieldName;
+this.defaultTopicName = defaultTopicName;
+}
+
+@Override
+public String getTopic(Tuple tuple) {
--- End diff --

This logic could be incorporated in the `FieldIndexTopicSelector`. It 
should be possible to delete this class, rename `FieldIndexTopicSelector` to 
`FieldTopicSelector`, and provide two constructors. One with `fieldName` and 
another with `fieldIndex`. And then use `Tuple#fieldIndex(String field)` method 
to extract one from the other.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96756536
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * A subscription to kafka.
+ */
+public abstract class Subscription implements Serializable {
+private static final long serialVersionUID = -216136367240198716L;
+
+/**
+ * Subscribe the KafkaConsumer to the proper topics
+ * @param consumer the Consumer to get.
+ * @param listener the rebalance listener to include in the 
subscription
+ */
+public  void subscribe(KafkaConsumer consumer, 
ConsumerRebalanceListener listener, TopologyContext context) {
+   subscribe(consumer, listener);
+}
+
+/**
+ * Subscribe the KafkaConsumer to the proper topics
+ * @param consumer the Consumer to get.
+ * @param listener the rebalance listener to include in the 
subscription
+ * @deprecated please use the version with the TopologyContext in it
+ */
+public  void subscribe(KafkaConsumer consumer, 
ConsumerRebalanceListener listener) {
--- End diff --

Why have deprecated methods in the first implementation of this class ? 
Shouldn't first implementation be `@deprecated` free ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96763257
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -609,4 +593,7 @@ public String toString() {
 '}';
 }
 }
+
+// === Timer ===
--- End diff --

`Timer` was moved to its own parent class. We can remove this comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96763944
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java
 ---
@@ -15,22 +15,33 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
-
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.List;
-
-public class KafkaSpoutTuplesBuilderWildcardTopics implements 
KafkaSpoutTuplesBuilder {
-private KafkaSpoutTupleBuilder tupleBuilder;
+import org.apache.storm.tuple.Values;
 
-public KafkaSpoutTuplesBuilderWildcardTopics(KafkaSpoutTupleBuilder tupleBuilder) {
-this.tupleBuilder = tupleBuilder;
+/**
+ * A list of Values in a tuple that can be routed 
+ * to a given stream.
--- End diff --

If the `KafkaSpout` remains as is, I would suggest that we add a comment 
here saying that the method 
`org.apache.storm.kafka.spout.RecordTranslator#apply` must necessarily return 
`KafkaTuple` if the user desires to write to multiple streams.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96774008
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+
+/**
+ * Translate a ConsumerRecord to a tuple.
--- End diff --

{@link org.apache.kafka.clients.consumer.ConsumerRecord}


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96779787
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
 ---
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.FailedException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class TridentKafkaState implements State {
+private static final Logger LOG = 
LoggerFactory.getLogger(TridentKafkaState.class);
+
+private KafkaProducer producer;
+private OutputCollector collector;
+
+private TridentTupleToKafkaMapper mapper;
+private KafkaTopicSelector topicSelector;
+
+public TridentKafkaState 
withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
+this.mapper = mapper;
+return this;
+}
+
+public TridentKafkaState withKafkaTopicSelector(KafkaTopicSelector 
selector) {
+this.topicSelector = selector;
+return this;
+}
+
+@Override
+public void beginCommit(Long txid) {
+LOG.debug("beginCommit is Noop.");
+}
+
+@Override
+public void commit(Long txid) {
+LOG.debug("commit is Noop.");
+}
+
+public void prepare(Properties options) {
+if (mapper == null) throw new NullPointerException("mapper can not 
be null");
--- End diff --

Objects.requireNonNull("mapper cannot be null");
..


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96775680
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
 ---
@@ -15,14 +15,26 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
+package org.apache.storm.kafka.spout;
 
-package org.apache.storm.kafka.spout.internal.partition;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
 
-import java.util.List;
+public class RoundRobinManualPartitioner implements ManualPartitioner {
 
-public interface KafkaPartitionReader {
-List readPartitions(KafkaConsumer consumer);
+   @Override
+   public List partition(List 
allPartitions, TopologyContext context) {
+   int thisTaskIndex = context.getThisTaskIndex();
+   int totalTaskCount = 
context.getComponentTasks(context.getThisComponentId()).size();
+   Set myPartitions = new 
HashSet<>(allPartitions.size()/totalTaskCount+1);
+   for (int i = thisTaskIndex; i < allPartitions.size(); i += 
totalTaskCount) {
--- End diff --

can't this code cause `IndexOutOfBoundsException` ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96722977
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
 ---
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.mapper;
+
+import org.apache.storm.tuple.Tuple;
+
+import java.io.Serializable;
+
+/**
+ * as the really verbose name suggests this interface maps a storm tuple 
to kafka key and message.
--- End diff --

I would delete "as the really verbose name suggests this " and put 
"interface defining a mapping from storm tuple to kafka key and message"




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96779676
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
 ---
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.FailedException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class TridentKafkaState implements State {
+private static final Logger LOG = 
LoggerFactory.getLogger(TridentKafkaState.class);
+
+private KafkaProducer producer;
+private OutputCollector collector;
+
+private TridentTupleToKafkaMapper mapper;
+private KafkaTopicSelector topicSelector;
+
+public TridentKafkaState 
withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
--- End diff --

Wouldn't it be preferable the class to be immutable ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96761049
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -325,15 +310,19 @@ private boolean 
emitTupleIfNotEmitted(ConsumerRecord record) {
 } else {
 boolean isScheduled = retryService.isScheduled(msgId);
 if (!isScheduled || retryService.isReady(msgId)) {   // not 
scheduled <=> never failed (i.e. never emitted) or ready to be retried
-final List tuple = 
tuplesBuilder.buildTuple(record);
-kafkaSpoutStreams.emit(collector, tuple, msgId);
+final List tuple = 
kafkaSpoutConfig.getTranslator().apply(record);
+if (tuple instanceof KafkaTuple) {
--- End diff --

Doesn't this require that for a tuple to go to a specific stream, that 
tuple has to necessarily be a `KafkaTuple`? If that is the case, it is not 
obvious for an implementer of `RecordTranslator` that he has to do so. If I 
understand it correctly, currently this code only works as intended with 
`SimpleRecordTranslator`.

The reasoning behind the `KafkaSpoutStreams` abstraction was to allow the 
user to clearly define the stream and associated fields without ambiguity.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96740920
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
 ---
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses field with a given index to select the topic name from a tuple .
+ */
+public class FieldIndexTopicSelector implements KafkaTopicSelector {
+private static final long serialVersionUID = -3830575380208166367L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FieldIndexTopicSelector.class);
+
+private final int fieldIndex;
+private final String defaultTopicName;
+
+public FieldIndexTopicSelector(int fieldIndex, String 
defaultTopicName) {
+this.fieldIndex = fieldIndex;
+if (fieldIndex < 0) {
+throw new IllegalArgumentException("fieldIndex cannot be 
negative");
+}
+this.defaultTopicName = defaultTopicName;
+}
+
+@Override
+public String getTopic(Tuple tuple) {
+if (fieldIndex < tuple.size()) {
+return tuple.getString(fieldIndex);
+} else {
+LOG.warn("Field Index " + fieldIndex + " Out of bound . Using 
default topic " + defaultTopicName);
--- End diff --

bounds. Returning default ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96776152
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * A subscription to kafka.
+ */
+public abstract class Subscription implements Serializable {
+private static final long serialVersionUID = -216136367240198716L;
+
+/**
+ * Subscribe the KafkaConsumer to the proper topics
+ * @param consumer the Consumer to get.
+ * @param listener the rebalance listener to include in the 
subscription
+ */
+public  void subscribe(KafkaConsumer consumer, 
ConsumerRebalanceListener listener, TopologyContext context) {
+   subscribe(consumer, listener);
+}
+
+/**
+ * Subscribe the KafkaConsumer to the proper topics
+ * @param consumer the Consumer to get.
+ * @param listener the rebalance listener to include in the 
subscription
+ * @deprecated please use the version with the TopologyContext in it
+ */
+public  void subscribe(KafkaConsumer consumer, 
ConsumerRebalanceListener listener) {
+   throw new IllegalStateException("At least one subscribe method 
must be overwritten");
+}
+
+/**
+ * @return a string representing the subscribed topics.
+ */
+public abstract String getTopicsString();
+
+/**
+ * Refresh any assignments if needed.  Kafka usually will handle this 
for you.
--- End diff --

NOOP is the default behavior, which means that Kafka will internally handle 
partition assignment. If you wish to do manual partition management, you must 
provide an implementation of this method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96780580
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
 ---
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.mapper;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class FieldNameBasedTupleToKafkaMapper implements 
TridentTupleToKafkaMapper {
+
+public final String keyFieldName;
+public final String msgFieldName;
+
+public FieldNameBasedTupleToKafkaMapper(String keyFieldName, String 
msgFieldName) {
+this.keyFieldName = keyFieldName;
+this.msgFieldName = msgFieldName;
+}
+
+@Override
+public K getKeyFromTuple(TridentTuple tuple) {
+return (K) tuple.getValueByField(keyFieldName);
+}
+
+@Override
+public V getMessageFromTuple(TridentTuple tuple) {
--- End diff --

ClassCastException?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96744488
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
 ---
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses field with a given index to select the topic name from a tuple .
+ */
+public class FieldIndexTopicSelector implements KafkaTopicSelector {
+private static final long serialVersionUID = -3830575380208166367L;
--- End diff --

Why does this class and other classes (e.g. ByTopicRecordTranslator) have 
`serialVersionUID` but others (e.g. DefaultTopicSelector, 
FieldNameTopicSelector) don't?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96780567
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
 ---
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.mapper;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class FieldNameBasedTupleToKafkaMapper implements 
TridentTupleToKafkaMapper {
+
+public final String keyFieldName;
+public final String msgFieldName;
+
+public FieldNameBasedTupleToKafkaMapper(String keyFieldName, String 
msgFieldName) {
+this.keyFieldName = keyFieldName;
+this.msgFieldName = msgFieldName;
+}
+
+@Override
+public K getKeyFromTuple(TridentTuple tuple) {
+return (K) tuple.getValueByField(keyFieldName);
--- End diff --

ClassCastException?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96762055
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
 ---
@@ -1,16 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.common.TopicPartition;
-
 import java.util.Comparator;
 
+import org.apache.kafka.common.TopicPartition;
+
 public class TopicPartitionComparator implements 
Comparator {
-@Override
-public int compare(TopicPartition o1, TopicPartition o2) {
-if (!o1.topic().equals(o2.topic())) {
-return o1.topic().compareTo(o2.topic());
-} else {
-return o1.partition() - o2.partition();
-}
-}
+   public static final TopicPartitionComparator INSTANCE = new 
TopicPartitionComparator();
--- End diff --

Probably the best way to guarantee that INSTANCE is always called is to 
make this an `Enum`, which will also make the code thread safe. Currently 
nothing prevents the user from creating her own instance of this class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96773771
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Subscribe to all topics that follow a given list of values
+ */
+public class NamedSubscription extends Subscription {
+private static final Logger LOG = 
LoggerFactory.getLogger(NamedSubscription.class);
+private static final long serialVersionUID = 3438543305215813839L;
+protected final Collection topics;
+
+public NamedSubscription(Collection topics) {
+super();
--- End diff --

super not necessary


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96774279
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+
+/**
+ * Translate a ConsumerRecord to a tuple.
+ */
+public interface RecordTranslator extends Serializable, 
Func, List> {
+public static final List DEFAULT_STREAM = 
Collections.singletonList("default");
+
+/**
+ * Do the translation.
--- End diff --

Creates a tuple from a Kafka ConsumerRecord.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96780204
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
 ---
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+
+import java.io.Serializable;
+
+public interface KafkaTopicSelector extends Serializable {
+String getTopic(Tuple tuple);
--- End diff --

since in some scenarios it's OK that this value returns null, wouldn't it 
be much clearer to return `Optional` instead?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96767284
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+public class ManualPartitionNamedSubscription extends NamedSubscription {
+   private static final long serialVersionUID = 5633018073527583826L;
+   private final ManualPartitioner parter;
+   private Set currentAssignment = null;
--- End diff --

all these `= null` are redundant


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96762352
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Subscribe to all topics that follow a given list of values
+ */
+public class NamedSubscription extends Subscription {
+private static final Logger LOG = 
LoggerFactory.getLogger(NamedSubscription.class);
+private static final long serialVersionUID = 3438543305215813839L;
+protected final Collection topics;
+
+public NamedSubscription(Collection topics) {
+super();
+this.topics = Collections.unmodifiableCollection(new 
ArrayList<>(topics));
+}
+
+public NamedSubscription(String ... topics) {
+this(Arrays.asList(topics));
+}
+
+@Override
+public  void subscribe(KafkaConsumer consumer, 
ConsumerRebalanceListener listener, TopologyContext unused) {
+consumer.subscribe(topics, listener);
+LOG.info("Kafka consumer subscribed topics {}", topics);
+}
+
+@Override
+public String getTopicsString() {
--- End diff --

Javadoc saying that it is a comma separated String of all the topic names


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96731003
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -235,41 +355,116 @@ public Builder(Map kafkaProps, 
KafkaSpoutStreams kafkaSpoutStrea
 this.firstPollOffsetStrategy = firstPollOffsetStrategy;
 return this;
 }
-
+
 /**
- * Sets partition refresh period in milliseconds in manual 
partition assignment model. Default is 2s.
- * @param partitionRefreshPeriodMs time in milliseconds
+ * Sets the retry service for the spout to use.
+ * @param retryService the new retry service
+ * @return the builder (this).
  */
-public Builder setPartitionRefreshPeriodMs(long 
partitionRefreshPeriodMs) {
-this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
+public Builder setRetry(KafkaSpoutRetryService retryService) 
{
+if (retryService == null) {
+throw new NullPointerException("retryService cannot be 
null");
+}
+this.retryService = retryService;
 return this;
 }
 
+public Builder setRecordTranslator(RecordTranslator 
translator) {
+this.translator = translator;
+return this;
+}
+
+public Builder setRecordTranslator(Func, List> func, Fields fields) {
+return setRecordTranslator(new SimpleRecordTranslator<>(func, 
fields));
+}
+
+public Builder setRecordTranslator(Func, List> func, Fields fields, String stream) {
+return setRecordTranslator(new SimpleRecordTranslator<>(func, 
fields, stream));
+}
+
 /**
- * Defines whether the consumer manages partition manually.
- * If set to true, the consumer manage partition manually, 
otherwise it will rely on kafka to do partition assignment.
- * @param manualPartitionAssignment True if using manual partition 
assignment.
+ * Sets partition refresh period in milliseconds. This is how 
often the subscription is refreshed
+ * For most subscriptions that go through the 
KafkaConsumer.subscribe this is ignored.
--- End diff --

This seems vague, and requires the user to go look at the subscription 
source. Maybe either link the method in Subscription that must be overridden to 
make this apply, or list the subscriptions that don't ignore this setting


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96734787
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -61,129 +66,244 @@
  * If no offset has been committed, it behaves as LATEST.
  * 
  * */
-public enum FirstPollOffsetStrategy {
+public static enum FirstPollOffsetStrategy {
 EARLIEST,
 LATEST,
 UNCOMMITTED_EARLIEST,
 UNCOMMITTED_LATEST }
-
-// Kafka consumer configuration
-private final Map kafkaProps;
-private final Deserializer keyDeserializer;
-private final Deserializer valueDeserializer;
-private final long pollTimeoutMs;
-
-// Kafka spout configuration
-private final long offsetCommitPeriodMs;
-private final int maxRetries;
-private final int maxUncommittedOffsets;
-private final long partitionRefreshPeriodMs;
-private final boolean manualPartitionAssignment;
-private final FirstPollOffsetStrategy firstPollOffsetStrategy;
-private final KafkaSpoutStreams kafkaSpoutStreams;
-private final KafkaSpoutTuplesBuilder tuplesBuilder;
-private final KafkaSpoutRetryService retryService;
-
-private KafkaSpoutConfig(Builder builder) {
-this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
-this.keyDeserializer = builder.keyDeserializer;
-this.valueDeserializer = builder.valueDeserializer;
-this.pollTimeoutMs = builder.pollTimeoutMs;
-this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
-this.maxRetries = builder.maxRetries;
-this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
-this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
-this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
-this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
-this.manualPartitionAssignment = builder.manualPartitionAssignment;
-this.tuplesBuilder = builder.tuplesBuilder;
-this.retryService = builder.retryService;
+
+public static Builder builder(String bootstrapServers, 
String ... topics) {
+return new Builder<>(bootstrapServers, StringDeserializer.class, 
StringDeserializer.class, topics);
 }
-
-private Map setDefaultsAndGetKafkaProps(Map kafkaProps) {
+
+public static Builder builder(String bootstrapServers, 
Collection topics) {
+return new Builder<>(bootstrapServers, StringDeserializer.class, 
StringDeserializer.class, topics);
+}
+
+public static Builder builder(String bootstrapServers, 
Pattern topics) {
+return new Builder<>(bootstrapServers, StringDeserializer.class, 
StringDeserializer.class, topics);
+}
+
+private static Map 
setDefaultsAndGetKafkaProps(Map kafkaProps) {
 // set defaults for properties not specified
-if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
-kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
+if 
(!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");
 }
 return kafkaProps;
 }
-
+
 public static class Builder {
 private final Map kafkaProps;
-private SerializableDeserializer keyDeserializer;
-private SerializableDeserializer valueDeserializer;
+private Subscription subscription;
+private final SerializableDeserializer keyDes;
+private final Class> keyDesClazz;
+private final SerializableDeserializer valueDes;
+private final Class> valueDesClazz;
+private RecordTranslator translator;
 private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
 private long offsetCommitPeriodMs = 
DEFAULT_OFFSET_COMMIT_PERIOD_MS;
 private int maxRetries = DEFAULT_MAX_RETRIES;
 private FirstPollOffsetStrategy firstPollOffsetStrategy = 
FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-private final KafkaSpoutStreams kafkaSpoutStreams;
 private int maxUncommittedOffsets = 
DEFAULT_MAX_UNCOMMITTED_OFFSETS;
+private KafkaSpoutRetryService retryService = 
DEFAULT_RETRY_SERVICE;
 private long partitionRefreshPeriodMs = 
DEFAULT_PARTITION_REFRESH_PERIOD_MS;
-private boolean manualPartitionAssignment = false;
-private final 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96721536
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
--- End diff --

Is kafka.broker.properties still being used somewhere? It might be good to 
mention that new code should prefer withTopicSelector over using 'topic' in 
storm conf.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96730185
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -235,41 +355,116 @@ public Builder(Map kafkaProps, 
KafkaSpoutStreams kafkaSpoutStrea
 this.firstPollOffsetStrategy = firstPollOffsetStrategy;
 return this;
 }
-
+
 /**
- * Sets partition refresh period in milliseconds in manual 
partition assignment model. Default is 2s.
- * @param partitionRefreshPeriodMs time in milliseconds
+ * Sets the retry service for the spout to use.
+ * @param retryService the new retry service
+ * @return the builder (this).
  */
-public Builder setPartitionRefreshPeriodMs(long 
partitionRefreshPeriodMs) {
-this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
+public Builder setRetry(KafkaSpoutRetryService retryService) 
{
+if (retryService == null) {
+throw new NullPointerException("retryService cannot be 
null");
+}
+this.retryService = retryService;
 return this;
 }
 
+public Builder setRecordTranslator(RecordTranslator 
translator) {
+this.translator = translator;
+return this;
+}
+
+public Builder setRecordTranslator(Func, List> func, Fields fields) {
+return setRecordTranslator(new SimpleRecordTranslator<>(func, 
fields));
+}
+
+public Builder setRecordTranslator(Func, List> func, Fields fields, String stream) {
+return setRecordTranslator(new SimpleRecordTranslator<>(func, 
fields, stream));
+}
+
 /**
- * Defines whether the consumer manages partition manually.
- * If set to true, the consumer manage partition manually, 
otherwise it will rely on kafka to do partition assignment.
- * @param manualPartitionAssignment True if using manual partition 
assignment.
+ * Sets partition refresh period in milliseconds. This is how 
often the subscription is refreshed
+ * For most subscriptions that go through the 
KafkaConsumer.subscribe this is ignored.
+ * @param partitionRefreshPeriodMs time in milliseconds
+ * @return the builder (this)
  */
-public Builder setManualPartitionAssignment(boolean 
manualPartitionAssignment) {
-this.manualPartitionAssignment = manualPartitionAssignment;
-return this;
+public Builder setPartitionRefreshPeriodMs(long 
partitionRefreshPeriodMs) {
+   this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
+   return this;
 }
-
+
 public KafkaSpoutConfig build() {
 return new KafkaSpoutConfig<>(this);
 }
 }
 
+// Kafka consumer configuration
+private final Map kafkaProps;
+private final Subscription subscription;
+private final SerializableDeserializer keyDes;
+private final Class> keyDesClazz;
+private final SerializableDeserializer valueDes;
+private final Class> valueDesClazz;
+private final long pollTimeoutMs;
+
+// Kafka spout configuration
+private final RecordTranslator translator;
+private final long offsetCommitPeriodMs;
+private final int maxRetries;
+private final int maxUncommittedOffsets;
+private final FirstPollOffsetStrategy firstPollOffsetStrategy;
+private final KafkaSpoutRetryService retryService;
+private final long partitionRefreshPeriodMs;
+
+private KafkaSpoutConfig(Builder builder) {
+this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
+this.subscription = builder.subscription;
+this.translator = builder.translator;
+this.pollTimeoutMs = builder.pollTimeoutMs;
+this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
+this.maxRetries = builder.maxRetries;
+this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
+this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
+this.retryService = builder.retryService;
+this.keyDes = builder.keyDes;
+this.keyDesClazz = builder.keyDesClazz;
+this.valueDes = builder.valueDes;
+this.valueDesClazz = builder.valueDesClazz;
+this.partitionRefreshPeriodMs = 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96723152
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ */
+public class KafkaBolt extends BaseRichBolt {
+private static final long serialVersionUID = -5205886631877033478L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBolt.class);
+
+public static final String TOPIC = "topic";
+
+private KafkaProducer producer;
+private OutputCollector collector;
+private TupleToKafkaMapper mapper;
+private KafkaTopicSelector topicSelector;
+private Properties boltSpecfiedProperties = new Properties();
+private boolean fireAndForget = false;
+private boolean async = true;
+
+public KafkaBolt() {}
+
+public KafkaBolt withTupleToKafkaMapper(TupleToKafkaMapper 
mapper) {
+this.mapper = mapper;
+return this;
+}
+
+public KafkaBolt withTopicSelector(String topic) {
+return withTopicSelector(new DefaultTopicSelector(topic));
+}
+
+public KafkaBolt withTopicSelector(KafkaTopicSelector selector) {
+this.topicSelector = selector;
+return this;
+}
+
+public KafkaBolt withProducerProperties(Properties 
producerProperties) {
+this.boltSpecfiedProperties = producerProperties;
+return this;
+}
+
+@Override
+public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+//for backward compatibility.
+if(mapper == null) {
+this.mapper = new FieldNameBasedTupleToKafkaMapper();
+}
+
+//for backward compatibility.
+if(topicSelector == null) {
+if(stormConf.containsKey(TOPIC)) {
+this.topicSelector = new DefaultTopicSelector((String) 
stormConf.get(TOPIC));
+} else {
+throw new IllegalArgumentException("topic should be 
specified in bolt's configuration");
+}
+}
+
+producer = mkProducer(boltSpecfiedProperties);
+this.collector = collector;
+}
+
+/**
+ * Intended to be overridden for tests.  Make the producer from props
+ */
+protected KafkaProducer mkProducer(Properties props) {
+return new KafkaProducer<>(props);
+}
+
+@Override
+public void 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96733517
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * A subscription to kafka.
+ */
+public abstract class Subscription implements Serializable {
+private static final long serialVersionUID = -216136367240198716L;
+
+/**
+ * Subscribe the KafkaConsumer to the proper topics
+ * @param consumer the Consumer to get.
+ * @param listener the rebalance listener to include in the 
subscription
+ */
+public  void subscribe(KafkaConsumer consumer, 
ConsumerRebalanceListener listener, TopologyContext context) {
+   subscribe(consumer, listener);
+}
+
+/**
+ * Subscribe the KafkaConsumer to the proper topics
+ * @param consumer the Consumer to get.
+ * @param listener the rebalance listener to include in the 
subscription
+ * @deprecated please use the version with the TopologyContext in it
+ */
+public  void subscribe(KafkaConsumer consumer, 
ConsumerRebalanceListener listener) {
+   throw new IllegalStateException("At least one subscribe method 
must be overwritten");
+}
+
+/**
+ * @return a string representing the subscribed topics.
--- End diff --

Can this string be in any format? It seems to only be used in 
getComponentConfiguration, not really sure where that's being read.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96728300
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -61,129 +66,244 @@
  * If no offset has been committed, it behaves as LATEST.
  * 
  * */
-public enum FirstPollOffsetStrategy {
+public static enum FirstPollOffsetStrategy {
 EARLIEST,
 LATEST,
 UNCOMMITTED_EARLIEST,
 UNCOMMITTED_LATEST }
-
-// Kafka consumer configuration
-private final Map kafkaProps;
-private final Deserializer keyDeserializer;
-private final Deserializer valueDeserializer;
-private final long pollTimeoutMs;
-
-// Kafka spout configuration
-private final long offsetCommitPeriodMs;
-private final int maxRetries;
-private final int maxUncommittedOffsets;
-private final long partitionRefreshPeriodMs;
-private final boolean manualPartitionAssignment;
-private final FirstPollOffsetStrategy firstPollOffsetStrategy;
-private final KafkaSpoutStreams kafkaSpoutStreams;
-private final KafkaSpoutTuplesBuilder tuplesBuilder;
-private final KafkaSpoutRetryService retryService;
-
-private KafkaSpoutConfig(Builder builder) {
-this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
-this.keyDeserializer = builder.keyDeserializer;
-this.valueDeserializer = builder.valueDeserializer;
-this.pollTimeoutMs = builder.pollTimeoutMs;
-this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
-this.maxRetries = builder.maxRetries;
-this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
-this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
-this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
-this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
-this.manualPartitionAssignment = builder.manualPartitionAssignment;
-this.tuplesBuilder = builder.tuplesBuilder;
-this.retryService = builder.retryService;
+
+public static Builder builder(String bootstrapServers, 
String ... topics) {
+return new Builder<>(bootstrapServers, StringDeserializer.class, 
StringDeserializer.class, topics);
 }
-
-private Map setDefaultsAndGetKafkaProps(Map kafkaProps) {
+
+public static Builder builder(String bootstrapServers, 
Collection topics) {
+return new Builder<>(bootstrapServers, StringDeserializer.class, 
StringDeserializer.class, topics);
+}
+
+public static Builder builder(String bootstrapServers, 
Pattern topics) {
+return new Builder<>(bootstrapServers, StringDeserializer.class, 
StringDeserializer.class, topics);
+}
+
+private static Map 
setDefaultsAndGetKafkaProps(Map kafkaProps) {
 // set defaults for properties not specified
-if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
-kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
+if 
(!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");
 }
 return kafkaProps;
 }
-
+
 public static class Builder {
 private final Map kafkaProps;
-private SerializableDeserializer keyDeserializer;
-private SerializableDeserializer valueDeserializer;
+private Subscription subscription;
+private final SerializableDeserializer keyDes;
+private final Class> keyDesClazz;
+private final SerializableDeserializer valueDes;
+private final Class> valueDesClazz;
+private RecordTranslator translator;
 private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
 private long offsetCommitPeriodMs = 
DEFAULT_OFFSET_COMMIT_PERIOD_MS;
 private int maxRetries = DEFAULT_MAX_RETRIES;
 private FirstPollOffsetStrategy firstPollOffsetStrategy = 
FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-private final KafkaSpoutStreams kafkaSpoutStreams;
 private int maxUncommittedOffsets = 
DEFAULT_MAX_UNCOMMITTED_OFFSETS;
+private KafkaSpoutRetryService retryService = 
DEFAULT_RETRY_SERVICE;
 private long partitionRefreshPeriodMs = 
DEFAULT_PARTITION_REFRESH_PERIOD_MS;
-private boolean manualPartitionAssignment = false;
-private final 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96727056
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -609,4 +593,7 @@ public String toString() {
 '}';
 }
 }
+
+// === Timer ===
--- End diff --

This doesn't seem like it should be here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96734680
  
--- Diff: 
external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java ---
@@ -51,7 +51,9 @@
  * This bolt uses 0.8.2 Kafka Producer API.
  * 
  * It works for sending tuples to older Kafka version (0.8.1).
+ * @deprecated Please use the KafkaBolt in storm-kafka-client
  */
+@Deprecated
--- End diff --

Maybe it would be fine to just remove it here immediately and deprecate it 
in 1.1.0?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96729446
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -61,129 +66,244 @@
  * If no offset has been committed, it behaves as LATEST.
  * 
  * */
-public enum FirstPollOffsetStrategy {
+public static enum FirstPollOffsetStrategy {
 EARLIEST,
 LATEST,
 UNCOMMITTED_EARLIEST,
 UNCOMMITTED_LATEST }
-
-// Kafka consumer configuration
-private final Map kafkaProps;
-private final Deserializer keyDeserializer;
-private final Deserializer valueDeserializer;
-private final long pollTimeoutMs;
-
-// Kafka spout configuration
-private final long offsetCommitPeriodMs;
-private final int maxRetries;
-private final int maxUncommittedOffsets;
-private final long partitionRefreshPeriodMs;
-private final boolean manualPartitionAssignment;
-private final FirstPollOffsetStrategy firstPollOffsetStrategy;
-private final KafkaSpoutStreams kafkaSpoutStreams;
-private final KafkaSpoutTuplesBuilder tuplesBuilder;
-private final KafkaSpoutRetryService retryService;
-
-private KafkaSpoutConfig(Builder builder) {
-this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
-this.keyDeserializer = builder.keyDeserializer;
-this.valueDeserializer = builder.valueDeserializer;
-this.pollTimeoutMs = builder.pollTimeoutMs;
-this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
-this.maxRetries = builder.maxRetries;
-this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
-this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
-this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
-this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
-this.manualPartitionAssignment = builder.manualPartitionAssignment;
-this.tuplesBuilder = builder.tuplesBuilder;
-this.retryService = builder.retryService;
+
+public static Builder builder(String bootstrapServers, 
String ... topics) {
+return new Builder<>(bootstrapServers, StringDeserializer.class, 
StringDeserializer.class, topics);
 }
-
-private Map setDefaultsAndGetKafkaProps(Map kafkaProps) {
+
+public static Builder builder(String bootstrapServers, 
Collection topics) {
+return new Builder<>(bootstrapServers, StringDeserializer.class, 
StringDeserializer.class, topics);
+}
+
+public static Builder builder(String bootstrapServers, 
Pattern topics) {
+return new Builder<>(bootstrapServers, StringDeserializer.class, 
StringDeserializer.class, topics);
+}
+
+private static Map 
setDefaultsAndGetKafkaProps(Map kafkaProps) {
 // set defaults for properties not specified
-if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
-kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
+if 
(!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");
 }
 return kafkaProps;
 }
-
+
 public static class Builder {
 private final Map kafkaProps;
-private SerializableDeserializer keyDeserializer;
-private SerializableDeserializer valueDeserializer;
+private Subscription subscription;
+private final SerializableDeserializer keyDes;
+private final Class> keyDesClazz;
+private final SerializableDeserializer valueDes;
+private final Class> valueDesClazz;
+private RecordTranslator translator;
 private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
 private long offsetCommitPeriodMs = 
DEFAULT_OFFSET_COMMIT_PERIOD_MS;
 private int maxRetries = DEFAULT_MAX_RETRIES;
 private FirstPollOffsetStrategy firstPollOffsetStrategy = 
FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-private final KafkaSpoutStreams kafkaSpoutStreams;
 private int maxUncommittedOffsets = 
DEFAULT_MAX_UNCOMMITTED_OFFSETS;
+private KafkaSpoutRetryService retryService = 
DEFAULT_RETRY_SERVICE;
 private long partitionRefreshPeriodMs = 
DEFAULT_PARTITION_REFRESH_PERIOD_MS;
-private boolean manualPartitionAssignment = false;
-private final 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96726868
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -412,8 +401,12 @@ public void activate() {
 
 private void subscribeKafkaConsumer() {
 kafkaConsumer = 
kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
-recordsFetcher = KafkaRecordsFetchers.create(kafkaSpoutConfig, 
kafkaConsumer, topologyContext,
-new KafkaSpoutConsumerRebalanceListener());
+
+kafkaSpoutConfig.getSubscription().subscribe(kafkaConsumer, new 
KafkaSpoutConsumerRebalanceListener(), context);
+
+// Initial poll to get the consumer registration process going.
+// KafkaSpoutConsumerRebalanceListener will be called following 
this poll, upon partition registration
+kafkaConsumer.poll(0);
--- End diff --

Doesn't this belong in the automatic subscriptions? It's not necessary if 
using manual partition assignment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96726512
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -225,11 +213,7 @@ public void nextTuple() {
 }
 
 if (poll()) {
-try {
-setWaitingToEmit(pollKafkaBroker());
-} catch (RetriableException e) {
-LOG.error("Failed to poll from kafka.", e);
-}
+setWaitingToEmit(pollKafkaBroker());
--- End diff --

I think @liurenjie1024 had decent reasoning for catching this, see 
https://github.com/apache/storm/pull/1835#discussion_r93105952.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96725508
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+
+public class ByTopicRecordTranslator implements RecordTranslator {
+private static final long serialVersionUID = -121699733778988688L;
+private final RecordTranslator defaultTranslator;
+private final Map> topicToTranslator = 
new HashMap<>();
+private final Map streamToFields = new HashMap<>();
+
+public ByTopicRecordTranslator(Func, 
List> func, Fields fields, String stream) {
--- End diff --

Nit: This class could do with some javadoc


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96725703
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
@@ -80,16 +76,14 @@
 private transient KafkaSpoutRetryService retryService;  // 
Class that has the logic to handle tuple failure
 private transient Timer commitTimer;// 
timer == null for auto commit mode
 private transient boolean initialized;  // 
Flag indicating that the spout is still undergoing initialization process.
-private transient KafkaRecordsFetcher recordsFetcher; // 
Class that encapsulates the logic of managing partitions and fetching records
 // Initialization is only complete after the first call to  
KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
 
-private KafkaSpoutStreams kafkaSpoutStreams;// 
Object that wraps all the logic to declare output fields and emit tuples
-private transient KafkaSpoutTuplesBuilder tuplesBuilder;  // 
Object that contains the logic to build tuples for each ConsumerRecord
-
 transient Map acked;   // Tuples 
that were successfully acked. These tuples will be committed periodically when 
the commit timer expires, after consumer rebalance, or on close/deactivate
 private transient Set emitted; // 
Tuples that have been emitted but that are "on the wire", i.e. pending being 
acked or failed
 private transient Iterator> waitingToEmit;
 // Records that have been polled and are queued to be emitted in the 
nextTuple() call. One record is emitted per nextTuple()
 private transient long numUncommittedOffsets;   // 
Number of offsets that have been polled and emitted but not yet been committed
+   private transient TopologyContext context;
--- End diff --

Nit: The rest of the file uses spaces for indentation


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96730015
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -235,41 +355,116 @@ public Builder(Map kafkaProps, 
KafkaSpoutStreams kafkaSpoutStrea
 this.firstPollOffsetStrategy = firstPollOffsetStrategy;
 return this;
 }
-
+
 /**
- * Sets partition refresh period in milliseconds in manual 
partition assignment model. Default is 2s.
- * @param partitionRefreshPeriodMs time in milliseconds
+ * Sets the retry service for the spout to use.
+ * @param retryService the new retry service
+ * @return the builder (this).
  */
-public Builder setPartitionRefreshPeriodMs(long 
partitionRefreshPeriodMs) {
-this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
+public Builder setRetry(KafkaSpoutRetryService retryService) 
{
+if (retryService == null) {
+throw new NullPointerException("retryService cannot be 
null");
+}
+this.retryService = retryService;
 return this;
 }
 
+public Builder setRecordTranslator(RecordTranslator 
translator) {
+this.translator = translator;
+return this;
+}
+
+public Builder setRecordTranslator(Func, List> func, Fields fields) {
--- End diff --

Would probably be good to put javadoc on this one and the one below, it's 
not obvious from the interface/parameter names what it does


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96721101
  
--- Diff: external/storm-kafka-client/README.md ---
@@ -1,191 +1,5 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Apache Kafka integration using the kafka-client jar (This inclused 
the new Apache Kafka consumer API)
--- End diff --

inclused -> includes


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96721821
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ */
+public class KafkaBolt extends BaseRichBolt {
+private static final long serialVersionUID = -5205886631877033478L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBolt.class);
+
+public static final String TOPIC = "topic";
+
+private KafkaProducer producer;
+private OutputCollector collector;
+private TupleToKafkaMapper mapper;
+private KafkaTopicSelector topicSelector;
+private Properties boltSpecfiedProperties = new Properties();
+private boolean fireAndForget = false;
+private boolean async = true;
+
+public KafkaBolt() {}
+
+public KafkaBolt withTupleToKafkaMapper(TupleToKafkaMapper 
mapper) {
+this.mapper = mapper;
+return this;
+}
+
+public KafkaBolt withTopicSelector(String topic) {
+return withTopicSelector(new DefaultTopicSelector(topic));
+}
+
+public KafkaBolt withTopicSelector(KafkaTopicSelector selector) {
+this.topicSelector = selector;
+return this;
+}
+
+public KafkaBolt withProducerProperties(Properties 
producerProperties) {
+this.boltSpecfiedProperties = producerProperties;
+return this;
+}
+
+@Override
+public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+//for backward compatibility.
+if(mapper == null) {
+this.mapper = new FieldNameBasedTupleToKafkaMapper();
+}
+
+//for backward compatibility.
+if(topicSelector == null) {
+if(stormConf.containsKey(TOPIC)) {
+this.topicSelector = new DefaultTopicSelector((String) 
stormConf.get(TOPIC));
+} else {
+throw new IllegalArgumentException("topic should be 
specified in bolt's configuration");
+}
+}
+
+producer = mkProducer(boltSpecfiedProperties);
+this.collector = collector;
+}
+
+/**
+ * Intended to be overridden for tests.  Make the producer from props
+ */
+protected KafkaProducer mkProducer(Properties props) {
+return new KafkaProducer<>(props);
+}
+
+@Override
+public void 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96712816
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,232 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Apache Kafka integration using the kafka-client jar
+This includes the new Apache Kafka copnsumer API.
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatibility
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and 
attach it as a component to your topology or if you
+are using trident you can use org.apache.storm.kafka.trident.TridentState, 
org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a 
field with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper 
for backward compatibility
+reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
+In the TridentKafkaState you must specify what is the field name for key 
and message as there is no default constructor.
+These should be specified while constructing an instance of 
FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published
+You can return a null and the message will be ignored. If you have one 
static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` can be used to 
select the topic should to publish a tuple to.
+A user just needs to specify the field name or field index for the topic 
name in the tuple itself.
+When the topic is name not found , the `Field*TopicSelector` will write 
messages into default topic .
+Please make sure the default topic has been created .
+
+### Specifying Kafka producer properties
+You can provide all the producer properties in your Storm topology by 
calling `KafkaBolt.withProducerProperties()` and 
`TridentKafkaStateFactory.withProducerProperties()`. Please see  
http://kafka.apache.org/documentation.html#newproducerconfigs
+Section "Important configuration properties for the producer" for more 
details.
+These are also defined in 
`org.apache.kafka.clients.producer.ProducerConfig`
+
+###Using wildcard kafka topic match
+You can do a wildcard topic match by adding the following config
+```
+ Config config = new Config();
+ config.put("kafka.topic.wildcard.match",true);
 
-# Usage Examples
+```
 
-### Create a 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96712487
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,232 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Apache Kafka integration using the kafka-client jar
+This includes the new Apache Kafka copnsumer API.
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatibility
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and 
attach it as a component to your topology or if you
+are using trident you can use org.apache.storm.kafka.trident.TridentState, 
org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a 
field with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper 
for backward compatibility
+reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
+In the TridentKafkaState you must specify what is the field name for key 
and message as there is no default constructor.
+These should be specified while constructing an instance of 
FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published
+You can return a null and the message will be ignored. If you have one 
static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` can be used to 
select the topic should to publish a tuple to.
+A user just needs to specify the field name or field index for the topic 
name in the tuple itself.
+When the topic is name not found , the `Field*TopicSelector` will write 
messages into default topic .
+Please make sure the default topic has been created .
+
+### Specifying Kafka producer properties
+You can provide all the producer properties in your Storm topology by 
calling `KafkaBolt.withProducerProperties()` and 
`TridentKafkaStateFactory.withProducerProperties()`. Please see  
http://kafka.apache.org/documentation.html#newproducerconfigs
+Section "Important configuration properties for the producer" for more 
details.
+These are also defined in 
`org.apache.kafka.clients.producer.ProducerConfig`
+
+###Using wildcard kafka topic match
+You can do a wildcard topic match by adding the following config
+```
+ Config config = new Config();
+ config.put("kafka.topic.wildcard.match",true);
 
-# Usage Examples
+```
 
-### Create a 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96713059
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,232 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Apache Kafka integration using the kafka-client jar
+This includes the new Apache Kafka copnsumer API.
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatibility
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and 
attach it as a component to your topology or if you
+are using trident you can use org.apache.storm.kafka.trident.TridentState, 
org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a 
field with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper 
for backward compatibility
+reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
+In the TridentKafkaState you must specify what is the field name for key 
and message as there is no default constructor.
+These should be specified while constructing an instance of 
FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published
+You can return a null and the message will be ignored. If you have one 
static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` can be used to 
select the topic should to publish a tuple to.
+A user just needs to specify the field name or field index for the topic 
name in the tuple itself.
+When the topic is name not found , the `Field*TopicSelector` will write 
messages into default topic .
+Please make sure the default topic has been created .
+
+### Specifying Kafka producer properties
+You can provide all the producer properties in your Storm topology by 
calling `KafkaBolt.withProducerProperties()` and 
`TridentKafkaStateFactory.withProducerProperties()`. Please see  
http://kafka.apache.org/documentation.html#newproducerconfigs
+Section "Important configuration properties for the producer" for more 
details.
+These are also defined in 
`org.apache.kafka.clients.producer.ProducerConfig`
+
+###Using wildcard kafka topic match
+You can do a wildcard topic match by adding the following config
+```
+ Config config = new Config();
+ config.put("kafka.topic.wildcard.match",true);
 
-# Usage Examples
+```
 
-### Create a 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96711515
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,232 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Apache Kafka integration using the kafka-client jar
+This includes the new Apache Kafka copnsumer API.
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatibility
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and 
attach it as a component to your topology or if you
+are using trident you can use org.apache.storm.kafka.trident.TridentState, 
org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a 
field with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper 
for backward compatibility
+reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
+In the TridentKafkaState you must specify what is the field name for key 
and message as there is no default constructor.
+These should be specified while constructing an instance of 
FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published
+You can return a null and the message will be ignored. If you have one 
static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` can be used to 
select the topic should to publish a tuple to.
+A user just needs to specify the field name or field index for the topic 
name in the tuple itself.
+When the topic is name not found , the `Field*TopicSelector` will write 
messages into default topic .
+Please make sure the default topic has been created .
+
+### Specifying Kafka producer properties
+You can provide all the producer properties in your Storm topology by 
calling `KafkaBolt.withProducerProperties()` and 
`TridentKafkaStateFactory.withProducerProperties()`. Please see  
http://kafka.apache.org/documentation.html#newproducerconfigs
+Section "Important configuration properties for the producer" for more 
details.
+These are also defined in 
`org.apache.kafka.clients.producer.ProducerConfig`
+
+###Using wildcard kafka topic match
+You can do a wildcard topic match by adding the following config
+```
+ Config config = new Config();
+ config.put("kafka.topic.wildcard.match",true);
 
-# Usage Examples
+```
 
-### Create a 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96719192
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ */
+public class KafkaBolt extends BaseRichBolt {
+private static final long serialVersionUID = -5205886631877033478L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBolt.class);
+
+public static final String TOPIC = "topic";
+
+private KafkaProducer producer;
+private OutputCollector collector;
+private TupleToKafkaMapper mapper;
+private KafkaTopicSelector topicSelector;
+private Properties boltSpecfiedProperties = new Properties();
+private boolean fireAndForget = false;
+private boolean async = true;
+
+public KafkaBolt() {}
+
+public KafkaBolt withTupleToKafkaMapper(TupleToKafkaMapper 
mapper) {
+this.mapper = mapper;
+return this;
+}
+
+public KafkaBolt withTopicSelector(String topic) {
+return withTopicSelector(new DefaultTopicSelector(topic));
+}
+
+public KafkaBolt withTopicSelector(KafkaTopicSelector selector) {
+this.topicSelector = selector;
+return this;
+}
+
+public KafkaBolt withProducerProperties(Properties 
producerProperties) {
+this.boltSpecfiedProperties = producerProperties;
+return this;
+}
+
+@Override
+public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+//for backward compatibility.
+if(mapper == null) {
+this.mapper = new FieldNameBasedTupleToKafkaMapper();
+}
+
+//for backward compatibility.
+if(topicSelector == null) {
+if(stormConf.containsKey(TOPIC)) {
+this.topicSelector = new DefaultTopicSelector((String) 
stormConf.get(TOPIC));
+} else {
+throw new IllegalArgumentException("topic should be 
specified in bolt's configuration");
+}
+}
+
+producer = mkProducer(boltSpecfiedProperties);
+this.collector = collector;
+}
+
+/**
+ * Intended to be overridden for tests.  Make the producer from props
--- End diff --

Create producer with props. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96719100
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ */
+public class KafkaBolt extends BaseRichBolt {
+private static final long serialVersionUID = -5205886631877033478L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBolt.class);
+
+public static final String TOPIC = "topic";
+
+private KafkaProducer producer;
+private OutputCollector collector;
+private TupleToKafkaMapper mapper;
+private KafkaTopicSelector topicSelector;
+private Properties boltSpecfiedProperties = new Properties();
+private boolean fireAndForget = false;
+private boolean async = true;
+
+public KafkaBolt() {}
+
+public KafkaBolt withTupleToKafkaMapper(TupleToKafkaMapper 
mapper) {
+this.mapper = mapper;
+return this;
+}
+
+public KafkaBolt withTopicSelector(String topic) {
+return withTopicSelector(new DefaultTopicSelector(topic));
+}
+
+public KafkaBolt withTopicSelector(KafkaTopicSelector selector) {
+this.topicSelector = selector;
+return this;
+}
+
+public KafkaBolt withProducerProperties(Properties 
producerProperties) {
+this.boltSpecfiedProperties = producerProperties;
+return this;
+}
+
+@Override
+public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+//for backward compatibility.
+if(mapper == null) {
+this.mapper = new FieldNameBasedTupleToKafkaMapper();
+}
+
+//for backward compatibility.
+if(topicSelector == null) {
+if(stormConf.containsKey(TOPIC)) {
+this.topicSelector = new DefaultTopicSelector((String) 
stormConf.get(TOPIC));
+} else {
+throw new IllegalArgumentException("topic should be 
specified in bolt's configuration");
+}
+}
+
+producer = mkProducer(boltSpecfiedProperties);
+this.collector = collector;
+}
+
+/**
+ * Intended to be overridden for tests.  Make the producer from props
+ */
+protected KafkaProducer mkProducer(Properties props) {
--- End diff --

`newProducer` ?


---
If your project is set up for it, you can reply to this 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96713362
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,232 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Apache Kafka integration using the kafka-client jar
+This includes the new Apache Kafka copnsumer API.
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatibility
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and 
attach it as a component to your topology or if you
+are using trident you can use org.apache.storm.kafka.trident.TridentState, 
org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a 
field with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper 
for backward compatibility
+reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
+In the TridentKafkaState you must specify what is the field name for key 
and message as there is no default constructor.
+These should be specified while constructing an instance of 
FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published
+You can return a null and the message will be ignored. If you have one 
static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` can be used to 
select the topic should to publish a tuple to.
+A user just needs to specify the field name or field index for the topic 
name in the tuple itself.
+When the topic is name not found , the `Field*TopicSelector` will write 
messages into default topic .
+Please make sure the default topic has been created .
+
+### Specifying Kafka producer properties
+You can provide all the producer properties in your Storm topology by 
calling `KafkaBolt.withProducerProperties()` and 
`TridentKafkaStateFactory.withProducerProperties()`. Please see  
http://kafka.apache.org/documentation.html#newproducerconfigs
+Section "Important configuration properties for the producer" for more 
details.
+These are also defined in 
`org.apache.kafka.clients.producer.ProducerConfig`
+
+###Using wildcard kafka topic match
+You can do a wildcard topic match by adding the following config
+```
+ Config config = new Config();
+ config.put("kafka.topic.wildcard.match",true);
 
-# Usage Examples
+```
 
-### Create a 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96716770
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ * 
+ * This bolt uses 0.8.2 Kafka Producer API.
+ * 
+ * It works for sending tuples to older Kafka version (0.8.1).
+ */
+public class KafkaBolt extends BaseRichBolt {
+private static final long serialVersionUID = -5205886631877033478L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBolt.class);
+
+public static final String TOPIC = "topic";
+
+private KafkaProducer producer;
+private OutputCollector collector;
+private TupleToKafkaMapper mapper;
+private KafkaTopicSelector topicSelector;
+private Properties boltSpecfiedProperties = new Properties();
--- End diff --

nit: `boltSpecifiedProps` or `boltSpecificProps`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96720421
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ */
+public class KafkaBolt extends BaseRichBolt {
+private static final long serialVersionUID = -5205886631877033478L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBolt.class);
+
+public static final String TOPIC = "topic";
+
+private KafkaProducer producer;
+private OutputCollector collector;
+private TupleToKafkaMapper mapper;
+private KafkaTopicSelector topicSelector;
+private Properties boltSpecfiedProperties = new Properties();
+private boolean fireAndForget = false;
+private boolean async = true;
--- End diff --

Can you please add a comment saying to look at the comments in the set 
method? I missed those comments at first.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96710739
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,232 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Apache Kafka integration using the kafka-client jar
+This includes the new Apache Kafka copnsumer API.
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatibility
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and 
attach it as a component to your topology or if you
+are using trident you can use org.apache.storm.kafka.trident.TridentState, 
org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a 
field with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper 
for backward compatibility
+reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
+In the TridentKafkaState you must specify what is the field name for key 
and message as there is no default constructor.
+These should be specified while constructing an instance of 
FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published
+You can return a null and the message will be ignored. If you have one 
static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` can be used to 
select the topic should to publish a tuple to.
+A user just needs to specify the field name or field index for the topic 
name in the tuple itself.
+When the topic is name not found , the `Field*TopicSelector` will write 
messages into default topic .
+Please make sure the default topic has been created .
+
+### Specifying Kafka producer properties
+You can provide all the producer properties in your Storm topology by 
calling `KafkaBolt.withProducerProperties()` and 
`TridentKafkaStateFactory.withProducerProperties()`. Please see  
http://kafka.apache.org/documentation.html#newproducerconfigs
+Section "Important configuration properties for the producer" for more 
details.
+These are also defined in 
`org.apache.kafka.clients.producer.ProducerConfig`
+
+###Using wildcard kafka topic match
+You can do a wildcard topic match by adding the following config
+```
+ Config config = new Config();
+ config.put("kafka.topic.wildcard.match",true);
 
-# Usage Examples
+```
 
-### Create a 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96717295
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ */
+public class KafkaBolt extends BaseRichBolt {
+private static final long serialVersionUID = -5205886631877033478L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBolt.class);
+
+public static final String TOPIC = "topic";
+
+private KafkaProducer producer;
+private OutputCollector collector;
+private TupleToKafkaMapper mapper;
+private KafkaTopicSelector topicSelector;
+private Properties boltSpecfiedProperties = new Properties();
+private boolean fireAndForget = false;
+private boolean async = true;
+
+public KafkaBolt() {}
+
+public KafkaBolt withTupleToKafkaMapper(TupleToKafkaMapper 
mapper) {
+this.mapper = mapper;
+return this;
+}
+
+public KafkaBolt withTopicSelector(String topic) {
+return withTopicSelector(new DefaultTopicSelector(topic));
+}
+
+public KafkaBolt withTopicSelector(KafkaTopicSelector selector) {
+this.topicSelector = selector;
+return this;
+}
+
+public KafkaBolt withProducerProperties(Properties 
producerProperties) {
--- End diff --

Is this method intended to set KafkaProducerProperties only? If so, 
wouldn't it be beneficial to call this method `withKafkaProducerProps`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96694593
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,232 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Apache Kafka integration using the kafka-client jar
+This includes the new Apache Kafka copnsumer API.
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatibility
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and 
attach it as a component to your topology or if you
+are using trident you can use org.apache.storm.kafka.trident.TridentState, 
org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a 
field with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper 
for backward compatibility
+reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
+In the TridentKafkaState you must specify what is the field name for key 
and message as there is no default constructor.
+These should be specified while constructing an instance of 
FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published
+You can return a null and the message will be ignored. If you have one 
static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` can be used to 
select the topic should to publish a tuple to.
+A user just needs to specify the field name or field index for the topic 
name in the tuple itself.
+When the topic is name not found , the `Field*TopicSelector` will write 
messages into default topic .
+Please make sure the default topic has been created .
+
+### Specifying Kafka producer properties
+You can provide all the producer properties in your Storm topology by 
calling `KafkaBolt.withProducerProperties()` and 
`TridentKafkaStateFactory.withProducerProperties()`. Please see  
http://kafka.apache.org/documentation.html#newproducerconfigs
+Section "Important configuration properties for the producer" for more 
details.
+These are also defined in 
`org.apache.kafka.clients.producer.ProducerConfig`
+
+###Using wildcard kafka topic match
+You can do a wildcard topic match by adding the following config
+```
+ Config config = new Config();
+ config.put("kafka.topic.wildcard.match",true);
 
-# Usage Examples
+```
 
-### Create a 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-18 Thread hmcl
Github user hmcl commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r96694426
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,232 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Apache Kafka integration using the kafka-client jar
+This includes the new Apache Kafka copnsumer API.
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatibility
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and 
attach it as a component to your topology or if you
+are using trident you can use org.apache.storm.kafka.trident.TridentState, 
org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a 
field with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper 
for backward compatibility
+reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
+In the TridentKafkaState you must specify what is the field name for key 
and message as there is no default constructor.
+These should be specified while constructing an instance of 
FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published
+You can return a null and the message will be ignored. If you have one 
static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` can be used to 
select the topic should to publish a tuple to.
+A user just needs to specify the field name or field index for the topic 
name in the tuple itself.
+When the topic is name not found , the `Field*TopicSelector` will write 
messages into default topic .
+Please make sure the default topic has been created .
+
+### Specifying Kafka producer properties
+You can provide all the producer properties in your Storm topology by 
calling `KafkaBolt.withProducerProperties()` and 
`TridentKafkaStateFactory.withProducerProperties()`. Please see  
http://kafka.apache.org/documentation.html#newproducerconfigs
+Section "Important configuration properties for the producer" for more 
details.
+These are also defined in 
`org.apache.kafka.clients.producer.ProducerConfig`
+
+###Using wildcard kafka topic match
+You can do a wildcard topic match by adding the following config
+```
+ Config config = new Config();
+ config.put("kafka.topic.wildcard.match",true);
 
-# Usage Examples
+```
 
-### Create a 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-07 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r95055015
  
--- Diff: external/storm-kafka-client/README.md ---
@@ -1,191 +1,5 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Kafka integration using the kafka-client jar
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+Spouts and Bolts used to interact with Kafka thought the kafka-client 
library.
--- End diff --

Nit: "Thought" should be "through"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2017-01-06 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r94917566
  
--- Diff: external/storm-kafka-client/README.md ---
@@ -1,191 +1,3 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Kafka integration using the kafka-client jar
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
-
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
-
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
-
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
-
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
-
-
-# Usage Examples
-
-### Create a Kafka Spout
-
-The code snippet bellow is extracted from the example in the module [test] 
(https://github.com/apache/storm/tree/master/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test).
 The code that is common for named topics and topic wildcards is in the first 
box. The specific implementations are in the appropriate section. 
-
-These snippets serve as a reference and do not compile. If you would like 
to reuse this code in your implementation, please obtain it from the test 
module, where it is complete.
-
-```java
-KafkaSpout kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);
-
-KafkaSpoutConfig kafkaSpoutConfig = new KafkaSpoutConfig.Builder(kafkaConsumerProps, kafkaSpoutStreams, tuplesBuilder, retryService)
-.setOffsetCommitPeriodMs(10_000)
-.setFirstPollOffsetStrategy(EARLIEST)
-.setMaxUncommittedOffsets(250)
-.build();
-
-Map kafkaConsumerProps= new HashMap<>();

-kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS,"127.0.0.1:9092");

-kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.GROUP_ID,"kafkaSpoutTestGroup");

-kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,"org.apache.kafka.common.serialization.StringDeserializer");

-kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,"org.apache.kafka.common.serialization.StringDeserializer");
-
-KafkaSpoutRetryService retryService = new 
KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
-KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2), 
Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
-```
-
-### Named Topics
-```java
-KafkaSpoutStreams kafkaSpoutStreams = new 
KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0], new 
String[]{TOPICS[0], TOPICS[1]})
-.addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})  
// contents of topic test2 sent to test_stream
-.addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]}) 
 // contents of topic test2 sent to test2_stream
-.build();
-
-KafkaSpoutTuplesBuilder tuplesBuilder = new 
KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
-new TopicsTest0Test1TupleBuilder(TOPICS[0], 
TOPICS[1]),
-new TopicTest2TupleBuilder(TOPICS[2]))
-.build();
-
-String[] STREAMS = new String[]{"test_stream", "test1_stream", 
"test2_stream"};
-String[] TOPICS = new String[]{"test", "test1", "test2"};
-
-Fields outputFields = new Fields("topic", "partition", "offset", "key", 
"value");
-Fields outputFields1 = new Fields("topic", "partition", "offset");
-```
-
-### Topic Wildcards
-```java
-KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreamsWildcardTopics(
-new KafkaSpoutStream(outputFields, STREAM, 
Pattern.compile(TOPIC_WILDCARD_PATTERN)));
-
-KafkaSpoutTuplesBuilder tuplesBuilder = new 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91622857
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.Testing;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.MkTupleParam;
+import org.apache.storm.tuple.Tuple;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaBoltTest {
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBoltTest.class);
+
+@SuppressWarnings({ "unchecked", "serial" })
+@Test
+public void testSimple() {
+final KafkaProducer producer = 
mock(KafkaProducer.class);
+when(producer.send(any(), any())).thenAnswer(new Answer() {
+@Override
+public Object answer(InvocationOnMock invocation) throws 
Throwable {
+Callback c = (Callback)invocation.getArguments()[1];
+c.onCompletion(null, null);
+return null;
+}
+});
+KafkaBolt bolt = new KafkaBolt() {
--- End diff --

Right, can't do that. Sorry :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91621754
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ * 
+ * This bolt uses 0.8.2 Kafka Producer API.
+ * 
+ * It works for sending tuples to older Kafka version (0.8.1).
+ */
+public class KafkaBolt extends BaseRichBolt {
+private static final long serialVersionUID = -5205886631877033478L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBolt.class);
+
+public static final String TOPIC = "topic";
+
+private KafkaProducer producer;
+private OutputCollector collector;
+private TupleToKafkaMapper mapper;
+private KafkaTopicSelector topicSelector;
+private Properties boltSpecfiedProperties = new Properties();
+/**
+ * With default setting for fireAndForget and async, the callback is 
called when the sending succeeds.
+ * By setting fireAndForget true, the send will not wait at all for 
kafka to ack.
+ * "acks" setting in 0.8.2 Producer API config doesn't matter if 
fireAndForget is set.
+ * By setting async false, synchronous sending is used. 
+ */
+private boolean fireAndForget = false;
+private boolean async = true;
+
+public KafkaBolt() {}
+
+public KafkaBolt withTupleToKafkaMapper(TupleToKafkaMapper 
mapper) {
+this.mapper = mapper;
+return this;
+}
+
+public KafkaBolt withTopicSelector(String topic) {
+return withTopicSelector(new DefaultTopicSelector(topic));
+}
+
+public KafkaBolt withTopicSelector(KafkaTopicSelector selector) {
+this.topicSelector = selector;
+return this;
+}
+
+public KafkaBolt withProducerProperties(Properties 
producerProperties) {
+this.boltSpecfiedProperties = producerProperties;
+return this;
+}
+
+@Override
+public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+//for backward compatibility.
+if(mapper == null) {
+this.mapper = new FieldNameBasedTupleToKafkaMapper();
+}
+
+//for backward compatibility.
+if(topicSelector == null) {
+if(stormConf.containsKey(TOPIC)) {
+this.topicSelector = new DefaultTopicSelector((String) 
stormConf.get(TOPIC));
+} else {
+throw 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-08 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91618249
  
--- Diff: 
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
 ---
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.Testing;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.MkTupleParam;
+import org.apache.storm.tuple.Tuple;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaBoltTest {
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBoltTest.class);
+
+@SuppressWarnings({ "unchecked", "serial" })
+@Test
+public void testSimple() {
+final KafkaProducer producer = 
mock(KafkaProducer.class);
+when(producer.send(any(), any())).thenAnswer(new Answer() {
+@Override
+public Object answer(InvocationOnMock invocation) throws 
Throwable {
+Callback c = (Callback)invocation.getArguments()[1];
+c.onCompletion(null, null);
+return null;
+}
+});
+KafkaBolt bolt = new KafkaBolt() {
--- End diff --

In this case because I am subclassing KafkaBolt it the compiler(at least in 
eclipse) actually complains that I am not allowed to do it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-08 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91613603
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
 ---
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * 
+ * It expects the producer configuration and topic in storm config under
+ * 
+ * 'kafka.broker.properties' and 'topic'
+ * 
+ * respectively.
+ * 
+ * This bolt uses 0.8.2 Kafka Producer API.
+ * 
+ * It works for sending tuples to older Kafka version (0.8.1).
+ */
+public class KafkaBolt extends BaseRichBolt {
+private static final long serialVersionUID = -5205886631877033478L;
+
+private static final Logger LOG = 
LoggerFactory.getLogger(KafkaBolt.class);
+
+public static final String TOPIC = "topic";
+
+private KafkaProducer producer;
+private OutputCollector collector;
+private TupleToKafkaMapper mapper;
+private KafkaTopicSelector topicSelector;
+private Properties boltSpecfiedProperties = new Properties();
+/**
+ * With default setting for fireAndForget and async, the callback is 
called when the sending succeeds.
+ * By setting fireAndForget true, the send will not wait at all for 
kafka to ack.
+ * "acks" setting in 0.8.2 Producer API config doesn't matter if 
fireAndForget is set.
+ * By setting async false, synchronous sending is used. 
+ */
+private boolean fireAndForget = false;
+private boolean async = true;
+
+public KafkaBolt() {}
+
+public KafkaBolt withTupleToKafkaMapper(TupleToKafkaMapper 
mapper) {
+this.mapper = mapper;
+return this;
+}
+
+public KafkaBolt withTopicSelector(String topic) {
+return withTopicSelector(new DefaultTopicSelector(topic));
+}
+
+public KafkaBolt withTopicSelector(KafkaTopicSelector selector) {
+this.topicSelector = selector;
+return this;
+}
+
+public KafkaBolt withProducerProperties(Properties 
producerProperties) {
+this.boltSpecfiedProperties = producerProperties;
+return this;
+}
+
+@Override
+public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+//for backward compatibility.
+if(mapper == null) {
+this.mapper = new FieldNameBasedTupleToKafkaMapper();
+}
+
+//for backward compatibility.
+if(topicSelector == null) {
+if(stormConf.containsKey(TOPIC)) {
+this.topicSelector = new DefaultTopicSelector((String) 
stormConf.get(TOPIC));
+} else {
+throw 

[GitHub] storm pull request #1808: STORM-2225: change spout config to be simpler.

2016-12-08 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1808#discussion_r91592134
  
--- Diff: docs/storm-kafka-client.md ---
@@ -1,90 +1,222 @@
-#Storm Kafka Spout with New Kafka Consumer API
+#Storm Kafka integration using the kafka-client jar
 
-Apache Storm Spout implementation to consume data from Apache Kafka 
versions 0.10 onwards (please see [Apache Kafka Version Compatibility] 
(#compatibility)). 
+##Compatability
 
-The Apache Storm Spout allows clients to consume data from Kafka starting 
at offsets as defined by the offset strategy specified in 
`FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from 
the offset that matches the chosen `FirstPollOffsetStrategy`.
+Apache Kafka versions 0.10 onwards (please see [Apache Kafka Version 
Compatibility] (#compatibility)). 
 
-The Kafka Spout implementation allows you to specify the stream 
(`KafkaSpoutStream`) associated with each topic or topic wildcard. 
`KafkaSpoutStream` represents the stream and output fields. For named topics 
use `KafkaSpoutStreamsNamedTopics`, and for topic wildcards use 
`KafkaSpoutStreamsWildcardTopics`. 
+##Writing to Kafka as part of your topology
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and 
attach it as a component to your topology or if you
+are using trident you can use org.apache.storm.kafka.trident.TridentState, 
org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s 
from `ConsumerRecord`s. The logic is provided by the user through implementing 
the appropriate number of `KafkaSpoutTupleBuilder` instances. For named topics 
use `KafkaSpoutTuplesBuilderNamedTopics`, and for topic wildcard use 
`KafkaSpoutTuplesBuilderWildcardTopics`.
+You need to provide implementations for the following 2 interfaces
 
-Multiple topics and topic wildcards can use the same 
`KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s 
from `ConsumerRecord`s is identical.
+###TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
 
+```java
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the name suggests, these methods are called to map a tuple to a Kafka 
key and a Kafka message. If you just want one field
+as key and one field as value, then you can use the provided 
FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for a 
field with field name "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper 
for backward compatibility
+reasons. Alternatively you could also specify a different key and message 
field by using the non default constructor.
+In the TridentKafkaState you must specify what is the field name for key 
and message as there is no default constructor.
+These should be specified while constructing and instance of 
FieldNameBasedTupleToKafkaMapper.
+
+###KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method
+```java
+public interface KafkaTopicSelector {
+String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the 
tuple's key/message mapping needs to be published
+You can return a null and the message will be ignored. If you have one 
static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
+`FieldNameTopicSelector` and `FieldIndexTopicSelector` use to support 
decided which topic should to push message from tuple.
+User could specify the field name or field index in tuple ,selector will 
use this value as topic name which to publish message.
+When the topic name not found , `KafkaBolt` will write messages into 
default topic .
+Please make sure the default topic have created .
+
+### Specifying Kafka producer properties
+You can provide all the produce properties in your Storm topology by 
calling `KafkaBolt.withProducerProperties()` and 
`TridentKafkaStateFactory.withProducerProperties()`. Please see  
http://kafka.apache.org/documentation.html#newproducerconfigs
+Section "Important configuration properties for the producer" for more 
details.
+
+###Using wildcard kafka topic match
+You can do a wildcard topic match by adding the following config
+```
+ Config config = new Config();
+ config.put("kafka.topic.wildcard.match",true);
 
-# Usage Examples
+```
 
-### Create a Kafka Spout
+After this you can specify a wildcard 

  1   2   >