Author: rgodfrey
Date: Wed Apr 16 13:30:38 2008
New Revision: 648834

URL: http://svn.apache.org/viewvc?rev=648834&view=rev
Log:
QPID-156 : Add an Apache licensed store - created an experimental Derby based 
store

Added:
    
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
Modified:
    incubator/qpid/branches/M2.x/java/broker/pom.xml
    incubator/qpid/branches/M2.x/java/pom.xml

Modified: incubator/qpid/branches/M2.x/java/broker/pom.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/pom.xml?rev=648834&r1=648833&r2=648834&view=diff
==============================================================================
--- incubator/qpid/branches/M2.x/java/broker/pom.xml (original)
+++ incubator/qpid/branches/M2.x/java/broker/pom.xml Wed Apr 16 13:30:38 2008
@@ -128,6 +128,11 @@
             <artifactId>org.apache.felix.framework</artifactId>
             <version>1.0.0</version>
         </dependency>
+       <dependency>
+            <groupId>org.apache.derby</groupId>
+            <artifactId>derby</artifactId>
+            <version>10.3.2.1</version>
+        </dependency>        
     </dependencies>
 
     <build>

Added: 
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=648834&view=auto
==============================================================================
--- 
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
 (added)
+++ 
incubator/qpid/branches/M2.x/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
 Wed Apr 16 13:30:38 2008
