Author: arnaudsimon
Date: Tue Oct 14 06:56:27 2008
New Revision: 704541

URL: http://svn.apache.org/viewvc?rev=704541&view=rev
Log:
qpid-1284: qman_14102008_latest.patch (on behalf Andrea) 

Removed:
    
incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/Constants.java
    
incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/ManagementQueueMessageListenerParser.java
    
incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/MethodReplyQueueMessageListenerParser.java
Modified:
    
incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
    
incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java
    
incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java

Modified: 
incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java?rev=704541&r1=704540&r2=704541&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
 Tue Oct 14 06:56:27 2008
@@ -56,7 +56,6 @@
          * Builds a new decorator with the given connection.
          *
          * @param brokerId the broker identifier.
-         * @param decoratee the underlying connection.
          */
         private PooledConnection(UUID brokerId)
         {
@@ -132,7 +131,7 @@
         @Override
         public Connection makeObject () throws Exception
         {
-            PooledConnection connection = new PooledConnection(_brokerId);
+               PooledConnection connection = new PooledConnection(_brokerId);
             connection.connect(
                     _connectionData.getHost(),
                     _connectionData.getPort(),
@@ -206,12 +205,13 @@
 
     /**
      * Adds a connection pool to this datasource.
-     *
+     * 
      * @param brokerId the broker identifier that will be associated with the 
new connection pool.
      * @param connectionData the broker connection data.
      * @throws Exception when the pool cannot be created.
      */
-    void addConnectionPool(UUID brokerId,BrokerConnectionData connectionData) 
throws Exception {
+    void addConnectionPool(UUID brokerId,BrokerConnectionData connectionData) 
throws Exception 
+    {
         GenericObjectPoolFactory factory = new GenericObjectPoolFactory(
                 new QpidConnectionFactory(brokerId,connectionData),
                 connectionData.getMaxPoolCapacity(),
@@ -219,13 +219,25 @@
                 connectionData.getMaxWaitTimeout(),-1,
                 true,
                 false);
+    
         ObjectPool pool = factory.createPool();
 
-        for (int i  = 0; i < connectionData.getInitialPoolCapacity(); i++)
+        // Open connections at startup according to initial capacity param 
value.
+        int howManyConnectionAtStartup = 
connectionData.getInitialPoolCapacity(); 
+        Object [] openStartupList = new Object[howManyConnectionAtStartup];
+        
+        // Open...
+        for (int index  = 0; index < howManyConnectionAtStartup; index++)
+        {
+            openStartupList[index] = pool.borrowObject();
+        }
+        
+        // ...and immediately return them to pool. In this way the pooled 
connection has been opened.
+        for (int index = 0; index < howManyConnectionAtStartup; index++)
         {
-            pool.returnObject(pool.borrowObject());
+            pool.returnObject(openStartupList[index]);
         }
 
         pools.put(brokerId,pool);
     }
-}
\ No newline at end of file
+}

Modified: 
incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java?rev=704541&r1=704540&r2=704541&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java
 Tue Oct 14 06:56:27 2008
@@ -739,7 +739,7 @@
         try
         {
             _service.connect();
-           // _service.requestSchema(_parent.getName(), _name, _hash);
+           _service.requestSchema(_parent.getName(), _name, _hash);
             _service.sync();
         } finally
         {
@@ -770,7 +770,7 @@
             
             int sequenceNumber = 
SequenceNumberGenerator.getNextSequenceNumber();
             _methodInvocationListener.operationIsGoingToBeInvoked(new 
InvocationEvent(this,sequenceNumber,_exchangeChannelForMethodInvocations));
-           // _service.invoke(_parent.getName(), _name, 
_hash,objectId,parameters, method,sequenceNumber);
+           _service.invoke(_parent.getName(), _name, 
_hash,objectId,parameters, method,sequenceNumber);
             
             // TODO : Shoudl be configurable?
             InvocationResult result = 
_exchangeChannelForMethodInvocations.poll(5000,TimeUnit.MILLISECONDS);
@@ -879,4 +879,4 @@
         }
         _service.close();
     }
-}
\ No newline at end of file
+}

