Author: arnaudsimon
Date: Mon Sep 10 05:35:27 2007
New Revision: 574221

URL: http://svn.apache.org/viewvc?rev=574221&view=rev
Log: (empty)

Added:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
   (with props)
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
   (with props)
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
   (with props)

Added: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=574221&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Mon Sep 10 05:35:27 2007
@@ -0,0 +1,445 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpidity.client.Session;
+import org.apache.qpidity.client.util.MessagePartListenerAdapter;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.RangeSet;
+import org.apache.qpidity.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Destination;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This is a 0.10 Session
+ */
+public class AMQSession_0_10 extends AMQSession
+{
+
+    /**
+     * This class logger
+     */
+    private static final Logger _logger = 
LoggerFactory.getLogger(AMQSession_0_10.class);
+
+    /**
+     * The maximum number of pre-fetched messages per destination
+     */
+    private static final long MAX_PREFETCH = 100;
+
+    /**
+     * The underlying QpidSession
+     */
+    private Session _qpidSession;
+
+    /**
+     * The latest qpid Exception that has been reaised.
+     */
+    private QpidException _currentException;
+
+    /**
+     * All the not yet acknoledged message tags
+     */
+    private ConcurrentLinkedQueue<Long> _unacknowledgedMessageTags = new 
ConcurrentLinkedQueue<Long>();
+
+    //--- constructors
+
+    /**
+     * Creates a new session on a connection.
+     *
+     * @param con                     The connection on which to create the 
session.
+     * @param channelId               The unique identifier for the session.
+     * @param transacted              Indicates whether or not the session is 
transactional.
+     * @param acknowledgeMode         The acknoledgement mode for the session.
+     * @param messageFactoryRegistry  The message factory factory for the 
session.
+     * @param defaultPrefetchHighMark The maximum number of messages to 
prefetched before suspending the session.
+     * @param defaultPrefetchLowMark  The number of prefetched messages at 
which to resume the session.
+     */
+    AMQSession_0_10(AMQConnection con, int channelId, boolean transacted, int 
acknowledgeMode,
+                    MessageFactoryRegistry messageFactoryRegistry, int 
defaultPrefetchHighMark,
+                    int defaultPrefetchLowMark)
+    {
+
+        super(con, channelId, transacted, acknowledgeMode, 
messageFactoryRegistry, defaultPrefetchHighMark,
+              defaultPrefetchLowMark);
+        // create the qpid session with an expiry  <= 0 so that the session 
does not expire
+        _qpidSession = null; // todo when the connection is finalized 
_connection.getQpidConnection().createSession(0);
+        // set the exception listnere for this session
+        _qpidSession.setExceptionListener(new QpidSessionExceptionListener());
+        // set transacted if required
+        if (_transacted)
+        {
+            _qpidSession.txSelect();
+        }
+    }
+
+    /**
+     * Creates a new session on a connection with the default 0-10 message 
factory.
+     *
+     * @param con                 The connection on which to create the 
session.
+     * @param channelId           The unique identifier for the session.
+     * @param transacted          Indicates whether or not the session is 
transactional.
+     * @param acknowledgeMode     The acknoledgement mode for the session.
+     * @param defaultPrefetchHigh The maximum number of messages to prefetched 
before suspending the session.
+     * @param defaultPrefetchLow  The number of prefetched messages at which 
to resume the session.
+     */
+    AMQSession_0_10(AMQConnection con, int channelId, boolean transacted, int 
acknowledgeMode, int defaultPrefetchHigh,
+                    int defaultPrefetchLow)
+    {
+
+        this(con, channelId, transacted, acknowledgeMode, 
MessageFactoryRegistry.newDefault010Registry(),
+             defaultPrefetchHigh, defaultPrefetchLow);
+    }
+
+    //------- 0-10 specific methods
+
+    /**
+     * Add a message tag to be acknowledged
+     * This is used for client ack mode
+     *
+     * @param tag The id of the message to be acknowledged
+     */
+    void addMessageTag(long tag)
+    {
+        _unacknowledgedMessageTags.add(tag);
+    }
+
+    //------- overwritten methods of class AMQSession
+
+    /**
+     * Acknowledge one or many messages.
+     *
+     * @param deliveryTag The tag of the last message to be acknowledged.
+     * @param multiple    <tt>true</tt> to acknowledge all messages up to and 
including the one specified by the
+     *                    delivery tag, <tt>false</tt> to just acknowledge 
that message.
+     */
+    public void acknowledgeMessage(long deliveryTag, boolean multiple)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Sending ack for delivery tag " + deliveryTag + " on 
session " + _channelId);
+        }
+        // acknowledge this message
+        RangeSet ranges = new RangeSet();
+        if (multiple)
+        {
+            for (Long messageTag : _unacknowledgedMessageTags)
+            {
+                ranges.add(messageTag);
+            }
+            //empty the list of unack messages
+            _unacknowledgedMessageTags.clear();
+        }
+        else
+        {
+            ranges.add(deliveryTag);
+            _unacknowledgedMessageTags.remove(deliveryTag);
+        }
+        getQpidSession().messageAcknowledge(ranges);
+    }
+
+    /**
+     * Bind a queue with an exchange.
+     *
+     * @param queueName    Specifies the name of the queue to bind. If the 
queue name is empty,
+     *                     refers to the current
+     *                     queue for the session, which is the last declared 
queue.
+     * @param exchangeName The exchange name.
+     * @param routingKey   Specifies the routing key for the binding.
+     * @param arguments    0_8 specific
+     */
+    public void sendQueueBind(final AMQShortString queueName, final 
AMQShortString routingKey,
+                              final FieldTable arguments, final AMQShortString 
exchangeName)
+            throws AMQException, FailoverException
+    {
+        getQpidSession().queueBind(queueName.toString(), 
exchangeName.toString(), routingKey.toString(), null);
+        // We need to sync so that we get notify of an error.
+        getQpidSession().sync();
+        getCurrentException();
+    }
+
+
+    /**
+     * Close this session.
+     *
+     * @param timeout no used / 0_8 specific
+     * @throws AMQException
+     * @throws FailoverException
+     */
+    public void sendClose(long timeout) throws AMQException, FailoverException
+    {
+        getQpidSession().sessionClose();
+        // We need to sync so that we get notify of an error.
+        getQpidSession().sync();
+        getCurrentException();
+    }
+
+    /**
+     * Commit the receipt and the delivery of all messages exchanged by this 
session resources.
+     */
+    public void sendCommit() throws AMQException, FailoverException
+    {
+        getQpidSession().txCommit();
+        // We need to sync so that we get notify of an error.
+        getQpidSession().sync();
+        getCurrentException();
+    }
+
+    /**
+     * Create a queue with a given name.
+     *
+     * @param name       The queue name
+     * @param autoDelete If this field is set and the exclusive field is also 
set,
+     *                   then the queue is deleted when the connection closes.
+     * @param durable    If set when creating a new queue,
+     *                   the queue will be marked as durable.
+     * @param exclusive  Exclusive queues can only be used from one connection 
at a time.
+     * @throws AMQException
+     * @throws FailoverException
+     */
+    public void sendCreateQueue(AMQShortString name, final boolean autoDelete, 
final boolean durable,
+                                final boolean exclusive) throws AMQException, 
FailoverException
+    {
+        getQpidSession().queueDeclare(name.toString(), null, null, durable ? 
Option.DURABLE : Option.NO_OPTION,
+                                      autoDelete ? Option.AUTO_DELETE : 
Option.NO_OPTION,
+                                      exclusive ? Option.EXCLUSIVE : 
Option.NO_OPTION);
+        // We need to sync so that we get notify of an error.
+        getQpidSession().sync();
+        getCurrentException();
+    }
+
+    /**
+     * This method asks the broker to redeliver all unacknowledged messages
+     *
+     * @throws AMQException
+     * @throws FailoverException
+     */
+    public void sendRecover() throws AMQException, FailoverException
+    {
+        // release all unack messages
+        RangeSet ranges = new RangeSet();
+        for (long messageTag : _unacknowledgedMessageTags)
+        {
+            // release this message           
+            ranges.add(messageTag);
+        }
+        getQpidSession().messageRelease(ranges);
+        // We need to sync so that we get notify of an error.
+        getQpidSession().sync();
+        getCurrentException();
+    }
+
+    /**
+     * Release (0_8 notion of Reject) an acquired message
+     *
+     * @param deliveryTag the message ID
+     * @param requeue     always true
+     */
+    public void rejectMessage(long deliveryTag, boolean requeue)
+    {
+        // The value of requeue is always true
+        RangeSet ranges = new RangeSet();
+        ranges.add(deliveryTag);
+        getQpidSession().messageRelease(ranges);
+        //I don't think we need to sync
+    }
+
+    /**
+     * Create an 0_10 message consumer
+     */
+    public BasicMessageConsumer createMessageConsumer(final AMQDestination 
destination, final int prefetchHigh,
+                                                      final int prefetchLow, 
final boolean noLocal,
+                                                      final boolean exclusive, 
String messageSelector,
+                                                      final FieldTable ft, 
final boolean noConsume,
+                                                      final boolean autoClose)
+    {
+
+        final AMQProtocolHandler protocolHandler = getProtocolHandler();
+        return new BasicMessageConsumer_0_10(_channelId, _connection, 
destination, messageSelector, noLocal,
+                                             _messageFactoryRegistry, this, 
protocolHandler, ft, prefetchHigh,
+                                             prefetchLow, exclusive, 
_acknowledgeMode, noConsume, autoClose);
+    }
+
+    /**
+     * Bind a queue with an exchange.
+     */
+    public boolean isQueueBound(final AMQShortString exchangeName, final 
AMQShortString queueName,
+                                final AMQShortString routingKey) throws 
JMSException
+    {
+        getQpidSession().queueBind(queueName.toString(), 
exchangeName.toString(), routingKey.toString(), null);
+        // we asume that a binding is always successful
+        return true;
+    }
+
+    /**
+     * This method is invoked when a consumer is creted
+     * Registers the consumer with the broker
+     */
+    public void sendConsume(BasicMessageConsumer consumer, AMQShortString 
queueName, AMQProtocolHandler protocolHandler,
+                            boolean nowait, String messageSelector, 
AMQShortString tag)
+            throws AMQException, FailoverException
+    {
+        getQpidSession().messageSubscribe(queueName.toString(), 
tag.toString(), Session.TRANSFER_CONFIRM_MODE_REQUIRED,
+                                          
Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
+                                          new 
MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer),
+                                          null, consumer.isNoLocal() ? 
Option.NO_LOCAL : Option.NO_OPTION,
+                                          consumer.isExclusive() ? 
Option.EXCLUSIVE : Option.NO_OPTION);
+        // We need to sync so that we get notify of an error.
+        getQpidSession().sync();
+        getCurrentException();
+    }
+
+    /**
+     * Create an 0_10 message producer
+     */
+    public BasicMessageProducer createMessageProducer(final Destination 
destination, final boolean mandatory,
+                                                      final boolean immediate, 
final boolean waitUntilSent,
+                                                      long producerId)
+    {
+        return new BasicMessageProducer_0_10(_connection, (AMQDestination) 
destination, _transacted, _channelId, this,
+                                             getProtocolHandler(), producerId, 
immediate, mandatory, waitUntilSent);
+
+    }
+
+    /**
+     * creates an exchange if it does not already exist
+     */
+    public void sendExchangeDeclare(final AMQShortString name, final 
AMQShortString type,
+                                    final AMQProtocolHandler protocolHandler, 
final boolean nowait)
+            throws AMQException, FailoverException
+    {
+        getQpidSession().exchangeDeclare(name.toString(), type.toString(), 
null, null);
+        // autoDelete --> false
+        // durable --> false
+        // passive -- false
+        // We need to sync so that we get notify of an error.
+        getQpidSession().sync();
+        getCurrentException();
+    }
+
+    /**
+     * Declare a queue with the given queueName
+     */
+    public void sendQueueDeclare(final AMQDestination amqd, final 
AMQProtocolHandler protocolHandler)
+            throws AMQException, FailoverException
+    {
+        getQpidSession().queueDeclare(amqd.getAMQQueueName().toString(), null, 
null,
+                                      amqd.isAutoDelete() ? Option.AUTO_DELETE 
: Option.NO_OPTION,
+                                      amqd.isDurable() ? Option.DURABLE : 
Option.NO_OPTION,
+                                      amqd.isExclusive() ? Option.EXCLUSIVE : 
Option.NO_OPTION);
+        // passive --> false
+        // We need to sync so that we get notify of an error.
+        getQpidSession().sync();
+        getCurrentException();
+    }
+
+    /**
+     * deletes a queue
+     */
+    public void sendQueueDelete(final AMQShortString queueName) throws 
AMQException, FailoverException
+    {
+        getQpidSession().queueDelete(queueName.toString());
+        // ifEmpty --> false
+        // ifUnused --> false
+        // We need to sync so that we get notify of an error.
+        getQpidSession().sync();
+        getCurrentException();
+    }
+
+    /**
+     * Activate/deactivate the message flow for all the consumers of this 
session.
+     */
+    public void sendSuspendChannel(boolean suspend) throws AMQException, 
FailoverException
+    {
+        if (suspend)
+        {
+            for (BasicMessageConsumer consumer : _consumers.values())
+            {
+                
getQpidSession().messageStop(consumer.getConsumerTag().toString());
+            }
+        }
+        else
+        {
+            for (BasicMessageConsumer consumer : _consumers.values())
+            {
+                
getQpidSession().messageFlow(consumer.getConsumerTag().toString(), 
Session.MESSAGE_FLOW_UNIT_MESSAGE,
+                                             MAX_PREFETCH);
+            }
+        }
+        // We need to sync so that we get notify of an error.
+        getQpidSession().sync();
+        getCurrentException();
+    }
+
+
+    //------ Private methods
+    /**
+     * Access to the underlying Qpid Session
+     *
+     * @return The associated Qpid Session.
+     */
+    protected org.apache.qpidity.client.Session getQpidSession()
+    {
+        return _qpidSession;
+    }
+
+
+    /**
+     * Get the latest thrown exception.
+     *
+     * @throws org.apache.qpid.AMQException get the latest thrown error.
+     */
+    public synchronized void getCurrentException() throws AMQException
+    {
+        if (_currentException != null)
+        {
+            QpidException toBeTrhown = _currentException;
+            _currentException = null;
+            throw new 
AMQException(AMQConstant.getConstant(toBeTrhown.getErrorCode().getCode()),
+                                   toBeTrhown.getMessage(), toBeTrhown);
+        }
+    }
+
+    //------ Inner classes
+    /**
+     * Lstener for qpid protocol exceptions
+     */
+    private class QpidSessionExceptionListener implements 
org.apache.qpidity.client.ExceptionListener
+    {
+        public void onException(QpidException exception)
+        {
+            synchronized (this)
+            {
+                //todo check the error code for finding out if we need to 
notify the
+                // JMS connection exception listener
+                _currentException = exception;
+            }
+        }
+    }
+
+}

