[1/4] storm git commit: STORM-2092: optimize TridentKafkaState batch sending
Repository: storm Updated Branches: refs/heads/master cd5c9e8f9 -> dbe187845 STORM-2092: optimize TridentKafkaState batch sending Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9888bf6b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9888bf6b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9888bf6b Branch: refs/heads/master Commit: 9888bf6b994a29a2f18ac8126c249dd314ed764b Parents: cd5c9e8 Author: vesense Authored: Tue Sep 13 17:22:48 2016 +0800 Committer: vesense Committed: Tue Sep 13 17:33:26 2016 +0800 -- .../storm/kafka/trident/TridentKafkaState.java | 36 1 file changed, 21 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/9888bf6b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java -- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java index 5741dc7..7ff34cd 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java @@ -31,6 +31,7 @@ import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.state.State; import org.apache.storm.trident.tuple.TridentTuple; +import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; @@ -73,30 +74,35 @@ public class TridentKafkaState implements State { public void updateState(List tuples, TridentCollector collector) { String topic = null; -for (TridentTuple tuple : tuples) { -try { +try { +List futures = new ArrayList<>(tuples.size()); +for (TridentTuple tuple : tuples) { topic = topicSelector.getTopic(tuple); if(topic != null) { Future result = producer.send(new ProducerRecord(topic, mapper.getKeyFromTuple(tuple), 
mapper.getMessageFromTuple(tuple))); -try { -result.get(); -} catch (ExecutionException e) { -String errorMsg = "Could not retrieve result for message with key = " -+ mapper.getKeyFromTuple(tuple) + " from topic = " + topic; -LOG.error(errorMsg, e); -throw new FailedException(errorMsg, e); -} +futures.add(result); } else { LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null."); } -} catch (Exception ex) { -String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple) -+ " to topic = " + topic; -LOG.warn(errorMsg, ex); -throw new FailedException(errorMsg, ex); } + +for (int i = 0 ; i < futures.size(); i++) { +Future future = futures.get(i); +try { +future.get(); +} catch (ExecutionException e) { +String errorMsg = "Could not retrieve result for message with key = " ++ mapper.getKeyFromTuple(tuples.get(i)) + " from topic = " + topic; +LOG.error(errorMsg, e); +throw new FailedException(errorMsg, e); +} +} +} catch (Exception ex) { +String errorMsg = "Could not send messages " + tuples + " to topic = " + topic; +LOG.warn(errorMsg, ex); +throw new FailedException(errorMsg, ex); } } }
[1/4] storm git commit: STORM-2092: optimize TridentKafkaState batch sending
Repository: storm Updated Branches: refs/heads/1.x-branch bf5d5e644 -> da35dabdd STORM-2092: optimize TridentKafkaState batch sending Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/345b4c2d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/345b4c2d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/345b4c2d Branch: refs/heads/1.x-branch Commit: 345b4c2dbf681d562c83de25798db2af88b42ad2 Parents: bf5d5e6 Author: vesense Authored: Tue Sep 13 17:22:48 2016 +0800 Committer: Jungtaek Lim Committed: Mon Sep 19 08:46:18 2016 +0900 -- .../storm/kafka/trident/TridentKafkaState.java | 36 1 file changed, 21 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/345b4c2d/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java -- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java index 5741dc7..7ff34cd 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java @@ -31,6 +31,7 @@ import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.state.State; import org.apache.storm.trident.tuple.TridentTuple; +import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; @@ -73,30 +74,35 @@ public class TridentKafkaState implements State { public void updateState(List tuples, TridentCollector collector) { String topic = null; -for (TridentTuple tuple : tuples) { -try { +try { +List futures = new ArrayList<>(tuples.size()); +for (TridentTuple tuple : tuples) { topic = topicSelector.getTopic(tuple); if(topic != null) { Future result = producer.send(new ProducerRecord(topic, mapper.getKeyFromTuple(tuple), 
mapper.getMessageFromTuple(tuple))); -try { -result.get(); -} catch (ExecutionException e) { -String errorMsg = "Could not retrieve result for message with key = " -+ mapper.getKeyFromTuple(tuple) + " from topic = " + topic; -LOG.error(errorMsg, e); -throw new FailedException(errorMsg, e); -} +futures.add(result); } else { LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null."); } -} catch (Exception ex) { -String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple) -+ " to topic = " + topic; -LOG.warn(errorMsg, ex); -throw new FailedException(errorMsg, ex); } + +for (int i = 0 ; i < futures.size(); i++) { +Future future = futures.get(i); +try { +future.get(); +} catch (ExecutionException e) { +String errorMsg = "Could not retrieve result for message with key = " ++ mapper.getKeyFromTuple(tuples.get(i)) + " from topic = " + topic; +LOG.error(errorMsg, e); +throw new FailedException(errorMsg, e); +} +} +} catch (Exception ex) { +String errorMsg = "Could not send messages " + tuples + " to topic = " + topic; +LOG.warn(errorMsg, ex); +throw new FailedException(errorMsg, ex); } } }