storm git commit: Added STORM-2098 to CHANGELOG.md [Forced Update!]

2016-09-18 Thread satishd
Repository: storm
Updated Branches:
  refs/heads/master 2e0ac3d68 -> 2b08b95aa (forced update)


Added STORM-2098 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2b08b95a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2b08b95a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2b08b95a

Branch: refs/heads/master
Commit: 2b08b95aaf49f1a78e3b76bf97fdaa6484fd5961
Parents: 520270d
Author: Satish Duggana 
Authored: Mon Sep 19 08:50:22 2016 +0530
Committer: Satish Duggana 
Committed: Mon Sep 19 09:24:03 2016 +0530

--
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/2b08b95a/CHANGELOG.md
--
diff --git a/CHANGELOG.md b/CHANGELOG.md
index be0c4a9..0429be2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-2098: DruidBeamBolt: Pass DruidConfig.Builder as constructor argument
  * STORM-2067: Fix "array element type mismatch" from compute-executors in 
nimbus.clj
  * STORM-2054: DependencyResolver should be aware of relative path and 
absolute path
  * STORM-2052: Kafka Spout New Client API - Log Improvements and Parameter 
Tuning for Better Performance



[3/3] storm git commit: Added STORM-2089 to CHANGELOG.md

2016-09-18 Thread satishd
Added STORM-2089 to CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2e0ac3d6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2e0ac3d6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2e0ac3d6

Branch: refs/heads/master
Commit: 2e0ac3d68e5c88a14f2dc0c8e546c7e7358c9ae1
Parents: 520270d
Author: Satish Duggana 
Authored: Mon Sep 19 08:50:22 2016 +0530
Committer: Satish Duggana 
Committed: Mon Sep 19 08:50:22 2016 +0530

--
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/2e0ac3d6/CHANGELOG.md
--
diff --git a/CHANGELOG.md b/CHANGELOG.md
index be0c4a9..0429be2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-2098: DruidBeamBolt: Pass DruidConfig.Builder as constructor argument
  * STORM-2067: Fix "array element type mismatch" from compute-executors in 
nimbus.clj
  * STORM-2054: DependencyResolver should be aware of relative path and 
absolute path
  * STORM-2052: Kafka Spout New Client API - Log Improvements and Parameter 
Tuning for Better Performance



[1/3] storm git commit: STORM-2098: DruidBeamBolt: Pass DruidConfig.Builder as constructor argument

2016-09-18 Thread satishd
Repository: storm
Updated Branches:
  refs/heads/master dbe187845 -> 2e0ac3d68


STORM-2098: DruidBeamBolt: Pass DruidConfig.Builder as constructor argument


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8d57f611
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8d57f611
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8d57f611

Branch: refs/heads/master
Commit: 8d57f611bd66f8fec6b0f864e484e205400d3aaa
Parents: cd5c9e8
Author: Manikumar Reddy O 
Authored: Fri Sep 16 11:54:49 2016 +0530
Committer: Manikumar Reddy O 
Committed: Fri Sep 16 15:22:54 2016 +0530

--
 .../main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java   | 4 ++--
 .../java/org/apache/storm/druid/SampleDruidBoltTopology.java   | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/8d57f611/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
--
diff --git 
a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
 
b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
index 721eaa1..822b92c 100644
--- 
a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
+++ 
b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
@@ -55,9 +55,9 @@ public class DruidBeamBolt extends BaseRichBolt {
 private Tranquilizer tranquilizer = null;
 private ITupleDruidEventMapper druidEventMapper = null;
 
-public DruidBeamBolt(DruidBeamFactory beamFactory, 
ITupleDruidEventMapper druidEventMapper, DruidConfig druidConfig) {
+public DruidBeamBolt(DruidBeamFactory beamFactory, 
ITupleDruidEventMapper druidEventMapper, DruidConfig.Builder 
druidConfigBuilder) {
 this.beamFactory = beamFactory;
-this.druidConfig = druidConfig;
+this.druidConfig = druidConfigBuilder.build();
 this.druidEventMapper = druidEventMapper;
 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/8d57f611/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
--
diff --git 
a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
 
b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
index 99a6f67..88b2bf1 100644
--- 
a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
+++ 
b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java
@@ -52,11 +52,11 @@ public class SampleDruidBoltTopology {
 
 topologyBuilder.setSpout("event-gen", new SimpleSpout(), 5);
 DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new 
HashMap());
-DruidConfig druidConfig = 
DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build();
+DruidConfig.Builder builder = 
DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID);
 ITupleDruidEventMapper<Map<String, Object>> eventMapper = new 
TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
-DruidBeamBolt<Map<String, Object>> druidBolt = new 
DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, druidConfig);
+DruidBeamBolt<Map<String, Object>> druidBolt = new 
DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, builder);
 topologyBuilder.setBolt("druid-bolt", 
druidBolt).shuffleGrouping("event-gen");
-topologyBuilder.setBolt("printer-bolt", new 
PrinterBolt()).shuffleGrouping("druid-bolt" , druidConfig.getDiscardStreamId());
+topologyBuilder.setBolt("printer-bolt", new 
PrinterBolt()).shuffleGrouping("druid-bolt" , 
DruidConfig.DEFAULT_DISCARD_STREAM_ID);
 
 Config conf = new Config();
 conf.setDebug(true);