@@ -0,0 +1,1445 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.queue.QueueRegistry;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+
+import java.io.File;
+import java.io.ByteArrayInputStream;
+import java.sql.DriverManager;
+import java.sql.Driver;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Blob;
+import java.sql.Types;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.TreeMap;
+
+
+public class DerbyMessageStore implements MessageStore
+{
+
+    // Log4j logger shared by all instances of this store.
+    private static final Logger _logger = 
Logger.getLogger(DerbyMessageStore.class);
+
+    // Suffix of the configuration key (appended to the base key passed to
+    // configure()) that names the directory holding the Derby database.
+    private static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
+
+
+    // Class name of the embedded Derby JDBC driver, loaded by initialiseDriver().
+    private static final String SQL_DRIVER_NAME = 
"org.apache.derby.jdbc.EmbeddedDriver";
+
+    // Names of the tables created by createOrOpenDatabase().
+    private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
+
+    private static final String EXCHANGE_TABLE_NAME = "QPID_EXCHANGE";
+    private static final String QUEUE_TABLE_NAME = "QPID_QUEUE";
+    private static final String BINDINGS_TABLE_NAME = "QPID_BINDINGS";
+    private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRY";
+    private static final String MESSAGE_META_DATA_TABLE_NAME = 
"QPID_MESSAGE_META_DATA";
+    private static final String MESSAGE_CONTENT_TABLE_NAME = 
"QPID_MESSAGE_CONTENT";
+
+    // Schema version written into the version table when it is first created.
+    private static final int DB_VERSION = 1;
+
+
+
+    // Virtual host this store was configured for; assigned in configure().
+    private VirtualHost _virtualHost;
+    // Loaded driver class; null until initialiseDriver() has run.
+    private static Class<Driver> DRIVER_CLASS;
+
+    // Counter starting at 1 - presumably the source of new message ids; its
+    // use is outside this part of the file (TODO confirm).
+    private final AtomicLong _messageId = new AtomicLong(1);
+    // Set to true by close().
+    private AtomicBoolean _closed = new AtomicBoolean(false);
+
+    // JDBC URL of the per-virtual-host database; built in createOrOpenDatabase().
+    private String _connectionURL;
+
+
+
+    // SQL text used by the store. The CREATE TABLE statements come first (one per
+    // table name constant), followed by the SELECT / INSERT / DELETE statements.
+    // All run-time values are bound through '?' placeholders.
+    private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE 
"+DB_VERSION_TABLE_NAME+" ( version int not null )";
+    private static final String INSERT_INTO_DB_VERSION = "INSERT INTO 
"+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )";
+    private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE 
"+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not 
null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )";
+    private static final String CREATE_QUEUE_TABLE = "CREATE TABLE 
"+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), PRIMARY 
KEY ( name ) )";
+    private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE 
"+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name 
varchar(255) not null, binding_key varchar(255) not null, arguments blob , 
PRIMARY KEY ( exchange_name, queue_name, binding_key ) )";
+    private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE 
"+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id 
bigint not null, PRIMARY KEY (queue_name, message_id) )";
+    private static final String CREATE_MESSAGE_META_DATA_TABLE = "CREATE TABLE 
"+MESSAGE_META_DATA_TABLE_NAME+" ( message_id bigint not null, exchange_name 
varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not 
null, flag_immediate smallint not null, content_header blob, chunk_count int 
not null, PRIMARY KEY ( message_id ) )";
+    private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE 
"+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, chunk_id int not 
null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )";
+    // Queries and updates against the tables defined above.
+    private static final String SELECT_FROM_QUEUE = "SELECT name, owner FROM " 
+ QUEUE_TABLE_NAME;
+    private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, 
autodelete FROM " + EXCHANGE_TABLE_NAME;
+    private static final String SELECT_FROM_BINDINGS =
+            "SELECT queue_name, binding_key, arguments FROM " + 
BINDINGS_TABLE_NAME + " WHERE exchange_name = ?";
+    private static final String DELETE_FROM_MESSAGE_META_DATA = "DELETE FROM " 
+ MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
+    private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + 
MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?";
+    private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + 
EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )";
+    private static final String DELETE_FROM_EXCHANGE = "DELETE FROM " + 
EXCHANGE_TABLE_NAME + " WHERE name = ?";
+    private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + 
BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) 
values ( ?, ?, ?, ? )";
+    private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + 
BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND 
binding_key = ?";
+    private static final String INSERT_INTO_QUEUE = "INSERT INTO " + 
QUEUE_TABLE_NAME + " (name, owner) VALUES (?, ?)";
+    private static final String DELETE_FROM_QUEUE = "DELETE FROM " + 
QUEUE_TABLE_NAME + " WHERE name = ?";
+    private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + 
QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)";
+    private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + 
QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?";
+    private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + 
MESSAGE_CONTENT_TABLE_NAME + "( message_id, chunk_id, content_chunk ) values 
(?, ?, ?)";
+    private static final String INSERT_INTO_MESSAGE_META_DATA = "INSERT INTO " 
+ MESSAGE_META_DATA_TABLE_NAME + "( message_id , exchange_name , routing_key , 
flag_mandatory , flag_immediate , content_header , chunk_count ) values (?, ?, 
?, ?, ?, ?, ?)";
+    private static final String SELECT_FROM_MESSAGE_META_DATA =
+            "SELECT exchange_name , routing_key , flag_mandatory , 
flag_immediate , content_header , chunk_count FROM " + 
MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
+    private static final String SELECT_FROM_MESSAGE_CONTENT =
+            "SELECT content_chunk FROM " + MESSAGE_CONTENT_TABLE_NAME + " 
WHERE message_id = ? and chunk_id = ?";
+    private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, 
message_id FROM " + QUEUE_ENTRY_TABLE_NAME;
+    // Used by tableExists(): looks a table name up in Derby's system catalogue.
+    private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM 
SYS.SYSTABLES WHERE TABLENAME = ?";
+
+
+    /**
+     * Lifecycle phases of the store. configure() moves INITIAL to CONFIGURING,
+     * recover() moves CONFIGURING to RECOVERING, and configure() finishes with
+     * RECOVERING to STARTED. The transitions involving CLOSING and CLOSED are
+     * not made in the part of the file shown here.
+     */
+    private enum State
+    {
+        INITIAL,
+        CONFIGURING,
+        RECOVERING,
+        STARTED,
+        CLOSING,
+        CLOSED
+    }
+
+    // Current lifecycle phase. createExchange(), bindQueue() and createQueue()
+    // skip their database writes while this is RECOVERING.
+    private State _state = State.INITIAL;
+
+
+    public void configure(VirtualHost virtualHost, String base, Configuration 
config) throws Exception
+    {
+        stateTransition(State.INITIAL, State.CONFIGURING);
+
+        initialiseDriver();
+
+        _virtualHost = virtualHost;
+
+        _logger.info("Configuring Derby message store for virtual host " + 
virtualHost.getName());
+        QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+        final String databasePath = config.getString(base + "." + 
ENVIRONMENT_PATH_PROPERTY, "derbyDB");
+
+        File environmentPath = new File(databasePath);
+        if (!environmentPath.exists())
+        {
+            if (!environmentPath.mkdirs())
+            {
+                throw new IllegalArgumentException("Environment path " + 
environmentPath + " could not be read or created. "
+                    + "Ensure the path is correct and that the permissions are 
correct.");
+            }
+        }
+
+        createOrOpenDatabase(databasePath);
+
+        // this recovers durable queues and persistent messages
+
+        recover();
+
+        stateTransition(State.RECOVERING, State.STARTED);
+
+    }
+
+    /**
+     * Loads the embedded Derby JDBC driver class the first time it is called;
+     * later calls return immediately.
+     *
+     * @throws ClassNotFoundException if the driver class is not on the classpath
+     */
+    private static synchronized void initialiseDriver() throws ClassNotFoundException
+    {
+        if (DRIVER_CLASS != null)
+        {
+            // already loaded by an earlier call
+            return;
+        }
+
+        DRIVER_CLASS = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
+    }
+
+    /**
+     * Builds the Derby connection URL for this virtual host (with
+     * <code>create=true</code>) and makes sure every table used by the store exists.
+     *
+     * @param environmentPath directory under which the per-virtual-host database lives
+     * @throws SQLException if the database cannot be opened or a table cannot be created
+     */
+    private void createOrOpenDatabase(final String environmentPath) throws SQLException
+    {
+        _connectionURL = "jdbc:derby:" + environmentPath + "/" + _virtualHost.getName() + ";create=true";
+
+        Connection conn = newConnection();
+        try
+        {
+            createVersionTable(conn);
+            createExchangeTable(conn);
+            createQueueTable(conn);
+            createBindingsTable(conn);
+            createQueueEntryTable(conn);
+            createMessageMetaDataTable(conn);
+            createMessageContentTable(conn);
+        }
+        finally
+        {
+            // release the connection even when one of the table checks fails
+            conn.close();
+        }
+    }
+
+
+
+    /**
+     * Creates the version table and records {@link #DB_VERSION} in it, unless the
+     * table already exists.
+     *
+     * @param conn connection to run the statements on; not closed here
+     * @throws SQLException if the existence check, the DDL or the insert fails
+     */
+    private void createVersionTable(final Connection conn) throws SQLException
+    {
+        if (tableExists(DB_VERSION_TABLE_NAME, conn))
+        {
+            return;
+        }
+
+        Statement stmt = conn.createStatement();
+        try
+        {
+            stmt.execute(CREATE_DB_VERSION_TABLE);
+        }
+        finally
+        {
+            stmt.close();
+        }
+
+        PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_DB_VERSION);
+        try
+        {
+            pstmt.setInt(1, DB_VERSION);
+            pstmt.execute();
+        }
+        finally
+        {
+            pstmt.close();
+        }
+    }
+
+
+    /**
+     * Creates the exchange table unless it already exists.
+     *
+     * @param conn connection to run the statement on; not closed here
+     * @throws SQLException if the existence check or the DDL fails
+     */
+    private void createExchangeTable(final Connection conn) throws SQLException
+    {
+        if (!tableExists(EXCHANGE_TABLE_NAME, conn))
+        {
+            Statement stmt = conn.createStatement();
+            try
+            {
+                stmt.execute(CREATE_EXCHANGE_TABLE);
+            }
+            finally
+            {
+                // close the statement even if the DDL fails
+                stmt.close();
+            }
+        }
+    }
+
+    /**
+     * Creates the queue table unless it already exists.
+     *
+     * @param conn connection to run the statement on; not closed here
+     * @throws SQLException if the existence check or the DDL fails
+     */
+    private void createQueueTable(final Connection conn) throws SQLException
+    {
+        if (!tableExists(QUEUE_TABLE_NAME, conn))
+        {
+            Statement stmt = conn.createStatement();
+            try
+            {
+                stmt.execute(CREATE_QUEUE_TABLE);
+            }
+            finally
+            {
+                // close the statement even if the DDL fails
+                stmt.close();
+            }
+        }
+    }
+
+    /**
+     * Creates the bindings table unless it already exists.
+     *
+     * @param conn connection to run the statement on; not closed here
+     * @throws SQLException if the existence check or the DDL fails
+     */
+    private void createBindingsTable(final Connection conn) throws SQLException
+    {
+        if (!tableExists(BINDINGS_TABLE_NAME, conn))
+        {
+            Statement stmt = conn.createStatement();
+            try
+            {
+                stmt.execute(CREATE_BINDINGS_TABLE);
+            }
+            finally
+            {
+                // close the statement even if the DDL fails
+                stmt.close();
+            }
+        }
+    }
+
+    /**
+     * Creates the queue-entry table unless it already exists.
+     *
+     * @param conn connection to run the statement on; not closed here
+     * @throws SQLException if the existence check or the DDL fails
+     */
+    private void createQueueEntryTable(final Connection conn) throws SQLException
+    {
+        if (!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
+        {
+            Statement stmt = conn.createStatement();
+            try
+            {
+                stmt.execute(CREATE_QUEUE_ENTRY_TABLE);
+            }
+            finally
+            {
+                // close the statement even if the DDL fails
+                stmt.close();
+            }
+        }
+    }
+
+    /**
+     * Creates the message meta-data table unless it already exists.
+     *
+     * @param conn connection to run the statement on; not closed here
+     * @throws SQLException if the existence check or the DDL fails
+     */
+    private void createMessageMetaDataTable(final Connection conn) throws SQLException
+    {
+        if (!tableExists(MESSAGE_META_DATA_TABLE_NAME, conn))
+        {
+            Statement stmt = conn.createStatement();
+            try
+            {
+                stmt.execute(CREATE_MESSAGE_META_DATA_TABLE);
+            }
+            finally
+            {
+                // close the statement even if the DDL fails
+                stmt.close();
+            }
+        }
+    }
+
+
+    /**
+     * Creates the message-content table unless it already exists.
+     *
+     * @param conn connection to run the statement on; not closed here
+     * @throws SQLException if the existence check or the DDL fails
+     */
+    private void createMessageContentTable(final Connection conn) throws SQLException
+    {
+        if (!tableExists(MESSAGE_CONTENT_TABLE_NAME, conn))
+        {
+            Statement stmt = conn.createStatement();
+            try
+            {
+                stmt.execute(CREATE_MESSAGE_CONTENT_TABLE);
+            }
+            finally
+            {
+                // close the statement even if the DDL fails
+                stmt.close();
+            }
+        }
+    }
+
+
+
+    /**
+     * Checks whether a table of the given name is listed in SYS.SYSTABLES.
+     *
+     * @param tableName table name, compared as-is against the TABLENAME column
+     * @param conn      connection to run the query on; not closed here
+     * @return true if at least one matching row exists
+     * @throws SQLException if the catalogue query fails
+     */
+    private boolean tableExists(final String tableName, final Connection conn) throws SQLException
+    {
+        PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY);
+        try
+        {
+            stmt.setString(1, tableName);
+            ResultSet rs = stmt.executeQuery();
+            try
+            {
+                return rs.next();
+            }
+            finally
+            {
+                rs.close();
+            }
+        }
+        finally
+        {
+            // runs on both the normal and the failure path
+            stmt.close();
+        }
+    }
+
+    /**
+     * Recovers the persistent state: loads the durable queues, recovers the
+     * exchanges with their bindings, then delivers the stored messages inside a
+     * single store transaction.
+     *
+     * @throws AMQException if recovery fails; an SQLException is wrapped as the cause
+     */
+    public void recover() throws AMQException
+    {
+        stateTransition(State.CONFIGURING, State.RECOVERING);
+
+        _logger.info("Recovering persistent state...");
+        StoreContext context = new StoreContext();
+
+        try
+        {
+            Map<AMQShortString, AMQQueue> queues = loadQueues();
+
+            recoverExchanges();
+
+            try
+            {
+                beginTran(context);
+
+                deliverMessages(context, queues);
+                commitTran(context);
+
+                // report success only once the commit has gone through
+                _logger.info("Persistent state recovered successfully");
+            }
+            finally
+            {
+                // still in a transaction here means delivery or commit failed
+                if (inTran(context))
+                {
+                    abortTran(context);
+                }
+            }
+        }
+        catch (SQLException e)
+        {
+            throw new AMQException("Error recovering persistent state: " + e, e);
+        }
+    }
+
+    /**
+     * Reads every row of the queue table, creates an AMQQueue for each one,
+     * registers it with the virtual host's queue registry and returns the queues
+     * keyed by name.
+     *
+     * @return map from queue name to the recreated queue
+     * @throws SQLException if the query fails
+     * @throws AMQException if a queue cannot be created or registered
+     */
+    private Map<AMQShortString, AMQQueue> loadQueues() throws SQLException, AMQException
+    {
+        Connection conn = newConnection();
+        try
+        {
+            Statement stmt = conn.createStatement();
+            try
+            {
+                ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE);
+                Map<AMQShortString, AMQQueue> queueMap = new HashMap<AMQShortString, AMQQueue>();
+                while (rs.next())
+                {
+                    String queueName = rs.getString(1);
+                    String owner = rs.getString(2);
+                    AMQShortString queueNameShortString = new AMQShortString(queueName);
+                    AMQQueue q = new AMQQueue(queueNameShortString, true,
+                                              owner == null ? null : new AMQShortString(owner),
+                                              false, _virtualHost);
+                    _virtualHost.getQueueRegistry().registerQueue(q);
+                    queueMap.put(queueNameShortString, q);
+                }
+                return queueMap;
+            }
+            finally
+            {
+                // closing the statement also closes its result set
+                stmt.close();
+            }
+        }
+        finally
+        {
+            // the connection used to be left open on every call
+            conn.close();
+        }
+    }
+
+    private void recoverExchanges() throws AMQException, SQLException
+    {
+        for (Exchange exchange : loadExchanges())
+        {
+            recoverExchange(exchange);
+        }
+    }
+
+
+    private List<Exchange> loadExchanges() throws AMQException, SQLException
+    {
+
+        List<Exchange> exchanges = new ArrayList<Exchange>();
+        Connection conn = null;
+        try
+        {
+            conn = newConnection();
+
+
+            Statement stmt = conn.createStatement();
+            ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE);
+
+            Exchange exchange;
+            while(rs.next())
+            {
+                String exchangeName = rs.getString(1);
+                String type = rs.getString(2);
+                boolean autoDelete = rs.getShort(3) != 0;
+
+                exchange = 
_virtualHost.getExchangeFactory().createExchange(new 
AMQShortString(exchangeName), new AMQShortString(type), true, autoDelete, 0);
+                _virtualHost.getExchangeRegistry().registerExchange(exchange);
+                exchanges.add(exchange);
+
+            }
+            return exchanges;
+
+        }
+        finally
+        {
+            if(conn != null)
+            {
+                conn.close();
+            }
+        }
+
+    }
+
+    /**
+     * Restores the bindings stored for one exchange. Each bindings row names a
+     * queue, a binding key and optional arguments. Rows whose queue is not in the
+     * queue registry are logged as errors and skipped.
+     *
+     * @param exchange the exchange whose bindings are restored
+     * @throws AMQException if a binding cannot be re-established
+     * @throws SQLException if the bindings query fails
+     */
+    private void recoverExchange(Exchange exchange) throws AMQException, SQLException
+    {
+        _logger.info("Recovering durable exchange " + exchange.getName() + " of type " + exchange.getType() + "...");
+
+        QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
+
+        Connection conn = null;
+        try
+        {
+            conn = newConnection();
+
+            PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS);
+            stmt.setString(1, exchange.getName().toString());
+
+            ResultSet rs = stmt.executeQuery();
+
+            while (rs.next())
+            {
+                String queueName = rs.getString(1);
+                String bindingKey = rs.getString(2);
+                Blob arguments = rs.getBlob(3);
+
+                AMQQueue queue = queueRegistry.getQueue(new AMQShortString(queueName));
+                if (queue == null)
+                {
+                    _logger.error("Unkown queue: " + queueName + " cannot be bound to exchange: "
+                        + exchange.getName());
+                }
+                else
+                {
+                    _logger.info("Restoring binding: (Exchange: " + exchange.getName() + ", Queue: " + queueName
+                        + ", Routing Key: " + bindingKey + ", Arguments: " + arguments
+                        + ")");
+
+                    FieldTable argumentsFT = null;
+                    if (arguments != null)
+                    {
+                        // Blob positions are 1-based: the first byte is at position 1
+                        byte[] argumentBytes = arguments.getBytes(1, (int) arguments.length());
+                        ByteBuffer buf = ByteBuffer.wrap(argumentBytes);
+                        argumentsFT = new FieldTable(buf, arguments.length());
+                    }
+
+                    queue.bind(bindingKey == null ? null : new AMQShortString(bindingKey), argumentsFT, exchange);
+                }
+            }
+        }
+        finally
+        {
+            if (conn != null)
+            {
+                conn.close();
+            }
+        }
+    }
+
+    /**
+     * Marks the store as closed by setting the closed flag.
+     *
+     * @throws Exception declared by the signature; nothing is thrown here
+     */
+    public void close() throws Exception
+    {
+        // the previous value of the flag is not needed
+        _closed.set(true);
+    }
+
+    /**
+     * Deletes a message's meta-data row and all of its content chunks.
+     * If the store context has no transaction yet, one is started here and is
+     * committed on success or aborted on failure.
+     *
+     * @param storeContext context carrying the connection for the current transaction
+     * @param messageId    id of the message to delete
+     * @throws AMQException if the meta-data row is missing, the number of deleted
+     *                      chunks differs from the recorded chunk count, or a
+     *                      database error occurs
+     */
+    public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
+    {
+        boolean localTx = getOrCreateTransaction(storeContext);
+
+        Connection conn = getConnection(storeContext);
+        ConnectionWrapper wrapper = (ConnectionWrapper) storeContext.getPayload();
+
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Message Id: " + messageId + " Removing");
+        }
+
+        // first we need to look up the header to get the chunk count
+        MessageMetaData mmd = getMessageMetaData(storeContext, messageId);
+        try
+        {
+            int results;
+
+            PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_META_DATA);
+            try
+            {
+                stmt.setLong(1, messageId);
+                wrapper.setRequiresCommit();
+                results = stmt.executeUpdate();
+            }
+            finally
+            {
+                // close before any abort or commit
+                stmt.close();
+            }
+
+            if (results == 0)
+            {
+                if (localTx)
+                {
+                    abortTran(storeContext);
+                }
+
+                throw new AMQException("Message metadata not found for message id " + messageId);
+            }
+
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Deleted metadata for message " + messageId);
+            }
+
+            stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
+            try
+            {
+                stmt.setLong(1, messageId);
+                results = stmt.executeUpdate();
+            }
+            finally
+            {
+                stmt.close();
+            }
+
+            if (results != mmd.getContentChunkCount())
+            {
+                if (localTx)
+                {
+                    abortTran(storeContext);
+                }
+                throw new AMQException("Unexpected number of content chunks when deleting message.  Expected "
+                    + mmd.getContentChunkCount() + " but found " + results);
+            }
+
+            if (localTx)
+            {
+                commitTran(storeContext);
+            }
+        }
+        catch (SQLException e)
+        {
+            if (localTx)
+            {
+                abortTran(storeContext);
+            }
+
+            throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
+        }
+    }
+
+    /**
+     * Stores an exchange's name, type and auto-delete flag in the exchange table.
+     * Nothing is written while the store is in the RECOVERING state.
+     *
+     * @param exchange the exchange to store
+     * @throws AMQException wrapping any SQLException raised while writing
+     */
+    public void createExchange(Exchange exchange) throws AMQException
+    {
+        if (_state == State.RECOVERING)
+        {
+            return;
+        }
+
+        try
+        {
+            Connection connection = null;
+            try
+            {
+                connection = newConnection();
+
+                final short autoDeleteFlag = exchange.isAutoDelete() ? (short) 1 : (short) 0;
+
+                PreparedStatement insert = connection.prepareStatement(INSERT_INTO_EXCHANGE);
+                insert.setString(1, exchange.getName().toString());
+                insert.setString(2, exchange.getType().toString());
+                insert.setShort(3, autoDeleteFlag);
+                insert.execute();
+                insert.close();
+
+                connection.commit();
+            }
+            finally
+            {
+                // a failure to close is reported through the catch below
+                if (connection != null)
+                {
+                    connection.close();
+                }
+            }
+        }
+        catch (SQLException e)
+        {
+            throw new AMQException("Error writing Exchange with name " + exchange.getName() + " to database: " + e, e);
+        }
+    }
+
+    /**
+     * Deletes an exchange's row from the exchange table.
+     *
+     * @param exchange the exchange to delete
+     * @throws AMQException if no row exists for the exchange, or a database error occurs
+     */
+    public void removeExchange(Exchange exchange) throws AMQException
+    {
+        Connection conn = null;
+
+        try
+        {
+            conn = newConnection();
+            PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE);
+            try
+            {
+                stmt.setString(1, exchange.getName().toString());
+                if (stmt.executeUpdate() == 0)
+                {
+                    throw new AMQException("Exchange " + exchange.getName() + " not found");
+                }
+            }
+            finally
+            {
+                // also covers the not-found path
+                stmt.close();
+            }
+            conn.commit();
+        }
+        catch (SQLException e)
+        {
+            throw new AMQException("Error deleting Exchange with name " + exchange.getName()
+                + " from database: " + e, e);
+        }
+        finally
+        {
+            if (conn != null)
+            {
+                try
+                {
+                    conn.close();
+                }
+                catch (SQLException e)
+                {
+                    // log only, so the close failure does not replace the original outcome
+                    _logger.error(e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Stores a binding between an exchange and a queue in the bindings table.
+     * Nothing is written while the store is in the RECOVERING state.
+     *
+     * @param exchange   exchange the queue is bound to
+     * @param routingKey binding key; may be null
+     * @param queue      the bound queue
+     * @param args       binding arguments, written as a blob; may be null
+     * @throws AMQException wrapping any SQLException raised while writing
+     */
+    public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
+            throws AMQException
+    {
+        if (_state != State.RECOVERING)
+        {
+            Connection conn = null;
+
+            try
+            {
+                conn = newConnection();
+                PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_BINDINGS);
+                stmt.setString(1, exchange.getName().toString());
+                stmt.setString(2, queue.getName().toString());
+                stmt.setString(3, routingKey == null ? null : routingKey.toString());
+                if (args != null)
+                {
+                    // Use the length-taking overload: the two-argument form and
+                    // Connection.createBlob() both need JDBC 4 (Java 6).
+                    byte[] argBytes = args.getDataAsBytes();
+                    stmt.setBinaryStream(4, new ByteArrayInputStream(argBytes), argBytes.length);
+                }
+                else
+                {
+                    stmt.setNull(4, Types.BLOB);
+                }
+
+                stmt.executeUpdate();
+                conn.commit();
+                stmt.close();
+            }
+            catch (SQLException e)
+            {
+                throw new AMQException("Error writing binding for AMQQueue with name " + queue.getName()
+                    + " to exchange " + exchange.getName() + " to database: " + e, e);
+            }
+            finally
+            {
+                if (conn != null)
+                {
+                    try
+                    {
+                        conn.close();
+                    }
+                    catch (SQLException e)
+                    {
+                        _logger.error(e);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Deletes the bindings row identified by exchange name, queue name and
+     * binding key.
+     *
+     * @param exchange   exchange of the binding
+     * @param routingKey binding key; may be null
+     * @param queue      queue of the binding
+     * @param args       binding arguments; not used when deleting
+     * @throws AMQException if exactly one row was not deleted, or a database error occurs
+     */
+    public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
+            throws AMQException
+    {
+        Connection connection = null;
+        try
+        {
+            connection = newConnection();
+
+            // a bindings row is matched on exchange_name, queue_name and binding_key
+            final String keyText = (routingKey == null) ? null : routingKey.toString();
+            PreparedStatement delete = connection.prepareStatement(DELETE_FROM_BINDINGS);
+            delete.setString(1, exchange.getName().toString());
+            delete.setString(2, queue.getName().toString());
+            delete.setString(3, keyText);
+
+            final int deletedRows = delete.executeUpdate();
+            if (deletedRows != 1)
+            {
+                throw new AMQException("Queue binding for queue with name " + queue.getName() + " to exchange "
+                    + exchange.getName() + "  not found");
+            }
+
+            connection.commit();
+            delete.close();
+        }
+        catch (SQLException e)
+        {
+            throw new AMQException("Error removing binding for AMQQueue with name " + queue.getName()
+                + " to exchange " + exchange.getName() + " in database: " + e, e);
+        }
+        finally
+        {
+            if (connection != null)
+            {
+                try
+                {
+                    connection.close();
+                }
+                catch (SQLException e)
+                {
+                    _logger.error(e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Stores a queue's name and owner in the queue table.
+     * Nothing is written while the store is in the RECOVERING state.
+     *
+     * @param queue the queue to store; its owner may be null
+     * @throws AMQException wrapping any SQLException raised while writing
+     */
+    public void createQueue(AMQQueue queue) throws AMQException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
+        }
+
+        if (_state != State.RECOVERING)
+        {
+            try
+            {
+                Connection conn = newConnection();
+                try
+                {
+                    PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE);
+
+                    stmt.setString(1, queue.getName().toString());
+                    stmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString());
+
+                    stmt.execute();
+
+                    stmt.close();
+
+                    conn.commit();
+                }
+                finally
+                {
+                    // the connection used to be left open when the insert failed
+                    conn.close();
+                }
+            }
+            catch (SQLException e)
+            {
+                throw new AMQException("Error writing AMQQueue with name " + queue.getName() + " to database: " + e, e);
+            }
+        }
+    }
+
+    /**
+     * Opens a new JDBC connection using the store's connection URL.
+     *
+     * @return a new connection; the caller is responsible for closing it
+     * @throws SQLException if the connection cannot be obtained
+     */
+    private Connection newConnection() throws SQLException
+    {
+        return DriverManager.getConnection(_connectionURL);
+    }
+
+    /**
+     * Deletes a queue's row from the queue table.
+     *
+     * @param name name of the queue to delete
+     * @throws AMQException if no row exists for the queue, or a database error occurs
+     */
+    public void removeQueue(AMQShortString name) throws AMQException
+    {
+        _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");
+        Connection conn = null;
+
+        try
+        {
+            conn = newConnection();
+            PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE);
+            try
+            {
+                stmt.setString(1, name.toString());
+                if (stmt.executeUpdate() == 0)
+                {
+                    throw new AMQException("Queue " + name + " not found");
+                }
+            }
+            finally
+            {
+                // also covers the not-found path
+                stmt.close();
+            }
+
+            conn.commit();
+        }
+        catch (SQLException e)
+        {
+            throw new AMQException("Error deleting AMQQueue with name " + name + " from database: " + e, e);
+        }
+        finally
+        {
+            if (conn != null)
+            {
+                try
+                {
+                    conn.close();
+                }
+                catch (SQLException e)
+                {
+                    // log only, so the close failure does not replace the original outcome
+                    _logger.error(e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Records that a message is on a queue by inserting a queue-entry row.
+     * If the store context has no transaction yet, one is started here and is
+     * committed on success or aborted on an SQL failure.
+     *
+     * @param context   context carrying the connection for the current transaction
+     * @param name      name of the queue
+     * @param messageId id of the enqueued message
+     * @throws AMQException wrapping any SQLException raised while writing
+     */
+    public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+    {
+        boolean localTx = getOrCreateTransaction(context);
+        Connection conn = getConnection(context);
+        ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
+
+        try
+        {
+            PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
+            try
+            {
+                stmt.setString(1, name.toString());
+                stmt.setLong(2, messageId);
+                // Set the flag. The previous code called the getter requiresCommit(),
+                // which only reads it and has no effect.
+                connWrapper.setRequiresCommit();
+                stmt.executeUpdate();
+            }
+            finally
+            {
+                stmt.close();
+            }
+
+            if (localTx)
+            {
+                commitTran(context);
+            }
+
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
+            }
+        }
+        catch (SQLException e)
+        {
+            if (localTx)
+            {
+                abortTran(context);
+            }
+            _logger.error("Failed to enqueue: " + e, e);
+            throw new AMQException("Error writing enqueued message with id " + messageId + " for queue " + name
+                + " to database", e);
+        }
+    }
+
+    /**
+     * Records that a message has left a queue by deleting its queue-entry row.
+     * If the store context has no transaction yet, one is started here and is
+     * committed on success or aborted on failure.
+     *
+     * @param context   context carrying the connection for the current transaction
+     * @param name      name of the queue
+     * @param messageId id of the dequeued message
+     * @throws AMQException if exactly one row was not deleted, or a database error occurs
+     */
+    public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+    {
+        boolean localTx = getOrCreateTransaction(context);
+        Connection conn = getConnection(context);
+        ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
+
+        try
+        {
+            int results;
+            PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
+            try
+            {
+                stmt.setString(1, name.toString());
+                stmt.setLong(2, messageId);
+                // Set the flag. The previous code called the getter requiresCommit(),
+                // which only reads it and has no effect.
+                connWrapper.setRequiresCommit();
+                results = stmt.executeUpdate();
+            }
+            finally
+            {
+                stmt.close();
+            }
+
+            if (results != 1)
+            {
+                // abort a transaction started here, as removeMessage() does
+                if (localTx)
+                {
+                    abortTran(context);
+                }
+                throw new AMQException("Unable to find message with id " + messageId + " on queue " + name);
+            }
+
+            if (localTx)
+            {
+                commitTran(context);
+            }
+
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Dequeuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
+            }
+        }
+        catch (SQLException e)
+        {
+            if (localTx)
+            {
+                abortTran(context);
+            }
+            _logger.error("Failed to dequeue: " + e, e);
+            throw new AMQException("Error deleting enqueued message with id " + messageId + " for queue " + name
+                + " from database", e);
+        }
+    }
+
+    /**
+     * Transaction payload stored on a StoreContext: a JDBC connection together with a
+     * flag recording whether work has been done on it that needs committing.
+     */
+    private static final class ConnectionWrapper
+    {
+        /** Starts out false; only {@link #setRequiresCommit()} changes it. */
+        private boolean _requiresCommit;
+        private final Connection _connection;
+
+        public ConnectionWrapper(Connection connection)
+        {
+            this._connection = connection;
+        }
+
+        /** @return the wrapped connection */
+        public Connection getConnection()
+        {
+            return this._connection;
+        }
+
+        /** @return true once {@link #setRequiresCommit()} has been called */
+        public boolean requiresCommit()
+        {
+            return this._requiresCommit;
+        }
+
+        /** Marks the connection as carrying work that must be committed or rolled back. */
+        public void setRequiresCommit()
+        {
+            this._requiresCommit = true;
+        }
+    }
+
+    public void beginTran(StoreContext context) throws AMQException
+    {
+        if (context.getPayload() != null)
+        {
+            throw new AMQException("Fatal internal error: transactional 
context is not empty at beginTran: "
+                + context.getPayload());
+        }
+        else
+        {
+            try
+            {
+                Connection conn = newConnection();
+
+
+                context.setPayload(new ConnectionWrapper(conn));
+            }
+            catch (SQLException e)
+            {
+                throw new AMQException("Error starting transaction: " + e, e);
+            }
+        }
+    }
+
+    public void commitTran(StoreContext context) throws AMQException
+    {
+        ConnectionWrapper connWrapper = (ConnectionWrapper) 
context.getPayload();
+
+        if (connWrapper == null)
+        {
+            throw new AMQException("Fatal internal error: transactional 
context is empty at commitTran");
+        }
+
+        try
+        {
+            Connection conn = connWrapper.getConnection();
+            if(connWrapper.requiresCommit())
+            {
+                conn.commit();
+
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("commit tran completed");
+                }
+
+            }
+            conn.close();
+        }
+        catch (SQLException e)
+        {
+            throw new AMQException("Error commit tx: " + e, e);
+        }
+        finally
+        {
+            context.setPayload(null);
+        }
+    }
+
+    public void abortTran(StoreContext context) throws AMQException
+    {
+        ConnectionWrapper connWrapper = (ConnectionWrapper) 
context.getPayload();
+
+        if (connWrapper == null)
+        {
+            throw new AMQException("Fatal internal error: transactional 
context is empty at abortTran");
+        }
+
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("abort tran called: " + connWrapper.getConnection());
+        }
+
+        try
+        {
+            Connection conn = connWrapper.getConnection();
+            if(connWrapper.requiresCommit())
+            {
+                conn.rollback();
+            }
+
+            conn.close();
+        }
+        catch (SQLException e)
+        {
+            throw new AMQException("Error aborting transaction: " + e, e);
+        }
+        finally
+        {
+            context.setPayload(null);
+        }
+    }
+
+    /**
+     * @param context store context to inspect
+     * @return true when the context currently carries a payload, i.e. a transaction
+     */
+    public boolean inTran(StoreContext context)
+    {
+        final Object payload = context.getPayload();
+        return payload != null;
+    }
+
+    /**
+     * @return the current value of the message id counter, which is then advanced by one
+     */
+    public Long getNewMessageId()
+    {
+        final Long allocated = _messageId.getAndIncrement();
+        return allocated;
+    }
+
+    /**
+     * Writes one content chunk of a message to the message-content table.
+     *
+     * If the context carries no transaction, one is started here and committed before
+     * returning (or aborted on an SQL failure); otherwise the insert joins the transaction
+     * already attached to the context.
+     *
+     * @param context         store context whose payload holds the transaction's connection
+     * @param messageId       id of the message the chunk belongs to
+     * @param index           position of the chunk within the message
+     * @param contentBody     the chunk to store
+     * @param lastContentBody not used by this implementation
+     * @throws AMQException if the chunk cannot be written
+     */
+    public void storeContentBodyChunk(StoreContext context,
+                                      Long messageId,
+                                      int index,
+                                      ContentChunk contentBody,
+                                      boolean lastContentBody) throws AMQException
+    {
+        boolean localTx = getOrCreateTransaction(context);
+        Connection conn = getConnection(context);
+        ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
+
+        try
+        {
+            PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
+            try
+            {
+                stmt.setLong(1, messageId);
+                stmt.setInt(2, index);
+
+                // copy through a duplicate so the position of the caller's buffer is untouched
+                byte[] chunkData = new byte[contentBody.getSize()];
+                contentBody.getData().duplicate().get(chunkData);
+
+                // The data is bound as a stream; building a Blob through
+                // Connection.createBlob() would be the Java 6 way of doing this.
+                ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
+                stmt.setBinaryStream(3, bis);
+                stmt.executeUpdate();
+            }
+            finally
+            {
+                // release the statement now instead of waiting for the connection to be closed
+                stmt.close();
+            }
+
+            // Flag the transaction as dirty so that commitTran() really commits.
+            // (Calling the requiresCommit() getter here would leave the flag unset.)
+            connWrapper.setRequiresCommit();
+
+            if (localTx)
+            {
+                commitTran(context);
+            }
+        }
+        catch (SQLException e)
+        {
+            if (localTx)
+            {
+                abortTran(context);
+            }
+
+            throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
+        }
+    }
+
+    /**
+     * Writes a message's publish information, content header and chunk count to the
+     * message meta-data table.
+     *
+     * If the context carries no transaction, one is started here and committed before
+     * returning (or aborted on an SQL failure); otherwise the insert joins the transaction
+     * already attached to the context.
+     *
+     * @param context   store context whose payload holds the transaction's connection
+     * @param messageId id of the message
+     * @param mmd       meta data to store
+     * @throws AMQException if the meta data cannot be written
+     */
+    public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData mmd)
+            throws AMQException
+    {
+        boolean localTx = getOrCreateTransaction(context);
+        Connection conn = getConnection(context);
+        ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
+
+        try
+        {
+            PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_META_DATA);
+            try
+            {
+                stmt.setLong(1, messageId);
+                stmt.setString(2, mmd.getMessagePublishInfo().getExchange().toString());
+                stmt.setString(3, mmd.getMessagePublishInfo().getRoutingKey().toString());
+                // the two boolean flags are stored as 1 / 0 in short columns
+                stmt.setShort(4, mmd.getMessagePublishInfo().isMandatory() ? (short) 1 : (short) 0);
+                stmt.setShort(5, mmd.getMessagePublishInfo().isImmediate() ? (short) 1 : (short) 0);
+
+                // serialise the content header into a byte array sized by the header itself
+                ContentHeaderBody headerBody = mmd.getContentHeaderBody();
+                final int bodySize = headerBody.getSize();
+                byte[] underlying = new byte[bodySize];
+                ByteBuffer buf = ByteBuffer.wrap(underlying);
+                headerBody.writePayload(buf);
+
+                // bound as a stream rather than through a Blob created from the connection
+                ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
+                stmt.setBinaryStream(6, bis);
+
+                stmt.setInt(7, mmd.getContentChunkCount());
+
+                stmt.executeUpdate();
+            }
+            finally
+            {
+                // release the statement now instead of waiting for the connection to be closed
+                stmt.close();
+            }
+
+            // Flag the transaction as dirty so that commitTran() really commits.
+            // (Calling the requiresCommit() getter here would leave the flag unset.)
+            connWrapper.setRequiresCommit();
+
+            if (localTx)
+            {
+                commitTran(context);
+            }
+        }
+        catch (SQLException e)
+        {
+            if (localTx)
+            {
+                abortTran(context);
+            }
+
+            throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
+        }
+    }
+
+    /**
+     * Loads the publish information, content header and chunk count stored for a message.
+     *
+     * If the context carries no transaction, one is started here and ended before
+     * returning: committed on success, aborted when no row is found or on an SQL failure.
+     * Every column is read before that transaction is ended, because ending it closes the
+     * connection the result set belongs to.
+     *
+     * @param context   store context whose payload holds the transaction's connection
+     * @param messageId id of the message
+     * @return the meta data rebuilt from the stored row
+     * @throws AMQException if no meta data is stored for the id or it cannot be read
+     */
+    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+    {
+        boolean localTx = getOrCreateTransaction(context);
+        Connection conn = getConnection(context);
+
+        try
+        {
+            MessageMetaData metaData = null;
+
+            PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_META_DATA);
+            try
+            {
+                stmt.setLong(1, messageId);
+                ResultSet rs = stmt.executeQuery();
+                try
+                {
+                    if (rs.next())
+                    {
+                        final AMQShortString exchange = new AMQShortString(rs.getString(1));
+                        final String storedRoutingKey = rs.getString(2);
+                        final AMQShortString routingKey =
+                                storedRoutingKey == null ? null : new AMQShortString(storedRoutingKey);
+                        final boolean mandatory = (rs.getShort(3) != (short) 0);
+                        final boolean immediate = (rs.getShort(4) != (short) 0);
+
+                        // read-only view over the stored values; setExchange is a no-op
+                        MessagePublishInfo info = new MessagePublishInfo()
+                        {
+                            public AMQShortString getExchange()
+                            {
+                                return exchange;
+                            }
+
+                            public void setExchange(AMQShortString exchange)
+                            {
+
+                            }
+
+                            public boolean isImmediate()
+                            {
+                                return immediate;
+                            }
+
+                            public boolean isMandatory()
+                            {
+                                return mandatory;
+                            }
+
+                            public AMQShortString getRoutingKey()
+                            {
+                                return routingKey;
+                            }
+                        };
+
+                        Blob dataAsBlob = rs.getBlob(5);
+                        byte[] dataAsBytes = dataAsBlob.getBytes(1, (int) dataAsBlob.length());
+                        ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+
+                        ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, dataAsBytes.length);
+
+                        // the chunk count must be fetched while the result set is still open
+                        metaData = new MessageMetaData(info, chb, rs.getInt(6));
+                    }
+                }
+                finally
+                {
+                    rs.close();
+                }
+            }
+            finally
+            {
+                stmt.close();
+            }
+
+            if (metaData == null)
+            {
+                if (localTx)
+                {
+                    abortTran(context);
+                }
+                throw new AMQException("Metadata not found for message with id " + messageId);
+            }
+
+            if (localTx)
+            {
+                commitTran(context);
+            }
+
+            return metaData;
+        }
+        catch (SQLException e)
+        {
+            if (localTx)
+            {
+                abortTran(context);
+            }
+
+            throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
+        }
+    }
+
+    /**
+     * Loads one stored content chunk of a message.
+     *
+     * If the context carries no transaction, one is started here and ended before
+     * returning: committed on success, aborted when no row is found or on an SQL failure.
+     * The statement and result set are closed before the method returns, so they are
+     * released even when the transaction belongs to the caller.
+     *
+     * @param context   store context whose payload holds the transaction's connection
+     * @param messageId id of the message
+     * @param index     position of the chunk within the message
+     * @return a chunk backed by a copy of the stored bytes; its reduceToFit() is a no-op
+     * @throws AMQException if no such chunk is stored or it cannot be read
+     */
+    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+    {
+        boolean localTx = getOrCreateTransaction(context);
+        Connection conn = getConnection(context);
+
+        try
+        {
+            ContentChunk chunk = null;
+
+            PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
+            try
+            {
+                stmt.setLong(1, messageId);
+                stmt.setInt(2, index);
+                ResultSet rs = stmt.executeQuery();
+                try
+                {
+                    if (rs.next())
+                    {
+                        Blob dataAsBlob = rs.getBlob(1);
+
+                        // the blob is copied into memory, so the chunk outlives the result set
+                        final int size = (int) dataAsBlob.length();
+                        byte[] dataAsBytes = dataAsBlob.getBytes(1, size);
+                        final ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+
+                        chunk = new ContentChunk()
+                        {
+                            public int getSize()
+                            {
+                                return size;
+                            }
+
+                            public ByteBuffer getData()
+                            {
+                                return buf;
+                            }
+
+                            public void reduceToFit()
+                            {
+
+                            }
+                        };
+                    }
+                }
+                finally
+                {
+                    rs.close();
+                }
+            }
+            finally
+            {
+                stmt.close();
+            }
+
+            if (chunk == null)
+            {
+                if (localTx)
+                {
+                    abortTran(context);
+                }
+                throw new AMQException("Message not found for message with id " + messageId);
+            }
+
+            if (localTx)
+            {
+                commitTran(context);
+            }
+
+            return chunk;
+        }
+        catch (SQLException e)
+        {
+            if (localTx)
+            {
+                abortTran(context);
+            }
+
+            throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
+        }
+    }
+
+    private void checkNotClosed() throws MessageStoreClosedException
+    {
+        if (_closed.get())
+        {
+            throw new MessageStoreClosedException();
+        }
+    }
+
+
+    /**
+     * A deferred delivery: remembers a queue, a store context and a message so that the
+     * message can be handed to the queue later through {@link #process()}.
+     */
+    private static final class ProcessAction
+    {
+        private final AMQMessage _message;
+        private final AMQQueue _queue;
+        private final StoreContext _context;
+
+        public ProcessAction(AMQQueue targetQueue, StoreContext storeContext, AMQMessage recovered)
+        {
+            this._queue = targetQueue;
+            this._context = storeContext;
+            this._message = recovered;
+        }
+
+        /**
+         * Creates a queue entry for the message and passes it to the queue.
+         *
+         * @throws AMQException if thrown while the queue processes the entry
+         */
+        public void process() throws AMQException
+        {
+            this._queue.process(this._context, this._queue.createEntry(this._message), false);
+        }
+    }
+
+
+    /**
+     * Reads every row of the queue-entry table and delivers each referenced message to its
+     * queue, then moves the message id counter past the highest id seen.
+     *
+     * Queues named in the table but missing from the supplied map are created, registered
+     * with the virtual host and added to the map.  A message that appears on several queues
+     * is represented by a single AMQMessage whose reference count is incremented for each
+     * additional entry.  All rows are read first; delivery happens afterwards.
+     *
+     * The connection of the context's transaction is used when there is one and is left
+     * open for the owner of that transaction.  Otherwise a connection is opened for the
+     * duration of this call and closed before returning.
+     *
+     * @param context store context passed on to the queues during delivery
+     * @param queues  known queues by name; may gain entries
+     * @throws SQLException if the queue entries cannot be read
+     * @throws AMQException if thrown while a queue processes a message
+     */
+    private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues)
+        throws SQLException, AMQException
+    {
+        Map<Long, AMQMessage> msgMap = new HashMap<Long,AMQMessage>();
+        List<ProcessAction> actions = new ArrayList<ProcessAction>();
+
+        Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>();
+
+        final boolean inLocaltran = inTran(context);
+        Connection conn = null;
+        try
+        {
+            if (inLocaltran)
+            {
+                conn = getConnection(context);
+            }
+            else
+            {
+                conn = newConnection();
+            }
+
+            MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
+            long maxId = 1;
+
+            TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
+
+            Statement stmt = conn.createStatement();
+            try
+            {
+                ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
+                try
+                {
+                    while (rs.next())
+                    {
+                        AMQShortString queueName = new AMQShortString(rs.getString(1));
+
+                        AMQQueue queue = queues.get(queueName);
+                        if (queue == null)
+                        {
+                            queue = new AMQQueue(queueName, false, null, false, _virtualHost);
+                            _virtualHost.getQueueRegistry().registerQueue(queue);
+                            queues.put(queueName, queue);
+                        }
+
+                        long messageId = rs.getLong(2);
+                        maxId = Math.max(maxId, messageId);
+                        AMQMessage message = msgMap.get(messageId);
+
+                        if (message != null)
+                        {
+                            // same message on another queue: share the instance
+                            message.incrementReference();
+                        }
+                        else
+                        {
+                            message = new AMQMessage(messageId, this, messageHandleFactory, txnContext);
+                            msgMap.put(messageId, message);
+                        }
+
+                        if (_logger.isDebugEnabled())
+                        {
+                            _logger.debug("On recovery, delivering " + message.getMessageId() + " to " + queue.getName());
+                        }
+
+                        if (_logger.isInfoEnabled())
+                        {
+                            Integer count = queueRecoveries.get(queueName);
+                            if (count == null)
+                            {
+                                count = 0;
+                            }
+
+                            queueRecoveries.put(queueName, ++count);
+                        }
+
+                        actions.add(new ProcessAction(queue, context, message));
+                    }
+                }
+                finally
+                {
+                    rs.close();
+                }
+            }
+            finally
+            {
+                stmt.close();
+            }
+
+            for (ProcessAction action : actions)
+            {
+                action.process();
+            }
+
+            _messageId.set(maxId + 1);
+        }
+        catch (SQLException e)
+        {
+            _logger.error("Error: " + e, e);
+            throw e;
+        }
+        finally
+        {
+            // Close only a connection opened by this method.  One borrowed from the
+            // context's transaction is released by commitTran()/abortTran().
+            if (!inLocaltran && conn != null)
+            {
+                conn.close();
+            }
+        }
+
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info("Recovered message counts: " + queueRecoveries);
+        }
+    }
+
+    private Connection getConnection(final StoreContext context)
+    {
+        return ((ConnectionWrapper)context.getPayload()).getConnection();
+    }
+
+    private boolean getOrCreateTransaction(StoreContext context) throws 
AMQException
+    {
+
+        ConnectionWrapper tx = (ConnectionWrapper) context.getPayload();
+        if (tx == null)
+        {
+            beginTran(context);
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Moves the store to a new state, provided it is currently in the required one.
+     *
+     * @param requiredState state the store has to be in for the transition to be allowed
+     * @param newState      state to move to
+     * @throws AMQException if the store is not in the required state
+     */
+    private synchronized void stateTransition(State requiredState, State newState) throws AMQException
+    {
+        if (_state == requiredState)
+        {
+            _state = newState;
+            return;
+        }
+
+        throw new AMQException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
+            + "; currently in state: " + _state);
+    }
+}

Modified: incubator/qpid/branches/M2.x/java/pom.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.x/java/pom.xml?rev=648834&r1=648833&r2=648834&view=diff
==============================================================================
--- incubator/qpid/branches/M2.x/java/pom.xml (original)
+++ incubator/qpid/branches/M2.x/java/pom.xml Wed Apr 16 13:30:38 2008
@@ -524,6 +524,11 @@
                 <version>1.0.1</version>
             </dependency>
             <dependency>
+                <groupId>org.apache.derby</groupId>
+                <artifactId>derby</artifactId>
+                <version>10.3.2.1</version>
+            </dependency>
+            <dependency>
                 <groupId>org.apache.mina</groupId>
                 <artifactId>mina-java5</artifactId>
                 <version>1.0.1</version>


Reply via email to