storm git commit: Added STORM-2098 to CHANGELOG.md [Forced Update!]
Repository: storm Updated Branches: refs/heads/master 2e0ac3d68 -> 2b08b95aa (forced update) Added STORM-2098 to CHANGELOG.md Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2b08b95a Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2b08b95a Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2b08b95a Branch: refs/heads/master Commit: 2b08b95aaf49f1a78e3b76bf97fdaa6484fd5961 Parents: 520270d Author: Satish Duggana Authored: Mon Sep 19 08:50:22 2016 +0530 Committer: Satish Duggana Committed: Mon Sep 19 09:24:03 2016 +0530 -- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/2b08b95a/CHANGELOG.md -- diff --git a/CHANGELOG.md b/CHANGELOG.md index be0c4a9..0429be2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## 2.0.0 + * STORM-2098: DruidBeamBolt: Pass DruidConfig.Builder as constructor argument * STORM-2067: Fix "array element type mismatch" from compute-executors in nimbus.clj * STORM-2054: DependencyResolver should be aware of relative path and absolute path * STORM-2052: Kafka Spout New Client API - Log Improvements and Parameter Tuning for Better Performance
[3/3] storm git commit: Added STORM-2089 to CHANGELOG.md
Added STORM-2089 to CHANGELOG.md Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2e0ac3d6 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2e0ac3d6 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2e0ac3d6 Branch: refs/heads/master Commit: 2e0ac3d68e5c88a14f2dc0c8e546c7e7358c9ae1 Parents: 520270d Author: Satish Duggana Authored: Mon Sep 19 08:50:22 2016 +0530 Committer: Satish Duggana Committed: Mon Sep 19 08:50:22 2016 +0530 -- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/2e0ac3d6/CHANGELOG.md -- diff --git a/CHANGELOG.md b/CHANGELOG.md index be0c4a9..0429be2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## 2.0.0 + * STORM-2098: DruidBeamBolt: Pass DruidConfig.Builder as constructor argument * STORM-2067: Fix "array element type mismatch" from compute-executors in nimbus.clj * STORM-2054: DependencyResolver should be aware of relative path and absolute path * STORM-2052: Kafka Spout New Client API - Log Improvements and Parameter Tuning for Better Performance
[1/3] storm git commit: STORM-2098: DruidBeamBolt: Pass DruidConfig.Builder as constructor argument
Repository: storm Updated Branches: refs/heads/master dbe187845 -> 2e0ac3d68 STORM-2098: DruidBeamBolt: Pass DruidConfig.Builder as constructor argument Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8d57f611 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8d57f611 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8d57f611 Branch: refs/heads/master Commit: 8d57f611bd66f8fec6b0f864e484e205400d3aaa Parents: cd5c9e8 Author: Manikumar Reddy OAuthored: Fri Sep 16 11:54:49 2016 +0530 Committer: Manikumar Reddy O Committed: Fri Sep 16 15:22:54 2016 +0530 -- .../main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java | 4 ++-- .../java/org/apache/storm/druid/SampleDruidBoltTopology.java | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/8d57f611/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java -- diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java index 721eaa1..822b92c 100644 --- a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java +++ b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java @@ -55,9 +55,9 @@ public class DruidBeamBolt extends BaseRichBolt { private Tranquilizer tranquilizer = null; private ITupleDruidEventMapper druidEventMapper = null; -public DruidBeamBolt(DruidBeamFactory beamFactory, ITupleDruidEventMapper druidEventMapper, DruidConfig druidConfig) { +public DruidBeamBolt(DruidBeamFactory beamFactory, ITupleDruidEventMapper druidEventMapper, DruidConfig.Builder druidConfigBuilder) { this.beamFactory = beamFactory; -this.druidConfig = druidConfig; +this.druidConfig = druidConfigBuilder.build(); this.druidEventMapper = druidEventMapper; } 
http://git-wip-us.apache.org/repos/asf/storm/blob/8d57f611/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java -- diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java index 99a6f67..88b2bf1 100644 --- a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java +++ b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java @@ -52,11 +52,11 @@ public class SampleDruidBoltTopology { topologyBuilder.setSpout("event-gen", new SimpleSpout(), 5); DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap ()); -DruidConfig druidConfig = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build(); +DruidConfig.Builder builder = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID); ITupleDruidEventMapper
[2/3] storm git commit: Merge branch 'druid-changes' of https://github.com/omkreddy/storm into STORM-2098
Merge branch 'druid-changes' of https://github.com/omkreddy/storm into STORM-2098 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/520270d8 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/520270d8 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/520270d8 Branch: refs/heads/master Commit: 520270d8783b1bb99d1508af445be5956d7f4818 Parents: dbe1878 8d57f61 Author: Satish Duggana Authored: Mon Sep 19 08:47:03 2016 +0530 Committer: Satish Duggana Committed: Mon Sep 19 08:47:03 2016 +0530 -- .../main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java | 4 ++-- .../java/org/apache/storm/druid/SampleDruidBoltTopology.java | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) --
[2/4] storm git commit: STORM-2092: throw summarized error messages
STORM-2092: throw summarized error messages Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9552ea39 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9552ea39 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9552ea39 Branch: refs/heads/1.x-branch Commit: 9552ea39a05320078c85acb7b45bcee6e7bb1b56 Parents: 345b4c2 Author: vesenseAuthored: Sun Sep 18 11:14:18 2016 +0800 Committer: Jungtaek Lim Committed: Mon Sep 19 08:46:21 2016 +0900 -- .../storm/kafka/trident/TridentKafkaState.java | 19 +-- 1 file changed, 13 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/9552ea39/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java -- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java index 7ff34cd..eb6737a 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java @@ -88,17 +88,24 @@ public class TridentKafkaState implements State { } } -for (int i = 0 ; i < futures.size(); i++) { -Future future = futures.get(i); +List exceptions = new ArrayList<>(futures.size()); +for (Future future : futures) { try { future.get(); } catch (ExecutionException e) { -String errorMsg = "Could not retrieve result for message with key = " -+ mapper.getKeyFromTuple(tuples.get(i)) + " from topic = " + topic; -LOG.error(errorMsg, e); -throw new FailedException(errorMsg, e); +exceptions.add(e); } } + +if(exceptions.size() > 0){ +String errorMsg = "Could not retrieve result for messages " + tuples + " from topic = " + topic ++ " because of the following exceptions: \n"; +for (ExecutionException exception : exceptions) { +errorMsg = errorMsg + exception.getMessage() + "\n"; +} +LOG.error(errorMsg); +throw new 
FailedException(errorMsg); +} } catch (Exception ex) { String errorMsg = "Could not send messages " + tuples + " to topic = " + topic; LOG.warn(errorMsg, ex);
[4/4] storm git commit: add STORM-2092 to CHANGELOG
add STORM-2092 to CHANGELOG Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/da35dabd Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/da35dabd Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/da35dabd Branch: refs/heads/1.x-branch Commit: da35dabdd00535f8058eed2ce8fc33dccad171d3 Parents: 937c0e8 Author: Jungtaek Lim Authored: Mon Sep 19 08:46:41 2016 +0900 Committer: Jungtaek Lim Committed: Mon Sep 19 08:46:41 2016 +0900 -- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/da35dabd/CHANGELOG.md -- diff --git a/CHANGELOG.md b/CHANGELOG.md index 302a6fe..4ed3ea7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## 1.1.0 + * STORM-2092: optimize TridentKafkaState batch sending * STORM-1979: Storm Druid Connector implementation. * STORM-2057: Support JOIN statement in Storm SQL * STORM-1970: external project examples refator
[3/4] storm git commit: Merge branch 'STORM-2092' of https://github.com/vesense/storm into STORM-2092
Merge branch 'STORM-2092' of https://github.com/vesense/storm into STORM-2092 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3dd4b43b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3dd4b43b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3dd4b43b Branch: refs/heads/master Commit: 3dd4b43b130081afa796dde335674665ea09884d Parents: cd5c9e8 ae3b90f Author: Jungtaek Lim Authored: Mon Sep 19 08:43:56 2016 +0900 Committer: Jungtaek Lim Committed: Mon Sep 19 08:43:56 2016 +0900 -- .../storm/kafka/trident/TridentKafkaState.java | 43 +--- 1 file changed, 28 insertions(+), 15 deletions(-) --
[1/4] storm git commit: STORM-2092: optimize TridentKafkaState batch sending
Repository: storm Updated Branches: refs/heads/master cd5c9e8f9 -> dbe187845 STORM-2092: optimize TridentKafkaState batch sending Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9888bf6b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9888bf6b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9888bf6b Branch: refs/heads/master Commit: 9888bf6b994a29a2f18ac8126c249dd314ed764b Parents: cd5c9e8 Author: vesenseAuthored: Tue Sep 13 17:22:48 2016 +0800 Committer: vesense Committed: Tue Sep 13 17:33:26 2016 +0800 -- .../storm/kafka/trident/TridentKafkaState.java | 36 1 file changed, 21 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/9888bf6b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java -- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java index 5741dc7..7ff34cd 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java @@ -31,6 +31,7 @@ import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.state.State; import org.apache.storm.trident.tuple.TridentTuple; +import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; @@ -73,30 +74,35 @@ public class TridentKafkaState implements State { public void updateState(List tuples, TridentCollector collector) { String topic = null; -for (TridentTuple tuple : tuples) { -try { +try { +List futures = new ArrayList<>(tuples.size()); +for (TridentTuple tuple : tuples) { topic = topicSelector.getTopic(tuple); if(topic != null) { Future result = producer.send(new ProducerRecord(topic, mapper.getKeyFromTuple(tuple), 
mapper.getMessageFromTuple(tuple))); -try { -result.get(); -} catch (ExecutionException e) { -String errorMsg = "Could not retrieve result for message with key = " -+ mapper.getKeyFromTuple(tuple) + " from topic = " + topic; -LOG.error(errorMsg, e); -throw new FailedException(errorMsg, e); -} +futures.add(result); } else { LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null."); } -} catch (Exception ex) { -String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple) -+ " to topic = " + topic; -LOG.warn(errorMsg, ex); -throw new FailedException(errorMsg, ex); } + +for (int i = 0 ; i < futures.size(); i++) { +Future future = futures.get(i); +try { +future.get(); +} catch (ExecutionException e) { +String errorMsg = "Could not retrieve result for message with key = " ++ mapper.getKeyFromTuple(tuples.get(i)) + " from topic = " + topic; +LOG.error(errorMsg, e); +throw new FailedException(errorMsg, e); +} +} +} catch (Exception ex) { +String errorMsg = "Could not send messages " + tuples + " to topic = " + topic; +LOG.warn(errorMsg, ex); +throw new FailedException(errorMsg, ex); } } }
[3/4] storm git commit: Merge branch 'STORM-2092-1.x' into 1.x-branch
Merge branch 'STORM-2092-1.x' into 1.x-branch Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/937c0e8b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/937c0e8b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/937c0e8b Branch: refs/heads/1.x-branch Commit: 937c0e8bc4fa71eeab84359b50b6ee6dee79f8b1 Parents: bf5d5e6 9552ea3 Author: Jungtaek Lim Authored: Mon Sep 19 08:46:27 2016 +0900 Committer: Jungtaek Lim Committed: Mon Sep 19 08:46:27 2016 +0900 -- .../storm/kafka/trident/TridentKafkaState.java | 43 +--- 1 file changed, 28 insertions(+), 15 deletions(-) --
[4/4] storm git commit: add STORM-2092 to CHANGELOG
add STORM-2092 to CHANGELOG Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dbe18784 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dbe18784 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dbe18784 Branch: refs/heads/master Commit: dbe1878453f63a01cc143134975a90e0da1fb378 Parents: 3dd4b43 Author: Jungtaek Lim Authored: Mon Sep 19 08:45:23 2016 +0900 Committer: Jungtaek Lim Committed: Mon Sep 19 08:45:23 2016 +0900 -- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/storm/blob/dbe18784/CHANGELOG.md -- diff --git a/CHANGELOG.md b/CHANGELOG.md index 17ba7a1..be0c4a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -147,6 +147,7 @@ * STORM-1769: Added a test to check local nimbus with notifier plugin ## 1.1.0 + * STORM-2092: optimize TridentKafkaState batch sending * STORM-1979: Storm Druid Connector implementation. * STORM-2057: Support JOIN statement in Storm SQL * STORM-1970: external project examples refator
[2/4] storm git commit: STORM-2092: throw summarized error messages
STORM-2092: throw summarized error messages Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ae3b90fd Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ae3b90fd Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ae3b90fd Branch: refs/heads/master Commit: ae3b90fdfd22dabfe2f72155f48802e1f1cb2d19 Parents: 9888bf6 Author: vesenseAuthored: Sun Sep 18 11:14:18 2016 +0800 Committer: vesense Committed: Sun Sep 18 11:19:14 2016 +0800 -- .../storm/kafka/trident/TridentKafkaState.java | 19 +-- 1 file changed, 13 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/ae3b90fd/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java -- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java index 7ff34cd..eb6737a 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java @@ -88,17 +88,24 @@ public class TridentKafkaState implements State { } } -for (int i = 0 ; i < futures.size(); i++) { -Future future = futures.get(i); +List exceptions = new ArrayList<>(futures.size()); +for (Future future : futures) { try { future.get(); } catch (ExecutionException e) { -String errorMsg = "Could not retrieve result for message with key = " -+ mapper.getKeyFromTuple(tuples.get(i)) + " from topic = " + topic; -LOG.error(errorMsg, e); -throw new FailedException(errorMsg, e); +exceptions.add(e); } } + +if(exceptions.size() > 0){ +String errorMsg = "Could not retrieve result for messages " + tuples + " from topic = " + topic ++ " because of the following exceptions: \n"; +for (ExecutionException exception : exceptions) { +errorMsg = errorMsg + exception.getMessage() + "\n"; +} +LOG.error(errorMsg); +throw new 
FailedException(errorMsg); +} } catch (Exception ex) { String errorMsg = "Could not send messages " + tuples + " to topic = " + topic; LOG.warn(errorMsg, ex);
[1/4] storm git commit: STORM-2092: optimize TridentKafkaState batch sending
Repository: storm Updated Branches: refs/heads/1.x-branch bf5d5e644 -> da35dabdd STORM-2092: optimize TridentKafkaState batch sending Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/345b4c2d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/345b4c2d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/345b4c2d Branch: refs/heads/1.x-branch Commit: 345b4c2dbf681d562c83de25798db2af88b42ad2 Parents: bf5d5e6 Author: vesenseAuthored: Tue Sep 13 17:22:48 2016 +0800 Committer: Jungtaek Lim Committed: Mon Sep 19 08:46:18 2016 +0900 -- .../storm/kafka/trident/TridentKafkaState.java | 36 1 file changed, 21 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/storm/blob/345b4c2d/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java -- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java index 5741dc7..7ff34cd 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java @@ -31,6 +31,7 @@ import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.state.State; import org.apache.storm.trident.tuple.TridentTuple; +import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; @@ -73,30 +74,35 @@ public class TridentKafkaState implements State { public void updateState(List tuples, TridentCollector collector) { String topic = null; -for (TridentTuple tuple : tuples) { -try { +try { +List futures = new ArrayList<>(tuples.size()); +for (TridentTuple tuple : tuples) { topic = topicSelector.getTopic(tuple); if(topic != null) { Future result = producer.send(new ProducerRecord(topic, mapper.getKeyFromTuple(tuple), 
mapper.getMessageFromTuple(tuple))); -try { -result.get(); -} catch (ExecutionException e) { -String errorMsg = "Could not retrieve result for message with key = " -+ mapper.getKeyFromTuple(tuple) + " from topic = " + topic; -LOG.error(errorMsg, e); -throw new FailedException(errorMsg, e); -} +futures.add(result); } else { LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null."); } -} catch (Exception ex) { -String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple) -+ " to topic = " + topic; -LOG.warn(errorMsg, ex); -throw new FailedException(errorMsg, ex); } + +for (int i = 0 ; i < futures.size(); i++) { +Future future = futures.get(i); +try { +future.get(); +} catch (ExecutionException e) { +String errorMsg = "Could not retrieve result for message with key = " ++ mapper.getKeyFromTuple(tuples.get(i)) + " from topic = " + topic; +LOG.error(errorMsg, e); +throw new FailedException(errorMsg, e); +} +} +} catch (Exception ex) { +String errorMsg = "Could not send messages " + tuples + " to topic = " + topic; +LOG.warn(errorMsg, ex); +throw new FailedException(errorMsg, ex); } } }