[2/3] storm git commit: Merge branch 'druid-changes' of https://github.com/omkreddy/storm into STORM-2098

2016-09-18 Thread satishd
Merge branch 'druid-changes' of https://github.com/omkreddy/storm into 
STORM-2098


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/520270d8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/520270d8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/520270d8

Branch: refs/heads/master
Commit: 520270d8783b1bb99d1508af445be5956d7f4818
Parents: dbe1878 8d57f61
Author: Satish Duggana 
Authored: Mon Sep 19 08:47:03 2016 +0530
Committer: Satish Duggana 
Committed: Mon Sep 19 08:47:03 2016 +0530

--
 .../main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java   | 4 ++--
 .../java/org/apache/storm/druid/SampleDruidBoltTopology.java   | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)
--




[2/4] storm git commit: STORM-2092: throw summarized error messages

2016-09-18 Thread kabhwan
STORM-2092: throw summarized error messages


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9552ea39
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9552ea39
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9552ea39

Branch: refs/heads/1.x-branch
Commit: 9552ea39a05320078c85acb7b45bcee6e7bb1b56
Parents: 345b4c2
Author: vesense 
Authored: Sun Sep 18 11:14:18 2016 +0800
Committer: Jungtaek Lim 
Committed: Mon Sep 19 08:46:21 2016 +0900

--
 .../storm/kafka/trident/TridentKafkaState.java   | 19 +--
 1 file changed, 13 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/9552ea39/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
--
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
index 7ff34cd..eb6737a 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
@@ -88,17 +88,24 @@ public class TridentKafkaState implements State {
 }
 }
 