Modified: 
incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java?rev=704541&r1=704540&r2=704541&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java
 Tue Oct 14 06:56:27 2008
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.management.domain.services;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Map;
@@ -27,10 +28,14 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.qpid.QpidException;
-import org.apache.qpid.management.Constants;
+import org.apache.qpid.api.Message;
 import org.apache.qpid.management.Names;
 import org.apache.qpid.management.configuration.Configuration;
 import org.apache.qpid.management.configuration.QpidDatasource;
+import org.apache.qpid.management.domain.model.QpidMethod;
+import org.apache.qpid.management.domain.model.type.Binary;
+import org.apache.qpid.management.messages.MethodInvocationRequestMessage;
+import org.apache.qpid.management.messages.SchemaRequestMessage;
 import org.apache.qpid.nclient.util.MessageListener;
 import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
 import org.apache.qpid.transport.Connection;
@@ -110,7 +115,7 @@
     {
         _connection = QpidDatasource.getInstance().getConnection(_brokerId);
         _listeners = new 
ConcurrentHashMap<String,MessagePartListenerAdapter>();
-        _session = _connection.createSession(Constants.NO_EXPIRATION);
+        _session = _connection.createSession(0);
         _session.setSessionListener(this);
     }
 
@@ -299,4 +304,126 @@
 
         Log.logMessageContent (messageData);
     }
-}
\ No newline at end of file
+    
+    /**
+     * Requests a schema for the given package.class.hash.
+     * 
+     * @param packageName the package name.
+     * @param className the class name.
+     * @param schemaHash the schema hash.
+     * @throws IOException when the schema request cannot be sent.
+     */
+    public void requestSchema(final String packageName, final String 
className, final Binary schemaHash) throws IOException
+    {
+        Message message = new SchemaRequestMessage()
+        {
+            @Override
+            protected String className ()
+            {
+                return className;
+            }
+
+            @Override
+            protected String packageName ()
+            {
+                return packageName;
+            }
+
+            @Override
+            protected Binary schemaHash ()
+            {
+                return schemaHash;
+            }
+        };
+        
+        sendMessage(message);
+    }
+    
+    /**
+     * Invokes an operation on a broker object instance.
+     * 
+     * @param packageName the package name.
+     * @param className the class name.
+     * @param schemaHash the schema hash of the corresponding class.
+     * @param objectId the object instance identifier.
+     * @param parameters the parameters for this invocation.
+     * @param method the method (definition) invoked.
+     * @return the sequence number used for this message.
+     * @throws MethodInvocationException when the invoked method returns an 
error code.
+     * @throws UnableToComplyException when it wasn't possibile to invoke the 
requested operation. 
+     */
+    public void invoke(
+            final String packageName, 
+            final String className, 
+            final Binary schemaHash, 
+            final Binary objectId, 
+            final Object[] parameters, 
+            final QpidMethod method,
+            final int sequenceNumber) throws MethodInvocationException, 
UnableToComplyException 
+    {
+        Message message = new MethodInvocationRequestMessage()
+        {
+            
+            @Override
+            protected int sequenceNumber ()
+            {
+                return sequenceNumber;
+            }
+            
+            protected Binary objectId() {
+                return objectId;
+            }
+            
+            protected String packageName()
+            {
+                return packageName;
+            }
+            
+            protected String className() 
+            {
+                return className;
+            }
+            
+            @Override
+            protected QpidMethod method ()
+            {
+                return method;
+            }
+
+            @Override
+            protected Object[] parameters ()
+            {
+                return parameters;
+            }
+
+            @Override
+            protected Binary schemaHash ()
+            {
+                return schemaHash;
+            }
+        };
+        
+        try {
+            sendMessage(message);
+            sync();
+        } catch(Exception exception) {
+            throw new UnableToComplyException(exception);
+        }
+    }     
+    
+    /**
+     * Sends a command message.
+     * 
+     * @param message the command message.
+     * @throws IOException when the message cannot be sent.
+     */
+    public void sendMessage(Message message) throws IOException
+    {
+        _session.messageTransfer(
+                Names.MANAGEMENT_EXCHANGE,
+                MessageAcceptMode.EXPLICIT,
+                MessageAcquireMode.PRE_ACQUIRED,
+                message.getHeader(),
+                message.readData());
+    }      
+}


Reply via email to