Propchange: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=574221&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 Mon Sep 10 05:35:27 2007
@@ -0,0 +1,112 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.UnprocessedMessage_0_10;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.Struct;
+
+import javax.jms.JMSException;
+import java.io.IOException;
+
+/**
+ * This is a 0.10 message consumer.
+ * <p/>
+ * Messages arrive through {@link #onMessage(Message)}, are converted into an
+ * {@link UnprocessedMessage_0_10} and are handed over to the owning session.
+ */
+public class BasicMessageConsumer_0_10 extends BasicMessageConsumer
+        implements org.apache.qpidity.client.util.MessageListener
+{
+    /**
+     * This class logger
+     */
+    protected final Logger _logger = LoggerFactory.getLogger(getClass());
+
+    /**
+     * Creates a 0.10 consumer; every argument is passed unchanged to the parent constructor.
+     */
+    protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
+                                        String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
+                                        AMQSession session, AMQProtocolHandler protocolHandler,
+                                        FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+                                        boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+    {
+        super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
+              rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
+
+    }
+
+    // ----- Interface org.apache.qpidity.client.util.MessageListener
+
+    /**
+     * Converts a received qpidity message and passes it to the session.
+     * <p/>
+     * If the message body cannot be read, the error is reported to the connection and the message
+     * is not passed on, so that the session never sees a message without its body.
+     *
+     * @param message The message received from the Qpid session.
+     */
+    public void onMessage(Message message)
+    {
+        int channelId = getSession().getChannelId();
+        long deliveryId = message.getMessageTransferId();
+        String consumerTag = getConsumerTag().toString();
+        AMQShortString exchange = new AMQShortString(message.getDeliveryProperties().getExchange());
+        AMQShortString routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
+        boolean redelivered = message.getDeliveryProperties().getRedelivered();
+        UnprocessedMessage_0_10 newMessage =
+                new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
+        try
+        {
+            newMessage.receiveBody(message.readData());
+        }
+        catch (IOException e)
+        {
+            getSession().getAMQConnection().exceptionReceived(e);
+            // the body could not be read: do not deliver an incomplete message
+            return;
+        }
+        Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
+        newMessage.setContentHeader(headers);
+        getSession().messageReceived(newMessage);
+    }
+
+    //----- overwritten methods
+
+    /**
+     * This method is invoked when this consumer is stopped.
+     * It tells the broker to stop delivering messages to this consumer.
+     *
+     * @throws JMSAMQException If the session reported an error while stopping the message flow.
+     */
+    public void sendCancel() throws JMSAMQException
+    {
+        final AMQSession_0_10 session = (AMQSession_0_10) getSession();
+        session.getQpidSession().messageStop(getConsumerTag().toString());
+        session.getQpidSession().sync();
+        try
+        {
+            session.getCurrentException();
+        }
+        catch (AMQException e)
+        {
+            throw new JMSAMQException("Problem when stopping consumer", e);
+        }
+    }
+
+    /**
+     * Registers the delivery tag of the message with the session as not yet acknowledged, then runs the
+     * parent's post-delivery processing.
+     * NOTE(review): the original comment said this runs "just before" delivery while the method name
+     * says "post" -- confirm the actual call order in the parent class.
+     */
+    void postDeliver(AbstractJMSMessage msg) throws JMSException
+    {
+        // notify the session
+        ((AMQSession_0_10) getSession()).addMessageTag(msg.getDeliveryTag());
+        super.postDeliver(msg);
+    }
+
+}
\ No newline at end of file

