[1/4] storm git commit: STORM-2092: optimize TridentKafkaState batch sending

2016-09-18 Thread kabhwan
Repository: storm
Updated Branches:
  refs/heads/master cd5c9e8f9 -> dbe187845


STORM-2092: optimize TridentKafkaState batch sending


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9888bf6b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9888bf6b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9888bf6b

Branch: refs/heads/master
Commit: 9888bf6b994a29a2f18ac8126c249dd314ed764b
Parents: cd5c9e8
Author: vesense 
Authored: Tue Sep 13 17:22:48 2016 +0800
Committer: vesense 
Committed: Tue Sep 13 17:33:26 2016 +0800

--
 .../storm/kafka/trident/TridentKafkaState.java  | 36 +++++++++++++++++++++---------------
 1 file changed, 21 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/9888bf6b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
--
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
index 5741dc7..7ff34cd 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
@@ -31,6 +31,7 @@ import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.tuple.TridentTuple;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
@@ -73,30 +74,35 @@ public class TridentKafkaState implements State {
 
 public void updateState(List tuples, TridentCollector 
collector) {
 String topic = null;
-for (TridentTuple tuple : tuples) {
-try {
+try {
+List futures = new 
ArrayList<>(tuples.size());
+for (TridentTuple tuple : tuples) {
 topic = topicSelector.getTopic(tuple);
 
 if(topic != null) {
 Future result = producer.send(new 
ProducerRecord(topic,
 mapper.getKeyFromTuple(tuple), 
mapper.getMessageFromTuple(tuple)));
-try {
-result.get();
-} catch (ExecutionException e) {
-String errorMsg = "Could not retrieve result for 
message with key = "
-+ mapper.getKeyFromTuple(tuple) + " from topic 
= " + topic;
-LOG.error(errorMsg, e);
-throw new FailedException(errorMsg, e);
-}
+futures.add(result);
 } else {
 LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) 
+ ", topic selector returned null.");
 }
-} catch (Exception ex) {
-String errorMsg = "Could not send message with key = " + 
mapper.getKeyFromTuple(tuple)
-+ " to topic = " + topic;
-LOG.warn(errorMsg, ex);
-throw new FailedException(errorMsg, ex);
 }
+
+for (int i = 0 ; i < futures.size(); i++) {
+Future future = futures.get(i);
+try {
+future.get();
+} catch (ExecutionException e) {
+String errorMsg = "Could not retrieve result for message 
with key = "
++ mapper.getKeyFromTuple(tuples.get(i)) + " from 
topic = " + topic;
+LOG.error(errorMsg, e);
+throw new FailedException(errorMsg, e);
+}
+}
+} catch (Exception ex) {
+String errorMsg = "Could not send messages " + tuples + " to topic 
= " + topic;
+LOG.warn(errorMsg, ex);
+throw new FailedException(errorMsg, ex);
 }
 }
 }



[1/4] storm git commit: STORM-2092: optimize TridentKafkaState batch sending

2016-09-18 Thread kabhwan
Repository: storm
Updated Branches:
  refs/heads/1.x-branch bf5d5e644 -> da35dabdd


STORM-2092: optimize TridentKafkaState batch sending


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/345b4c2d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/345b4c2d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/345b4c2d

Branch: refs/heads/1.x-branch
Commit: 345b4c2dbf681d562c83de25798db2af88b42ad2
Parents: bf5d5e6
Author: vesense 
Authored: Tue Sep 13 17:22:48 2016 +0800
Committer: Jungtaek Lim 
Committed: Mon Sep 19 08:46:18 2016 +0900

--
 .../storm/kafka/trident/TridentKafkaState.java  | 36 +++++++++++++++++++++---------------
 1 file changed, 21 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/storm/blob/345b4c2d/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
--
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
index 5741dc7..7ff34cd 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
@@ -31,6 +31,7 @@ import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.tuple.TridentTuple;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
@@ -73,30 +74,35 @@ public class TridentKafkaState implements State {
 
 public void updateState(List tuples, TridentCollector 
collector) {
 String topic = null;
-for (TridentTuple tuple : tuples) {
-try {
+try {
+List futures = new 
ArrayList<>(tuples.size());
+for (TridentTuple tuple : tuples) {
 topic = topicSelector.getTopic(tuple);
 
 if(topic != null) {
 Future result = producer.send(new 
ProducerRecord(topic,
 mapper.getKeyFromTuple(tuple), 
mapper.getMessageFromTuple(tuple)));
-try {
-result.get();
-} catch (ExecutionException e) {
-String errorMsg = "Could not retrieve result for 
message with key = "
-+ mapper.getKeyFromTuple(tuple) + " from topic 
= " + topic;
-LOG.error(errorMsg, e);
-throw new FailedException(errorMsg, e);
-}
+futures.add(result);
 } else {
 LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) 
+ ", topic selector returned null.");
 }
-} catch (Exception ex) {
-String errorMsg = "Could not send message with key = " + 
mapper.getKeyFromTuple(tuple)
-+ " to topic = " + topic;
-LOG.warn(errorMsg, ex);
-throw new FailedException(errorMsg, ex);
 }
+
+for (int i = 0 ; i < futures.size(); i++) {
+Future future = futures.get(i);
+try {
+future.get();
+} catch (ExecutionException e) {
+String errorMsg = "Could not retrieve result for message 
with key = "
++ mapper.getKeyFromTuple(tuples.get(i)) + " from 
topic = " + topic;
+LOG.error(errorMsg, e);
+throw new FailedException(errorMsg, e);
+}
+}
+} catch (Exception ex) {
+String errorMsg = "Could not send messages " + tuples + " to topic 
= " + topic;
+LOG.warn(errorMsg, ex);
+throw new FailedException(errorMsg, ex);
 }
 }
 }