Author: rgodfrey
Date: Sun Feb 11 05:40:10 2007
New Revision: 505972
URL: http://svn.apache.org/viewvc?view=rev&rev=505972
Log:
QPID-359 : Allow for rudimentary use of access tickets
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java?view=diff&rev=505972&r1=505971&r2=505972
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java
Sun Feb 11 05:40:10 2007
@@ -28,7 +28,7 @@
* Need this adaptor class to conform to JMS spec and throw
IllegalStateException
* from createDurableSubscriber, unsubscribe, createTopic &
createTemporaryTopic
*/
-public class AMQQueueSessionAdaptor implements QueueSession
+public class AMQQueueSessionAdaptor implements QueueSession, AMQSessionAdapter
{
//holds a session for delegation
protected final AMQSession _session;
@@ -176,4 +176,8 @@
throw new IllegalStateException("Cannot call unsubscribe from
QueueSession");
}
+ public AMQSession getSession()
+ {
+ return _session;
+ }
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=505972&r1=505971&r2=505972
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Sun Feb 11 05:40:10 2007
@@ -28,6 +28,7 @@
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -64,6 +65,8 @@
private int _channelId;
+ private int _ticket;
+
private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
@@ -145,6 +148,7 @@
private boolean _hasMessageListeners;
+
/**
* Responsible for decoding a message fragment and passing it to the
appropriate message consumer.
*/
@@ -1114,7 +1118,7 @@
false, //
internal
false, // nowait
false, //
passive
- 0, // ticket
+ getTicket(), //
ticket
type); // type
getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
}
@@ -1136,7 +1140,7 @@
false,
// internal
true,
// nowait
false,
// passive
- 0, //
ticket
+
getTicket(), // ticket
type);
// type
protocolHandler.writeFrame(exchangeDeclare);
}
@@ -1169,7 +1173,7 @@
true, //
nowait
false, //
passive
amqd.getAMQQueueName(), // queue
- 0); //
ticket
+ getTicket());
// ticket
protocolHandler.writeFrame(queueDeclare);
return amqd.getAMQQueueName();
@@ -1185,7 +1189,7 @@
true, // nowait
queueName, //
queue
amqd.getRoutingKey(), // routingKey
- 0); // ticket
+ getTicket()); //
ticket
protocolHandler.writeFrame(queueBind);
}
@@ -1233,7 +1237,7 @@
consumer.isNoLocal(), // noLocal
nowait,
// nowait
queueName,
// queue
- 0); //
ticket
+
getTicket()); // ticket
if (nowait)
{
protocolHandler.writeFrame(jmsConsume);
@@ -1426,7 +1430,7 @@
false,
// ifUnused
true,
// nowait
queueName, // queue
- 0);
// ticket
+
getTicket()); // ticket
getProtocolHandler().syncWrite(queueDeleteFrame,
QueueDeleteOkBody.class);
}
catch (AMQException e)
@@ -1823,5 +1827,49 @@
throw new javax.jms.InvalidDestinationException("Invalid Queue");
}
}
+
+
+
+ public int getTicket()
+ {
+ return _ticket;
+ }
+
+ public void setTicket(int ticket)
+ {
+ _ticket = ticket;
+ }
+
+
+ public void requestAccess(AMQShortString realm, boolean exclusive, boolean
passive, boolean active, boolean write, boolean read) throws AMQException
+ {
+
getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(),
+
getProtocolMajorVersion(),
+
getProtocolMinorVersion(),
+
active,
+
exclusive,
+
passive,
+ read,
+ realm,
+
write),
+ new
BlockingMethodFrameListener(_channelId)
+ {
+
+ public boolean processMethod(int
channelId, AMQMethodBody frame) throws AMQException
+ {
+ if(frame instanceof
AccessRequestOkBody)
+ {
+
setTicket(((AccessRequestOkBody)frame).getTicket());
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ });
+
+ }
+
}
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java?view=auto&rev=505972
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java
(added)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java
Sun Feb 11 05:40:10 2007
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+public interface AMQSessionAdapter
+{
+ public AMQSession getSession();
+}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java?view=diff&rev=505972&r1=505971&r2=505972
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java
Sun Feb 11 05:40:10 2007
@@ -24,7 +24,7 @@
import javax.jms.IllegalStateException;
import java.io.Serializable;
-public class AMQTopicSessionAdaptor implements TopicSession
+public class AMQTopicSessionAdaptor implements TopicSession, AMQSessionAdapter
{
protected final AMQSession _session;
@@ -199,4 +199,8 @@
throw new IllegalStateException("Cannot call createTemporaryQueue from
TopicSession");
}
+ public AMQSession getSession()
+ {
+ return _session;
+ }
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=505972&r1=505971&r2=505972
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
Sun Feb 11 05:40:10 2007
@@ -144,7 +144,7 @@
false, // internal
true, // nowait
false, // passive
- 0, // ticket
+ _session.getTicket(), // ticket
destination.getExchangeClass()); // type
_protocolHandler.writeFrame(declare);
}
@@ -465,7 +465,7 @@
immediate, // immediate
mandatory, // mandatory
destination.getRoutingKey(), // routingKey
- 0); // ticket
+ _session.getTicket()); // ticket
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=505972&r1=505971&r2=505972
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
Sun Feb 11 05:40:10 2007
@@ -442,7 +442,7 @@
* @param frame
* @param listener the blocking listener. Note the calling thread will
block.
*/
- private AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
+ public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
BlockingMethodFrameListener listener)
throws AMQException
{
@@ -457,7 +457,7 @@
* @param frame
* @param listener the blocking listener. Note the calling thread will
block.
*/
- private AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
+ public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
BlockingMethodFrameListener listener, long timeout)
throws AMQException
{