-for (int i = 0 ; i < futures.size(); i++) {
-Future future = futures.get(i);
+List<ExecutionException> exceptions = new 
ArrayList<>(futures.size());
+for (Future future : futures) {
 try {
 future.get();
 } catch (ExecutionException e) {
-String errorMsg = "Could not retrieve result for message 
with key = "
-+ mapper.getKeyFromTuple(tuples.get(i)) + " from 
topic = " + topic;
-LOG.error(errorMsg, e);
-throw new FailedException(errorMsg, e);
+exceptions.add(e);
 }
 }
+
+if(exceptions.size() > 0){
+String errorMsg = "Could not retrieve result for messages " + 
tuples + " from topic = " + topic 
++ " because of the following exceptions: \n";
+for (ExecutionException exception : exceptions) {
+errorMsg = errorMsg + exception.getMessage() + "\n";
+}
+LOG.error(errorMsg);
+throw new FailedException(errorMsg);
+}
 } catch (Exception ex) {
 String errorMsg = "Could not send messages " + tuples + " to topic 
= " + topic;
 LOG.warn(errorMsg, ex);



[4/4] storm git commit: add STORM-2092 to CHANGELOG

2016-09-18 Thread kabhwan
add STORM-2092 to CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/da35dabd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/da35dabd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/da35dabd

Branch: refs/heads/1.x-branch
Commit: da35dabdd00535f8058eed2ce8fc33dccad171d3
Parents: 937c0e8
Author: Jungtaek Lim 
Authored: Mon Sep 19 08:46:41 2016 +0900
Committer: Jungtaek Lim 
Committed: Mon Sep 19 08:46:41 2016 +0900

--
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/da35dabd/CHANGELOG.md
--
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 302a6fe..4ed3ea7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.0
+ * STORM-2092: optimize TridentKafkaState batch sending
  * STORM-1979: Storm Druid Connector implementation.
  * STORM-2057: Support JOIN statement in Storm SQL
  * STORM-1970: external project examples refactor



[3/4] storm git commit: Merge branch 'STORM-2092' of https://github.com/vesense/storm into STORM-2092

2016-09-18 Thread kabhwan
Merge branch 'STORM-2092' of https://github.com/vesense/storm into STORM-2092


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3dd4b43b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3dd4b43b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3dd4b43b

Branch: refs/heads/master
Commit: 3dd4b43b130081afa796dde335674665ea09884d
Parents: cd5c9e8 ae3b90f
Author: Jungtaek Lim 
Authored: Mon Sep 19 08:43:56 2016 +0900
Committer: Jungtaek Lim 
Committed: Mon Sep 19 08:43:56 2016 +0900

--
 .../storm/kafka/trident/TridentKafkaState.java  | 43 +---
 1 file changed, 28 insertions(+), 15 deletions(-)
--




[1/4] storm git commit: STORM-2092: optimize TridentKafkaState batch sending

2016-09-18 Thread kabhwan
Repository: storm
Updated Branches:
  refs/heads/master cd5c9e8f9 -> dbe187845


STORM-2092: optimize TridentKafkaState batch sending


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9888bf6b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9888bf6b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9888bf6b

Branch: refs/heads/master
Commit: 9888bf6b994a29a2f18ac8126c249dd314ed764b
Parents: cd5c9e8
Author: vesense 
Authored: Tue Sep 13 17:22:48 2016 +0800
Committer: vesense 
Committed: Tue Sep 13 17:33:26 2016 +0800

--
 .../storm/kafka/trident/TridentKafkaState.java  | 36 
 1 file changed, 21 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/9888bf6b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
--
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
index 5741dc7..7ff34cd 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
@@ -31,6 +31,7 @@ import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.tuple.TridentTuple;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
@@ -73,30 +74,35 @@ public class TridentKafkaState implements State {
 
 public void updateState(List<TridentTuple> tuples, TridentCollector 
collector) {
 String topic = null;
-for (TridentTuple tuple : tuples) {
-try {
+try {
+List futures = new 
ArrayList<>(tuples.size());
+for (TridentTuple tuple : tuples) {
 topic = topicSelector.getTopic(tuple);
 
 if(topic != null) {
 Future result = producer.send(new 
ProducerRecord(topic,
 mapper.getKeyFromTuple(tuple), 
mapper.getMessageFromTuple(tuple)));
-try {
-result.get();
-} catch (ExecutionException e) {
-String errorMsg = "Could not retrieve result for 
message with key = "
-+ mapper.getKeyFromTuple(tuple) + " from topic 
= " + topic;
-LOG.error(errorMsg, e);
-throw new FailedException(errorMsg, e);
-}
+futures.add(result);
 } else {
 LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) 
+ ", topic selector returned null.");
 }
-} catch (Exception ex) {
-String errorMsg = "Could not send message with key = " + 
mapper.getKeyFromTuple(tuple)
-+ " to topic = " + topic;
-LOG.warn(errorMsg, ex);
-throw new FailedException(errorMsg, ex);
 }
+
+for (int i = 0 ; i < futures.size(); i++) {
+Future future = futures.get(i);
+try {
+future.get();
+} catch (ExecutionException e) {
+String errorMsg = "Could not retrieve result for message 
with key = "
++ mapper.getKeyFromTuple(tuples.get(i)) + " from 
topic = " + topic;
+LOG.error(errorMsg, e);
+throw new FailedException(errorMsg, e);
+}
+}
+} catch (Exception ex) {
+String errorMsg = "Could not send messages " + tuples + " to topic 
= " + topic;
+LOG.warn(errorMsg, ex);
+throw new FailedException(errorMsg, ex);
 }
 }
 }



[3/4] storm git commit: Merge branch 'STORM-2092-1.x' into 1.x-branch

2016-09-18 Thread kabhwan
Merge branch 'STORM-2092-1.x' into 1.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/937c0e8b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/937c0e8b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/937c0e8b

Branch: refs/heads/1.x-branch
Commit: 937c0e8bc4fa71eeab84359b50b6ee6dee79f8b1
Parents: bf5d5e6 9552ea3
Author: Jungtaek Lim 
Authored: Mon Sep 19 08:46:27 2016 +0900
Committer: Jungtaek Lim 
Committed: Mon Sep 19 08:46:27 2016 +0900

--
 .../storm/kafka/trident/TridentKafkaState.java  | 43 +---
 1 file changed, 28 insertions(+), 15 deletions(-)
--




[4/4] storm git commit: add STORM-2092 to CHANGELOG

2016-09-18 Thread kabhwan
add STORM-2092 to CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dbe18784
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dbe18784
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dbe18784

Branch: refs/heads/master
Commit: dbe1878453f63a01cc143134975a90e0da1fb378
Parents: 3dd4b43
Author: Jungtaek Lim 
Authored: Mon Sep 19 08:45:23 2016 +0900
Committer: Jungtaek Lim 
Committed: Mon Sep 19 08:45:23 2016 +0900

--
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/dbe18784/CHANGELOG.md
--
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 17ba7a1..be0c4a9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -147,6 +147,7 @@
  * STORM-1769: Added a test to check local nimbus with notifier plugin
 
 ## 1.1.0
+ * STORM-2092: optimize TridentKafkaState batch sending
  * STORM-1979: Storm Druid Connector implementation.
  * STORM-2057: Support JOIN statement in Storm SQL
  * STORM-1970: external project examples refactor



[2/4] storm git commit: STORM-2092: throw summarized error messages

2016-09-18 Thread kabhwan
STORM-2092: throw summarized error messages


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ae3b90fd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ae3b90fd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ae3b90fd

Branch: refs/heads/master
Commit: ae3b90fdfd22dabfe2f72155f48802e1f1cb2d19
Parents: 9888bf6
Author: vesense 
Authored: Sun Sep 18 11:14:18 2016 +0800
Committer: vesense 
Committed: Sun Sep 18 11:19:14 2016 +0800

--
 .../storm/kafka/trident/TridentKafkaState.java   | 19 +--
 1 file changed, 13 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/ae3b90fd/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
--
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
index 7ff34cd..eb6737a 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
@@ -88,17 +88,24 @@ public class TridentKafkaState implements State {
 }
 }
 
