This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 3e4cf070d0f866843650ded6bef8893b21e10276 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Aug 6 11:17:47 2019 +0200 CAMEL-10533: AggregateController - Add forceDiscardOfGroup method --- .../processor/aggregate/AggregateController.java | 15 +++++ .../processor/aggregate/AggregateProcessor.java | 65 +++++++++++++++++++++- .../aggregate/DefaultAggregateController.java | 17 ++++++ .../aggregator/AggregateControllerTest.java | 63 ++++++++++++++++++++- .../mbean/ManagedAggregateProcessorMBean.java | 6 ++ .../mbean/ManagedAggregateProcessor.java | 18 ++++++ 6 files changed, 182 insertions(+), 2 deletions(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java index bacaa8e..703065e 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java @@ -51,4 +51,19 @@ public interface AggregateController { */ int forceCompletionOfAllGroups(); + /** + * To force discarding a specific group by its key. + * + * @param key the key + * @return <tt>1</tt> if the group was forced discarded, <tt>0</tt> if the group does not exists + */ + int forceDiscardingOfGroup(String key); + + /** + * To force discardingof all groups + * + * @return number of groups that was forced discarded + */ + int forceDiscardingOfAllGroups(); + } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index c7dbb09..edcd71c 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -799,7 +799,6 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat } } - log.debug("Processing aggregated exchange: {}", exchange); // add on completion task so we remember to update the inProgressCompleteExchanges @@ -1665,4 +1664,68 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat return total; } + public int forceDiscardingOfGroup(String key) { + // must acquire the shared aggregation lock to be able to trigger force completion + int total = 0; + + lock.lock(); + try { + Exchange exchange = aggregationRepository.get(camelContext, key); + if (exchange != null) { + total = 1; + log.trace("Force discarded triggered for correlation key: {}", key); + // force discarding by setting aggregate failed as true + onCompletion(key, exchange, exchange, false, true); + } + } finally { + lock.unlock(); + } + log.trace("Completed force discarded of group {}", key); + + if (total > 0) { + log.debug("Forcing discarding of group {} with {} exchanges", key, total); + } + return total; + } + + public int forceDiscardingOfAllGroups() { + + // only run if CamelContext has been fully started or is stopping + boolean allow = camelContext.getStatus().isStarted() || camelContext.getStatus().isStopping(); + if (!allow) { + log.warn("Cannot start force discarding of all groups because CamelContext({}) has not been started", camelContext.getName()); + return 0; + } + + log.trace("Starting force discarding of all groups task"); + + // trigger completion for all in the repository + Set<String> keys = aggregationRepository.getKeys(); + + int total = 0; + if (keys != null && !keys.isEmpty()) { + // must acquire the shared aggregation lock to be able to trigger force completion + lock.lock(); + total = keys.size(); + try { + for (String key : keys) { + Exchange exchange = aggregationRepository.get(camelContext, key); + if (exchange != null) { + log.trace("Force discarded triggered for correlation key: {}", key); + // force discarding by setting aggregate failed as true + onCompletion(key, exchange, exchange, false, true); + } + } + } finally { + lock.unlock(); + } + } + log.trace("Completed force discarding of all groups task"); + + if (total > 0) { + log.debug("Forcing discarding of all groups with {} exchanges", total); + } + return total; + } + } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java index 1650133..6765bfd 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java @@ -51,4 +51,21 @@ public class DefaultAggregateController implements AggregateController { } } + @Override + public int forceDiscardingOfGroup(String key) { + if (processor != null) { + return processor.forceDiscardingOfGroup(key); + } else { + return 0; + } + } + + @Override + public int forceDiscardingOfAllGroups() { + if (processor != null) { + return processor.forceDiscardingOfAllGroups(); + } else { + return 0; + } + } } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java index be792be..00e348d 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java @@ -77,6 +77,67 @@ public class AggregateControllerTest extends ContextTestSupport { assertMockEndpointsSatisfied(); } + @Test + public void testForceDiscardingOfGroup() throws Exception { + getMockEndpoint("mock:aggregated").expectedMessageCount(1); + getMockEndpoint("mock:aggregated").expectedHeaderReceived("id", "1"); + // the first 5 messages are discarded + getMockEndpoint("mock:aggregated").message(0).body().startsWith("test6"); + + template.sendBodyAndHeader("direct:start", "test1", "id", "1"); + template.sendBodyAndHeader("direct:start", "test2", "id", "1"); + template.sendBodyAndHeader("direct:start", "test3", "id", "1"); + template.sendBodyAndHeader("direct:start", "test4", "id", "1"); + template.sendBodyAndHeader("direct:start", "test5", "id", "1"); + + int groups = getAggregateController().forceDiscardingOfGroup("1"); + assertEquals(1, groups); + + template.sendBodyAndHeader("direct:start", "test6", "id", "1"); + template.sendBodyAndHeader("direct:start", "test7", "id", "1"); + template.sendBodyAndHeader("direct:start", "test8", "id", "1"); + template.sendBodyAndHeader("direct:start", "test9", "id", "1"); + template.sendBodyAndHeader("direct:start", "test10", "id", "1"); + template.sendBodyAndHeader("direct:start", "test11", "id", "1"); + template.sendBodyAndHeader("direct:start", "test12", "id", "1"); + template.sendBodyAndHeader("direct:start", "test13", "id", "1"); + template.sendBodyAndHeader("direct:start", "test14", "id", "1"); + template.sendBodyAndHeader("direct:start", "test15", "id", "1"); + + assertMockEndpointsSatisfied(); + } + + @Test + public void testForceDiscardingOfAll() throws Exception { + getMockEndpoint("mock:aggregated").expectedMessageCount(1); + getMockEndpoint("mock:aggregated").expectedHeaderReceived("id", "1"); + // the first 5 messages are discarded + getMockEndpoint("mock:aggregated").message(0).body().startsWith("test6"); + + template.sendBodyAndHeader("direct:start", "test0", "id", "2"); + template.sendBodyAndHeader("direct:start", "test1", "id", "1"); + template.sendBodyAndHeader("direct:start", "test2", "id", "1"); + template.sendBodyAndHeader("direct:start", "test3", "id", "1"); + template.sendBodyAndHeader("direct:start", "test4", "id", "1"); + template.sendBodyAndHeader("direct:start", "test5", "id", "1"); + + int groups = getAggregateController().forceDiscardingOfAllGroups(); + assertEquals(2, groups); + + template.sendBodyAndHeader("direct:start", "test6", "id", "1"); + template.sendBodyAndHeader("direct:start", "test7", "id", "1"); + template.sendBodyAndHeader("direct:start", "test8", "id", "1"); + template.sendBodyAndHeader("direct:start", "test9", "id", "1"); + template.sendBodyAndHeader("direct:start", "test10", "id", "1"); + template.sendBodyAndHeader("direct:start", "test11", "id", "1"); + template.sendBodyAndHeader("direct:start", "test12", "id", "1"); + template.sendBodyAndHeader("direct:start", "test13", "id", "1"); + template.sendBodyAndHeader("direct:start", "test14", "id", "1"); + template.sendBodyAndHeader("direct:start", "test15", "id", "1"); + + assertMockEndpointsSatisfied(); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @@ -85,7 +146,7 @@ public class AggregateControllerTest extends ContextTestSupport { from("direct:start") .aggregate(header("id"), new MyAggregationStrategy()).aggregateController(getAggregateController()) .completionSize(10) - .to("mock:aggregated"); + .to("log:aggregated", "mock:aggregated"); } }; } diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java index cae35d9..fabf1d3 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java @@ -99,6 +99,12 @@ public interface ManagedAggregateProcessorMBean extends ManagedProcessorMBean { @ManagedOperation(description = "To force complete of all groups") int forceCompletionOfAllGroups(); + @ManagedOperation(description = "To force discarding a specific group by its key") + int forceDiscardingOfGroup(String key); + + @ManagedOperation(description = "To force discarding of all groups") + int forceDiscardingOfAllGroups(); + @ManagedAttribute(description = "Current number of closed correlation keys in the memory cache") int getClosedCorrelationKeysCacheSize(); diff --git a/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java b/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java index f77b4ec..e509319 100644 --- a/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java +++ b/core/camel-management-impl/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java @@ -225,6 +225,24 @@ public class ManagedAggregateProcessor extends ManagedProcessor implements Manag } @Override + public int forceDiscardingOfGroup(String key) { + if (processor.getAggregateController() != null) { + return processor.getAggregateController().forceDiscardingOfGroup(key); + } else { + return 0; + } + } + + @Override + public int forceDiscardingOfAllGroups() { + if (processor.getAggregateController() != null) { + return processor.getAggregateController().forceDiscardingOfAllGroups(); + } else { + return 0; + } + } + + @Override public int getClosedCorrelationKeysCacheSize() { return processor.getClosedCorrelationKeysCacheSize(); }