Propchange: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=574221&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
 Mon Sep 10 05:35:27 2007
@@ -0,0 +1,150 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpidity.jms.message.MessageImpl;
+import org.apache.qpidity.jms.message.MessageHelper;
+import org.apache.qpidity.jms.ExceptionHelper;
+import org.apache.qpidity.QpidException;
+
+import javax.jms.Message;
+import javax.jms.JMSException;
+import java.util.UUID;
+import java.io.IOException;
+
+/**
+ * This is a 0_10 message producer.
+ * <p/>
+ * Outgoing messages are sent through the Qpid session owned by the enclosing {@link AMQSession_0_10}.
+ */
+public class BasicMessageProducer_0_10 extends BasicMessageProducer
+{
+
+    /**
+     * Creates a 0_10 producer; every argument is passed unchanged to the parent constructor.
+     */
+    BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
+                              AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
+                              boolean immediate, boolean mandatory, boolean waitUntilSent)
+    {
+        super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate,
+              mandatory, waitUntilSent);
+    }
+
+    /**
+     * Declares the exchange of the given destination by writing an exchange-declare frame
+     * (with nowait set) on the protocol handler.
+     *
+     * @param destination The destination whose exchange is declared.
+     */
+    public void declareDestination(AMQDestination destination)
+    {
+        // Declare the exchange
+        // Note that the durable and internal arguments are ignored since passive is set to false
+        AMQFrame declare = ExchangeDeclareBody.createAMQFrame(_channelId,
+                                                              _protocolHandler.getProtocolMajorVersion(),
+                                                              _protocolHandler.getProtocolMinorVersion(),
+                                                              null, // arguments
+                                                              false, // autoDelete
+                                                              false, // durable
+                                                              destination.getExchangeName(), // exchange
+                                                              false, // internal
+                                                              true, // nowait
+                                                              false, // passive
+                                                              _session.getTicket(), // ticket
+                                                              destination.getExchangeClass()); // type
+        _protocolHandler.writeFrame(declare);
+    }
+
+    //--- Overwritten methods
+
+
+    /**
+     * Sends a message to a given destination.
+     * <p/>
+     * A message that is not a {@link MessageImpl} is a foreign message: it is converted before being
+     * sent, and its JMS header fields are updated as well in case they are queried after the send.
+     *
+     * @param destination  The destination of the message.
+     * @param origMessage  The message handed over by the application.
+     * @param message      Not used for sending by this implementation.
+     * @param deliveryMode The JMS delivery mode to set on the message.
+     * @param priority     The JMS priority to set on the message.
+     * @param timeToLive   The time to live in milliseconds; 0 means that the message does not expire.
+     * @param mandatory    Not used by this implementation.
+     * @param immediate    Not used by this implementation.
+     * @param wait         Not used by this implementation.
+     * @throws JMSException If the message cannot be prepared or transferred.
+     */
+    public void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
+                            int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate,
+                            boolean wait) throws JMSException
+    {
+        // Only get current time if required. It stays at 0 when timestamps are disabled and the
+        // message does not expire, so that the JMS timestamp is then 0 rather than a sentinel value.
+        long currentTime = 0;
+        if (!((timeToLive == 0) && _disableTimestamps))
+        {
+            currentTime = System.currentTimeMillis();
+        }
+        // an expiration of 0 means that the message never expires
+        final long expiration = (timeToLive != 0) ? timeToLive + currentTime : 0;
+        // the message UID
+        String uid = (getDisableMessageID()) ? "MSG_ID_DISABLED" : UUID.randomUUID().toString();
+        MessageImpl qpidMessage;
+        if (origMessage instanceof MessageImpl)
+        {
+            qpidMessage = (MessageImpl) origMessage;
+        }
+        else
+        {
+            // this is a foreign message
+            qpidMessage = MessageHelper.transformMessage(origMessage);
+            // set message's properties in case they are queried after send.
+            origMessage.setJMSDestination(destination);
+            origMessage.setJMSDeliveryMode(deliveryMode);
+            origMessage.setJMSPriority(priority);
+            origMessage.setJMSMessageID(uid);
+            origMessage.setJMSExpiration(expiration);
+            if (timeToLive != 0)
+            {
+                // log the value that has just been set on the foreign message
+                _logger.debug("Setting JMSExpiration:" + origMessage.getJMSExpiration());
+            }
+            origMessage.setJMSTimestamp(currentTime);
+        }
+        // set the message properties
+        qpidMessage.setJMSDestination(destination);
+        qpidMessage.setJMSMessageID(uid);
+        qpidMessage.setJMSDeliveryMode(deliveryMode);
+        qpidMessage.setJMSPriority(priority);
+        qpidMessage.setJMSExpiration(expiration);
+        qpidMessage.setJMSTimestamp(currentTime);
+        qpidMessage.setRoutingKey(destination.getDestinationName().toString());
+        qpidMessage.setExchangeName(destination.getExchangeName().toString());
+        // call beforeMessageDispatch
+        try
+        {
+            qpidMessage.beforeMessageDispatch();
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+        try
+        {
+            ((AMQSession_0_10) getSession()).getQpidSession().messageTransfer(
+                    qpidMessage.getExchangeName(),
+                    qpidMessage.getQpidityMessage(),
+                    org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
+                    org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+        }
+        catch (IOException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+}
+

Propchange: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to