note: the only way to discard messages from a queue is to consume them (as in ack them) or have them expire by setting a message expiry time. The setPendingMessageLimitStrategy and setMessageEvictionStrategy policys only apply to non durable topic subscriptions.
On 18 March 2013 23:39, jaikit <jktsa...@gmail.com> wrote: > Currently I have 2 consumers set up - which are consuming from Queues. I > have > disabled producerflowcontrol, setMemoryLimit to 1, queueprefetch = 10, > topicprefetch = 10. > > I have added infinite sleep in consumer1 and I ran the load test with > 900000 > events. My expectation is consumer1 discarding all events and consumer2 > consuming all events. However both the consumers are now blocked. Also all > the events are saved in both the queue. Each queue size is 900000. > > I have pasted my code snippet below. Can some one please guide me. Thanks. > > /* configure activemq broker. */ > broker.setDeleteAllMessagesOnStartup(true); > broker.setUseJmx(true); > broker.setAdvisorySupport(false); > PolicyMap policyMap = new PolicyMap(); > List<PolicyEntry> entries = new ArrayList<PolicyEntry>(); > PolicyEntry topicPolicy = new PolicyEntry(); > topicPolicy.setTopic(">"); > topicPolicy.setProducerFlowControl(false); > entries.add(topicPolicy); > > PolicyEntry queuePolicy = new PolicyEntry(); > /* if this is true and if consumers are slow - producers will be > throttled and worst case halted */ > queuePolicy.setProducerFlowControl(false); > /* set flow control for all topics */ > queuePolicy.setQueue(">"); > > queuePolicy.setMemoryLimit(1); > ConstantPendingMessageLimitStrategy > constantPendingMessageLimitStrategy = new > ConstantPendingMessageLimitStrategy(); > constantPendingMessageLimitStrategy.setLimit(1); > > > > queuePolicy.setPendingMessageLimitStrategy(constantPendingMessageLimitStrategy); > OldestMessageEvictionStrategy oldestMessageEvictionStrategy = new > OldestMessageEvictionStrategy(); > > oldestMessageEvictionStrategy.setEvictExpiredMessagesHighWatermark(1); > > queuePolicy.setMessageEvictionStrategy(oldestMessageEvictionStrategy); > /* send an advisory message if a consumer is deemed slow */ > queuePolicy.setAdvisoryForSlowConsumers(false); > /* the period (in ms) of checks for message expiry on queued > messages, value of 0 disables */ > queuePolicy.setExpireMessagesPeriod(1000); > /* Set the PrefetchSize for all topics. You can override this value > while creating consumer. */ > // policy.setTopicPrefetch(10); > queuePolicy.setQueuePrefetch(10); > entries.add(queuePolicy); > policyMap.setPolicyEntries(entries); > broker.setDestinationPolicy(policyMap); > > /* All undeliverable messages will be sent to ActiveMQ.DLQ which > has > fixed size. If it reaches fixed size, > * producers will be throttled. Drop dead letter queue. Enable it > case by case basis. */ > // DiscardingDLQBrokerPlugin dlqBrokerPlugin = new > DiscardingDLQBrokerPlugin(); > // dlqBrokerPlugin.setDropAll(true); > // dlqBrokerPlugin.setDropTemporaryTopics(true); > // dlqBrokerPlugin.setDropTemporaryQueues(true); > // BrokerPlugin[] plugins = { dlqBrokerPlugin }; > // broker.setPlugins(plugins); > > VirtualTopic virtualTopic = new VirtualTopic(); > // the new config that enables selectors on the intercepter > virtualTopic.setSelectorAware(true); > VirtualDestinationInterceptor interceptor = new > VirtualDestinationInterceptor(); > interceptor.setVirtualDestinations(new VirtualDestination[] { > virtualTopic }); > broker.setDestinationInterceptors(new DestinationInterceptor[] { > interceptor }); > broker.start(); > > > > > -- > View this message in context: > http://activemq.2283324.n4.nabble.com/Random-slow-Subscribers-causing-Topic-to-full-Solution-tp4664784p4664856.html > Sent from the ActiveMQ - User mailing list archive at Nabble.com. > -- http://redhat.com http://blog.garytully.com