-for (int i = 0 ; i < futures.size(); i++) {
-Future future = futures.get(i);
+List<ExecutionException> exceptions = new 
ArrayList<>(futures.size());
+for (Future future : futures) {
 try {
 future.get();
 } catch (ExecutionException e) {
-String errorMsg = "Could not retrieve result for message 
with key = "
-+ mapper.getKeyFromTuple(tuples.get(i)) + " from 
topic = " + topic;
-LOG.error(errorMsg, e);
-throw new FailedException(errorMsg, e);
+exceptions.add(e);
 }
 }
+
+if(exceptions.size() > 0){
+String errorMsg = "Could not retrieve result for messages " + 
tuples + " from topic = " + topic 
++ " because of the following exceptions: \n";
+for (ExecutionException exception : exceptions) {
+errorMsg = errorMsg + exception.getMessage() + "\n";
+}
+LOG.error(errorMsg);
+throw new FailedException(errorMsg);
+}
 } catch (Exception ex) {
 String errorMsg = "Could not send messages " + tuples + " to topic 
= " + topic;
 LOG.warn(errorMsg, ex);



[1/4] storm git commit: STORM-2092: optimize TridentKafkaState batch sending

2016-09-18 Thread kabhwan
Repository: storm
Updated Branches:
  refs/heads/1.x-branch bf5d5e644 -> da35dabdd


STORM-2092: optimize TridentKafkaState batch sending


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/345b4c2d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/345b4c2d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/345b4c2d

Branch: refs/heads/1.x-branch
Commit: 345b4c2dbf681d562c83de25798db2af88b42ad2
Parents: bf5d5e6
Author: vesense 
Authored: Tue Sep 13 17:22:48 2016 +0800
Committer: Jungtaek Lim 
Committed: Mon Sep 19 08:46:18 2016 +0900

--
 .../storm/kafka/trident/TridentKafkaState.java  | 36 
 1 file changed, 21 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/345b4c2d/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
--
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
index 5741dc7..7ff34cd 100644
--- 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
+++ 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
@@ -31,6 +31,7 @@ import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.tuple.TridentTuple;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
@@ -73,30 +74,35 @@ public class TridentKafkaState implements State {
 
 public void updateState(List<TridentTuple> tuples, TridentCollector 
collector) {
 String topic = null;
-for (TridentTuple tuple : tuples) {
-try {
+try {
+List futures = new 
ArrayList<>(tuples.size());
+for (TridentTuple tuple : tuples) {
 topic = topicSelector.getTopic(tuple);
 
 if(topic != null) {
 Future result = producer.send(new 
ProducerRecord(topic,
 mapper.getKeyFromTuple(tuple), 
mapper.getMessageFromTuple(tuple)));
-try {
-result.get();
-} catch (ExecutionException e) {
-String errorMsg = "Could not retrieve result for 
message with key = "
-+ mapper.getKeyFromTuple(tuple) + " from topic 
= " + topic;
-LOG.error(errorMsg, e);
-throw new FailedException(errorMsg, e);
-}
+futures.add(result);
 } else {
 LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) 
+ ", topic selector returned null.");
 }
-} catch (Exception ex) {
-String errorMsg = "Could not send message with key = " + 
mapper.getKeyFromTuple(tuple)
-+ " to topic = " + topic;
-LOG.warn(errorMsg, ex);
-throw new FailedException(errorMsg, ex);
 }
+
+for (int i = 0 ; i < futures.size(); i++) {
+Future future = futures.get(i);
+try {
+future.get();
+} catch (ExecutionException e) {
+String errorMsg = "Could not retrieve result for message 
with key = "
++ mapper.getKeyFromTuple(tuples.get(i)) + " from 
topic = " + topic;
+LOG.error(errorMsg, e);
+throw new FailedException(errorMsg, e);
+}
+}
+} catch (Exception ex) {
+String errorMsg = "Could not send messages " + tuples + " to topic 
= " + topic;
+LOG.warn(errorMsg, ex);
+throw new FailedException(errorMsg, ex);
 }
 }
 }