Modified: incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java?rev=655323&r1=655322&r2=655323&view=diff ============================================================================== --- incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java (original) +++ incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java Sun May 11 08:22:03 2008 @@ -59,24 +59,6 @@ * <td> {@link Job}, {@link Job.JobCompletionHandler} * </table> * - * @todo This seems a bit bizarre. ReadWriteThreadModel creates seperate pooling filters for read and write events. - * The pooling filters themselves batch read and write events into jobs, but hand these jobs to a common thread - * pool for execution. So the same thread pool ends up handling read and write events, albeit with many threads - * so there is concurrency. But why go to the trouble of seperating out the read and write events in that case? - * Why not just batch them into jobs together? Perhaps its so that seperate thread pools could be used for these - * stages. - * - * @todo Why set an event limit of 10 on the Job? This also seems bizarre, as the job can have more than 10 events in - * it. Its just that it runs them 10 at a time, but the completion hander here checks if there are more to run - * and trips off another batch of 10 until they are all done. Why not just have a straight forward - * consumer/producer queue scenario without the batches of 10? So instead of having many jobs with batches of 10 - * in them, just have one queue of events and worker threads taking the next event. 
There will be coordination - * between worker threads and new events arriving on the job anyway, so the simpler scenario may have the same - * amount of contention. I can see that the batches of 10 is done, so that no job is allowed to hog the worker - * pool for too long. I'm not convinced this fairly complex scheme will actually add anything, and it might be - * better to encapsulate it under a Queue interface anyway, so that different queue implementations can easily - * be substituted in. - * * @todo The static helper methods are pointless. Could just call new. */ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler @@ -95,17 +77,20 @@ private final int _maxEvents; + private final boolean _readFilter; + /** * Creates a named pooling filter, on the specified shared thread pool. * * @param refCountingPool The thread pool reference. * @param name The identifying name of the filter type. */ - public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents) + public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents, boolean readFilter) { _poolReference = refCountingPool; _name = name; _maxEvents = maxEvents; + _readFilter = readFilter; } /** @@ -166,7 +151,6 @@ void fireAsynchEvent(Job job, Event event) { - // job.acquire(); //prevents this job being removed from _jobs job.add(event); final ExecutorService pool = _poolReference.getPool(); @@ -200,7 +184,7 @@ */ public void createNewJobForSession(IoSession session) { - Job job = new Job(session, this, MAX_JOB_EVENTS); + Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter); session.setAttribute(_name, job); } @@ -216,18 +200,6 @@ return (Job) session.getAttribute(_name); } - /*private Job createJobForSession(IoSession session) - { - return addJobForSession(session, new Job(session, this, _maxEvents)); - }*/ - - /*private Job addJobForSession(IoSession session, Job job) - { - // atomic so 
ensures all threads agree on the same job - Job existing = _jobs.putIfAbsent(session, job); - - return (existing == null) ? job : existing; - }*/ /** * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing @@ -238,16 +210,6 @@ */ public void completed(IoSession session, Job job) { - // if (job.isComplete()) - // { - // job.release(); - // if (!job.isReferenced()) - // { - // _jobs.remove(session); - // } - // } - // else - if (!job.isComplete()) { @@ -454,7 +416,7 @@ */ public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { - super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS)); + super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS),true); } /** @@ -497,7 +459,7 @@ */ public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { - super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS)); + super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS),false); } /**
Added: incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java?rev=655323&view=auto ============================================================================== --- incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java (added) +++ incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReadWriteJobQueue.java Sun May 11 08:22:03 2008 @@ -0,0 +1,432 @@ +package org.apache.qpid.pool; + +import java.util.AbstractQueue; +import java.util.Iterator; +import java.util.Collection; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.atomic.AtomicInteger; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+* +*/ +public class ReadWriteJobQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> +{ + + private final AtomicInteger _count = new AtomicInteger(0); + + private final ReentrantLock _takeLock = new ReentrantLock(); + + private final Condition _notEmpty = _takeLock.newCondition(); + + private final ReentrantLock _putLock = new ReentrantLock(); + + private final ConcurrentLinkedQueue<Job> _readJobQueue = new ConcurrentLinkedQueue<Job>(); + + private final ConcurrentLinkedQueue<Job> _writeJobQueue = new ConcurrentLinkedQueue<Job>(); + + + private class ReadWriteJobIterator implements Iterator<Runnable> + { + + private boolean _onReads; + private Iterator<Job> _iter = _writeJobQueue.iterator(); + + public boolean hasNext() + { + if(!_iter.hasNext()) + { + if(_onReads) + { + _iter = _readJobQueue.iterator(); + _onReads = true; + return _iter.hasNext(); + } + else + { + return false; + } + } + else + { + return true; + } + } + + public Runnable next() + { + if(_iter.hasNext()) + { + return _iter.next(); + } + else + { + return null; + } + } + + public void remove() + { + _takeLock.lock(); + try + { + _iter.remove(); + _count.decrementAndGet(); + } + finally + { + _takeLock.unlock(); + } + } + } + + public Iterator<Runnable> iterator() + { + return new ReadWriteJobIterator(); + } + + public int size() + { + return _count.get(); + } + + public boolean offer(final Runnable runnable) + { + final Job job = (Job) runnable; + final ReentrantLock putLock = _putLock; + putLock.lock(); + try + { + if(job.isReadJob()) + { + _readJobQueue.offer(job); + } + else + { + _writeJobQueue.offer(job); + } + if(_count.getAndIncrement() == 0) + { + _takeLock.lock(); + try + { + _notEmpty.signal(); + } + finally + { + _takeLock.unlock(); + } + } + return true; + } + finally + { + putLock.unlock(); + } + } + + public void put(final Runnable runnable) throws InterruptedException + { + final Job job = (Job) runnable; + final ReentrantLock putLock = _putLock; + 
putLock.lock(); + + try + { + if(job.isReadJob()) + { + _readJobQueue.offer(job); + } + else + { + _writeJobQueue.offer(job); + } + if(_count.getAndIncrement() == 0) + { + _takeLock.lock(); + try + { + _notEmpty.signal(); + } + finally + { + _takeLock.unlock(); + } + } + + } + finally + { + putLock.unlock(); + } + } + + + + public boolean offer(final Runnable runnable, final long timeout, final TimeUnit unit) throws InterruptedException + { + final Job job = (Job) runnable; + final ReentrantLock putLock = _putLock; + putLock.lock(); + + try + { + if(job.isReadJob()) + { + _readJobQueue.offer(job); + } + else + { + _writeJobQueue.offer(job); + } + if(_count.getAndIncrement() == 0) + { + _takeLock.lock(); + try + { + _notEmpty.signal(); + } + finally + { + _takeLock.unlock(); + } + } + + return true; + } + finally + { + putLock.unlock(); + } + + } + + public Runnable take() throws InterruptedException + { + final ReentrantLock takeLock = _takeLock; + takeLock.lockInterruptibly(); + try + { + try + { + while (_count.get() == 0) + { + _notEmpty.await(); + } + } + catch (InterruptedException ie) + { + _notEmpty.signal(); + throw ie; + } + + Job job = _writeJobQueue.poll(); + if(job == null) + { + job = _readJobQueue.poll(); + } + int c = _count.getAndDecrement(); + if (c > 1) + { + _notEmpty.signal(); + } + return job; + } + finally + { + takeLock.unlock(); + } + + + } + + public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException + { + final ReentrantLock takeLock = _takeLock; + final AtomicInteger count = _count; + long nanos = unit.toNanos(timeout); + takeLock.lockInterruptibly(); + Job job = null; + try + { + + for (;;) + { + if (count.get() > 0) + { + job = _writeJobQueue.poll(); + if(job == null) + { + job = _readJobQueue.poll(); + } + int c = count.getAndDecrement(); + if (c > 1) + { + _notEmpty.signal(); + } + break; + } + if (nanos <= 0) + { + return null; + } + try + { + nanos = _notEmpty.awaitNanos(nanos); + } + catch 
(InterruptedException ie) + { + _notEmpty.signal(); + throw ie; + } + } + } + finally + { + takeLock.unlock(); + } + + return job; + } + + public int remainingCapacity() + { + return Integer.MAX_VALUE; + } + + public int drainTo(final Collection<? super Runnable> c) + { + int total = 0; + + _putLock.lock(); + _takeLock.lock(); + try + { + Job job; + while((job = _writeJobQueue.peek())!= null) + { + c.add(job); + _writeJobQueue.poll(); + _count.decrementAndGet(); + total++; + } + + while((job = _readJobQueue.peek())!= null) + { + c.add(job); + _readJobQueue.poll(); + _count.decrementAndGet(); + total++; + } + + } + finally + { + _takeLock.unlock(); + _putLock.unlock(); + } + return total; + } + + public int drainTo(final Collection<? super Runnable> c, final int maxElements) + { + int total = 0; + + _putLock.lock(); + _takeLock.lock(); + try + { + Job job; + while(total<=maxElements && (job = _writeJobQueue.peek())!= null) + { + c.add(job); + _writeJobQueue.poll(); + _count.decrementAndGet(); + total++; + } + + while(total<=maxElements && (job = _readJobQueue.peek())!= null) + { + c.add(job); + _readJobQueue.poll(); + _count.decrementAndGet(); + total++; + } + + } + finally + { + _takeLock.unlock(); + _putLock.unlock(); + } + return total; + + } + + public Runnable poll() + { + final ReentrantLock takeLock = _takeLock; + takeLock.lock(); + try + { + if(_count.get() > 0) + { + Job job = _writeJobQueue.poll(); + if(job == null) + { + job = _readJobQueue.poll(); + } + _count.decrementAndGet(); + return job; + } + else + { + return null; + } + } + finally + { + takeLock.unlock(); + } + + } + + public Runnable peek() + { + final ReentrantLock takeLock = _takeLock; + takeLock.lock(); + try + { + Job job = _writeJobQueue.peek(); + if(job == null) + { + job = _readJobQueue.peek(); + } + return job; + } + finally + { + takeLock.unlock(); + } + } +} Modified: 
incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java?rev=655323&r1=655322&r2=655323&view=diff ============================================================================== --- incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java (original) +++ incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java Sun May 11 08:22:03 2008 @@ -22,6 +22,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.LinkedBlockingQueue; /** * ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. 
It counts @@ -111,7 +114,12 @@ { if (_refCount++ == 0) { - _pool = Executors.newFixedThreadPool(_poolSize); +// _pool = Executors.newFixedThreadPool(_poolSize); + + // Use a job queue that biases to writes + _pool = new ThreadPoolExecutor(_poolSize, _poolSize, + 0L, TimeUnit.MILLISECONDS, + new ReadWriteJobQueue()); } return _pool; Modified: incubator/qpid/branches/broker-queue-refactor/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java?rev=655323&r1=655322&r2=655323&view=diff ============================================================================== --- incubator/qpid/branches/broker-queue-refactor/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java (original) +++ incubator/qpid/branches/broker-queue-refactor/java/management/eclipse-plugin/src/test/java/org/apache/qpid/management/ui/ManagementConsoleTest.java Sun May 11 08:22:03 2008 @@ -63,7 +63,8 @@ { // If this test fails due to changes in the broker code, // then the constants in the Constants.java shoule be updated accordingly - AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueueForManagement"), false, null, false, _virtualHost); + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueueForManagement"), false, null, false, _virtualHost, + null); AMQManagedObject mbean = new AMQQueueMBean(queue); MBeanInfo mbeanInfo = mbean.getMBeanInfo(); Modified: incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java?rev=655323&r1=655322&r2=655323&view=diff ============================================================================== --- incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java (original) +++ incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java Sun May 11 08:22:03 2008 @@ -34,7 +34,6 @@ import org.apache.qpid.server.exchange.AbstractExchange; import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; -import org.apache.qpid.server.queue.AMQQueueImpl; import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.AMQQueue; @@ -207,12 +206,7 @@ } - @Override - public Map<AMQShortString, List<AMQQueue>> getBindings() { - // TODO Auto-generated method stub - return null; - } - + public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) { // TODO Auto-generated method stub Modified: incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java?rev=655323&r1=655322&r2=655323&view=diff ============================================================================== --- incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java (original) +++ incubator/qpid/branches/broker-queue-refactor/java/plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java Sun May 11 08:22:03 2008 @@ -7,7 +7,6 @@ import 
org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.queue.AMQQueueImpl; import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/pom.xml URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/pom.xml?rev=655323&r1=655322&r2=655323&view=diff ============================================================================== --- incubator/qpid/branches/broker-queue-refactor/java/systests/pom.xml (original) +++ incubator/qpid/branches/broker-queue-refactor/java/systests/pom.xml Sun May 11 08:22:03 2008 @@ -151,7 +151,6 @@ <MessageReturnTest>-n MessageReturnTest org.apache.qpid.server.queue.MessageReturnTest </MessageReturnTest> <QueueDepthWithSelectorTest>-n QueueDepthWithSelectorTest org.apache.qpid.server.queue.QueueDepthWithSelectorTest </QueueDepthWithSelectorTest> <!--<SubscriptionManagerTest>-n SubscriptionManagerTest org.apache.qpid.server.queue.SubscriptionManagerTest </SubscriptionManagerTest>--> - <SubscriptionSetTest>-n SubscriptionSetTest org.apache.qpid.server.queue.SubscriptionSetTest </SubscriptionSetTest> <TimeToLiveTest>-n TimeToLiveTest org.apache.qpid.server.queue.TimeToLiveTest </TimeToLiveTest> <TxnBufferTest>-n TxnBufferTest org.apache.qpid.server.txn.TxnBufferTest </TxnBufferTest> <!--<TxnTest>-n TxnTest org.apache.qpid.server.txn.TxnTest </TxnTest>--> Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?rev=655323&r1=655322&r2=655323&view=diff 
============================================================================== --- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java (original) +++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java Sun May 11 08:22:03 2008 @@ -26,13 +26,16 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.QueueEntryImpl; import org.apache.qpid.server.queue.MessageHandleFactory; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.AMQMessageHandle; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; @@ -100,12 +103,16 @@ private final List<Long> _unacked; private StoreContext _storeContext = new StoreContext(); - Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws AMQException + Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception { TransactionalContext txnContext = new NonTransactionalContext(new TestableMemoryMessageStore(), _storeContext, null, new LinkedList<RequiredDeliveryException>() ); + AMQQueue queue = + AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), false, null, false, new VirtualHost("", new MemoryMessageStore()), + null); + for (int i = 0; i < messageCount; i++) { long deliveryTag = i + 1; @@ -140,7 +147,7 @@ }; TestMessage message = new 
TestMessage(deliveryTag, i, info, txnContext.getStoreContext()); - _map.add(deliveryTag, new QueueEntryImpl(null,message, Long.MIN_VALUE)); + _map.add(deliveryTag, queue.enqueue(new StoreContext(), message)); } _acked = acked; _unacked = unacked; Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=655323&r1=655322&r2=655323&view=diff ============================================================================== --- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original) +++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Sun May 11 08:22:03 2008 @@ -33,6 +33,7 @@ import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.log4j.Logger; @@ -236,7 +237,7 @@ return properties; } - static class TestQueue extends AMQQueueImpl + static class TestQueue extends SimpleAMQQueue { final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>(); @@ -250,12 +251,160 @@ * not invoked. It is unnecessary since for this test we only care to know whether the message was * sent to the queue; the queue processing logic is not being tested. 
* @param msg - * @param deliverFirst * @throws AMQException */ - public void process(StoreContext context, QueueEntry msg, boolean deliverFirst) throws AMQException + @Override + public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException { - messages.add( new HeadersExchangeTest.Message(msg.getMessage())); + messages.add( new HeadersExchangeTest.Message(msg)); + return new QueueEntry() + { + + public AMQQueue getQueue() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public AMQMessage getMessage() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public long getSize() + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean getDeliveredToConsumer() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean expired() throws AMQException + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isAcquired() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean acquire() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean acquire(Subscription sub) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean delete() + { + return false; + } + + public boolean isDeleted() + { + return false; + } + + public boolean acquiredBySubscription() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setDeliveredToSubscription() + { + //To change body of implemented methods use File | Settings | File Templates. 
+ } + + public void release() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public String debugIdentity() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean immediateAndNotDelivered() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void setRedelivered(boolean b) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public Subscription getDeliveredSubscription() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void reject() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void reject(Subscription subscription) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isRejectedBy(Subscription subscription) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void requeue(StoreContext storeContext) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void dequeue(final StoreContext storeContext) throws FailedDequeueException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void dispose(final StoreContext storeContext) throws MessageCleanupException + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void restoreCredit() + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void discard(StoreContext storeContext) throws AMQException + { + //To change body of implemented methods use File | Settings | File Templates. 
+ } + + public boolean isQueueDeleted() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void addStateChangeListener(StateChangeListener listener) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean removeStateChangeListener(StateChangeListener listener) + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public int compareTo(final QueueEntry o) + { + return 0; //To change body of implemented methods use File | Settings | File Templates. + } + }; } boolean isInQueue(Message msg) Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java?rev=655323&r1=655322&r2=655323&view=diff ============================================================================== --- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java (original) +++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java Sun May 11 08:22:03 2008 @@ -59,7 +59,7 @@ false, new AMQShortString("test"), true, - _protocolSession.getVirtualHost()); + _protocolSession.getVirtualHost(), null); AMQChannel channel = new AMQChannel(_protocolSession,2, _messageStore); channel.setDefaultQueue(queue); _protocolSession.addChannel(channel); Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Sun May 11 08:22:03 2008
@@ -78,7 +78,8 @@
 
         _protocolSession.addChannel(_channel);
 
-        _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"));
+        _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("myQ"), false, new AMQShortString("guest"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"),
+                                                    null);
     }
 
 

Added: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java?rev=655323&view=auto
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java (added)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java Sun May 11 08:22:03 2008
@@ -0,0 +1,167 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import junit.framework.Assert;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+import javax.jms.*;
+import javax.naming.NamingException;
+import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
+
+/**
+ * System test for priority queues: publishes messages with cycling priorities and checks that a consumer
+ * receives them highest priority first, in publication order within each priority.
+ */
+public class PriorityTest extends TestCase
+{
+    private static final Logger _logger = Logger.getLogger(PriorityTest.class);
+
+    protected final String BROKER = "vm://:1";
+    protected final String VHOST = "/test";
+    protected final String QUEUE = "PriorityQueue";
+
+    /** Number of messages published (and expected back) by the test. */
+    private static final int MSG_COUNT = 50;
+
+    /** Value of the x-qpid-priorities queue argument; message priorities cycle through 0..PRIORITY_LEVELS-1. */
+    private static final int PRIORITY_LEVELS = 10;
+
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+
+        if (usingInVMBroker())
+        {
+            TransportConnection.createVMBroker(1);
+        }
+    }
+
+    private boolean usingInVMBroker()
+    {
+        return BROKER.startsWith("vm://");
+    }
+
+    protected void tearDown() throws Exception
+    {
+        if (usingInVMBroker())
+        {
+            TransportConnection.killAllVMBrokers();
+        }
+        super.tearDown();
+    }
+
+    /**
+     * Publishes MSG_COUNT messages with priorities cycling through 0..PRIORITY_LEVELS-1, then consumes them and
+     * verifies descending priority order, with publication order preserved within a priority.
+     */
+    public void testPriority() throws JMSException, NamingException, AMQException
+    {
+        InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+        Hashtable<String, String> env = new Hashtable<String, String>();
+
+        // NOTE(review): "[EMAIL PROTECTED]" below looks like mail-archive redaction of the URL's user-info - confirm.
+        env.put("connectionfactory.connection", "amqp://guest:[EMAIL PROTECTED]" + VHOST + "?brokerlist='" + BROKER + "'");
+        env.put("queue.queue", QUEUE);
+
+        Context context = factory.getInitialContext(env);
+
+        Connection producerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+
+        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        final FieldTable arguments = new FieldTable();
+        arguments.put(new AMQShortString("x-qpid-priorities"), PRIORITY_LEVELS);
+
+        ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+
+        Queue queue = (Queue) context.lookup("queue");
+
+        ((AMQSession) producerSession).declareAndBind((AMQDestination) queue);
+
+        producerConnection.start();
+
+        MessageProducer producer = producerSession.createProducer(queue);
+
+        for (int msg = 0; msg < MSG_COUNT; msg++)
+        {
+            producer.setPriority(msg % PRIORITY_LEVELS);
+            producer.send(nextMessage(msg, producerSession));
+        }
+
+        producer.close();
+        producerSession.close();
+        producerConnection.close();
+
+        Connection consumerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
+        Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = consumerSession.createConsumer(queue);
+
+        consumerConnection.start();
+
+        Message received;
+        int receivedCount = 0;
+        Message previous = null;
+        while ((received = consumer.receive(1000)) != null)
+        {
+            if (previous != null)
+            {
+                // Each message must have a lower priority than its predecessor, or the same priority and a later "msg".
+                assertTrue("Messages arrived in unexpected order",
+                           (previous.getJMSPriority() > received.getJMSPriority())
+                           || ((previous.getJMSPriority() == received.getJMSPriority())
+                               && previous.getIntProperty("msg") < received.getIntProperty("msg")));
+            }
+
+            previous = received;
+            receivedCount++;
+        }
+
+        assertEquals("Incorrect number of message received", MSG_COUNT, receivedCount);
+
+        // Release the consumer side; the producer side was already closed above.
+        consumer.close();
+        consumerSession.close();
+        consumerConnection.close();
+    }
+
+    /** Creates a text message whose "msg" int property carries the given publication sequence number. */
+    private Message nextMessage(int msg, Session producerSession) throws JMSException
+    {
+        Message send = producerSession.createTextMessage("Message: " + msg);
+        send.setIntProperty("msg", msg);
+
+        return send;
+    }
+}

Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=655323&r1=655322&r2=655323&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Sun May 11 08:22:03 2008
@@ -106,12 +106,12 @@
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public Object getQueueContext()
+    public QueueEntry getLastSeenEntry()
     {
         return null;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public boolean setQueueContext(Object expected, Object newValue)
+    public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
     {
         return false;  //To change body of implemented methods use File | Settings | File Templates.
} @@ -126,7 +126,7 @@ //no-op } - public AMQShortString getConumerTag() + public AMQShortString getConsumerTag() { return null; //To change body of implemented methods use File | Settings | File Templates. } @@ -141,6 +141,11 @@ return null; } + public QueueEntry.SubscriptionAcquiredState getOwningState() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public void queueDeleted(AMQQueue queue) { } Modified: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java?rev=655323&r1=655322&r2=655323&view=diff ============================================================================== --- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original) +++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java Sun May 11 08:22:03 2008 @@ -25,7 +25,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; -import org.apache.qpid.server.queue.AMQQueueImpl; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; Added: incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java?rev=655323&view=auto ============================================================================== --- 
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java (added)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java Sun May 11 08:22:03 2008
@@ -0,0 +1,192 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.test.client;
+
+import org.apache.qpid.test.VMTestCase;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+import javax.jms.*;
+import javax.naming.NamingException;
+import java.util.Enumeration;
+
+/**
+ * Tests that the prefetch limits set on a consumer session via setPrefecthLimits hold back delivery of messages
+ * until enough previously delivered messages have been acknowledged.
+ */
+public class FlowControlTest extends VMTestCase
+{
+    private static final Logger _logger = Logger.getLogger(FlowControlTest.class);
+
+    private Connection _clientConnection;
+    private Session _clientSession;
+    private Queue _queue;
+
+    public void setUp() throws Exception
+    {
+        super.setUp();
+    }
+
+    /**
+     * Single consumer with prefetch limits (0, 256): the two 128 byte messages are delivered, while the 256 byte
+     * message is only delivered after both of them have been acknowledged.
+     */
+    public void testBasicBytesFlowControl() throws JMSException, NamingException, AMQException
+    {
+        declareQueue("testqueue");
+        publishMessages(128, 128, 256);
+
+        Connection consumerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+        Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        ((AMQSession) consumerSession).setPrefecthLimits(0, 256);
+        MessageConsumer recv = consumerSession.createConsumer(_queue);
+        consumerConnection.start();
+
+        Message r1 = recv.receive(RECEIVE_TIMEOUT);
+        assertNotNull("First message not received", r1);
+        assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg"));
+
+        Message r2 = recv.receive(RECEIVE_TIMEOUT);
+        assertNotNull("Second message not received", r2);
+        assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg"));
+
+        Message r3 = recv.receiveNoWait();
+        assertNull("Third message incorrectly delivered", r3);
+
+        r1.acknowledge();
+
+        r3 = recv.receiveNoWait();
+        assertNull("Third message incorrectly delivered", r3);
+
+        r2.acknowledge();
+
+        r3 = recv.receive(RECEIVE_TIMEOUT);
+        assertNotNull("Third message not received", r3);
+        assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg"));
+
+        r3.acknowledge();
+        recv.close();
+        consumerSession.close();
+        consumerConnection.close();
+
+        closeClient();
+    }
+
+    /**
+     * Two consumers on separate sessions, each with prefetch limits (0, 256): after the first consumer has taken
+     * the first 128 byte message, the 256 byte message is expected on the second consumer and the final 128 byte
+     * message on the first.
+     */
+    public void testTwoConsumersBytesFlowControl() throws JMSException, NamingException, AMQException
+    {
+        declareQueue("testqueue1");
+        publishMessages(128, 256, 128);
+
+        Connection consumerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+        Session consumerSession1 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        ((AMQSession) consumerSession1).setPrefecthLimits(0, 256);
+        MessageConsumer recv1 = consumerSession1.createConsumer(_queue);
+
+        consumerConnection.start();
+
+        Message r1 = recv1.receive(RECEIVE_TIMEOUT);
+        assertNotNull("First message not received", r1);
+        assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg"));
+
+        Message r2 = recv1.receiveNoWait();
+        assertNull("Second message incorrectly delivered", r2);
+
+        Session consumerSession2 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        ((AMQSession) consumerSession2).setPrefecthLimits(0, 256);
+        MessageConsumer recv2 = consumerSession2.createConsumer(_queue);
+
+        r2 = recv2.receive(RECEIVE_TIMEOUT);
+        assertNotNull("Second message not received", r2);
+        assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg"));
+
+        Message r3 = recv2.receiveNoWait();
+        assertNull("Third message incorrectly delivered", r3);
+
+        r3 = recv1.receive(RECEIVE_TIMEOUT);
+        assertNotNull("Third message not received", r3);
+        assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg"));
+
+        r2.acknowledge();
+        r3.acknowledge();
+        recv1.close();
+        recv2.close();
+        consumerSession1.close();
+        consumerSession2.close();
+        consumerConnection.close();
+
+        closeClient();
+    }
+
+    /**
+     * Sets _queue to an AMQQueue built from "amq.direct" and the given name, opens the client connection and
+     * session, and creates and immediately closes a consumer so that the queue exists before anything is published.
+     */
+    private void declareQueue(String queueName) throws JMSException, NamingException, AMQException
+    {
+        _queue = new AMQQueue("amq.direct", queueName);
+
+        _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+        _clientConnection.start();
+        _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        //Ensure _queue is created
+        _clientSession.createConsumer(_queue).close();
+    }
+
+    /**
+     * Publishes one bytes message per entry of payloadSizes to _queue over a dedicated connection, setting the
+     * "msg" property to 1, 2, ... in publication order, then closes that connection.
+     */
+    private void publishMessages(int... payloadSizes) throws JMSException, NamingException
+    {
+        Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+        producerConnection.start();
+
+        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(_queue);
+
+        for (int i = 0; i < payloadSizes.length; i++)
+        {
+            BytesMessage message = producerSession.createBytesMessage();
+            message.writeBytes(new byte[payloadSizes[i]]);
+            message.setIntProperty("msg", i + 1);
+            producer.send(message);
+        }
+
+        producer.close();
+        producerSession.close();
+        producerConnection.close();
+    }
+
+    /** Closes the session and connection opened by declareQueue. */
+    private void closeClient() throws JMSException
+    {
+        _clientSession.close();
+        _clientConnection.close();
+    }
+}
