Hi,
My name is Wai Yang Yap and I'm new in the JMS & QPID.
Currently we are trying to connect a Java side with a Ruby side using
QPID.
We have the following situation; we have a server written in Java that
is accepting jobs from clients, and we have different servers (all in
Ruby) where we actually process the jobs. We will be pushing jobs into
the queues and the different servers will read from these queues. When
the Ruby clients are done, they will sent a message back to the queue
that is read by Java server again.
I have experimented with QPID for a week now, and I got the Java
Subscriber and Ruby Publisher working. So I can send messages from Ruby
to Java with the QPID broker in between them. The problem that I have
now, is when I use the same setup (only reverse, Java sending messages
to Ruby), it crashes at Ruby. Ruby receives the messages, but when it
tries to parse the message, it crashes.
The exception that it gives is:
undefined method `timestamp' for #<Codec::Decoder:0x2e388a0>
D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/codec.rb:17
2:in `send'
D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/codec.rb:17
2:in `decode'
D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/connection.
rb:223:in `decode'
D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/connection.
rb:222:in `each'
D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/connection.
rb:222:in `decode'
D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/connection.
rb:102:in `decode'
D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/connection.
rb:61:in `read'
D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/peer.rb:90:
in `reader'
D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/peer.rb:74:
in `send'
D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/peer.rb:74:
in `spawn'
D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/peer.rb:72:
in `initialize'
D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/peer.rb:72:
in `new'
D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/peer.rb:72:
in `spawn'
D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/peer.rb:55:
in `start'
D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/client.rb:8
3:in `start'
D:/1-Projects/waiyang/Projects/WeBellen/Reviva/Test/src/Receiver.rb:10
deadlock 0x2e5eaa0: sleep:- -
D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/queue.rb:36
deadlock 0x284c748: sleep:- (main) -
D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/queue.rb:36
deadlock 0x2e5e550: sleep:- -
D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/queue.rb:36
D:\1-Projects\waiyang\Projects\WeBellen\Reviva\Test\src/qpid/peer.rb:72:
in `pop': Thread(0x2e5e758): deadlock (fatal)
from
D:/1-Projects/waiyang/Projects/WeBellen/Reviva/Test/src/Receiver.rb:18
I'm currently using the amqp-0.8 spec version.
I tried with 0.9 however I crashed when it tries to read the specs.
Below I have added the code that I used for testing.
Ruby code:
require "qpid"
require 'rexml/document'
include Qpid
include REXML
spec =
Spec.load("D:/1-Projects/waiyang/Projects/WeBellen/Reviva/Test/src/specs
/amqp.0-8.xml")
c = Client.new("127.0.0.1", 5672, spec, "localhost")
#c.start("\0guest\0guest", mechanism="PLAIN")
c.start({"LOGIN" => "guest", "PASSWORD" => "guest"})
ch = c.channel(1)
ch.channel_open()
ch.queue_declare(:queue => "test-queue")
#ch.queue_bind(:queue_name => "test-queue")
ch.basic_consume(:queue => "test-queue", :consumer_tag => "ctag")
while true
msg = c.queue("ctag").pop()
text = Document.new(msg.content.body)
print "#{text.elements['content/msg/sentence'].text}\n"
print "#{text.elements['content/script'].text}\n"
ch.basic_ack(msg.delivery_tag)
sleep 0.00001
end
ch.channel_close(:reply_code => 200, :reply_text => "Ok")
Java code:
package nl.quince.publisher;
import java.io.ByteArrayOutputStream;
import java.util.Hashtable;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.spi.InitialContextFactory;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
import org.dom4j.io.DOMReader;
import org.dom4j.io.OutputFormat;
import org.dom4j.io.XMLWriter;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
public class MessagePublisher {
public static Document createXmlDoc() {
Document result = null;
try {
DocumentBuilderFactory factory =
DocumentBuilderFactory.newInstance();
DocumentBuilder builder =
factory.newDocumentBuilder();
result = builder.newDocument();
Element root =
result.createElement("content");
Element msg =
result.createElement("msg");
Element sentence =
result.createElement("sentence");
sentence.setTextContent("hello
world");
msg.appendChild(sentence);
root.appendChild(msg);
Element script =
result.createElement("script");
script.setTextContent("puts 'hello
world'");
root.appendChild(script);
result.appendChild(root);
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
public static void main(String[] args) {
Hashtable<String, String> env = new Hashtable<String, String>();
env.put("connectionfactory.connection",
"amqp://guest:[EMAIL PROTECTED]/localhost?brokerlist='tcp://127.0.0.1:5672'
");
env.put("queue.queue", "test-queue");
//env.put("topic.name", "ctag");
try {
InitialContextFactory factory = new
PropertiesFileInitialContextFactory();
Context _context =
factory.getInitialContext(env);
Connection producerConnection =
((ConnectionFactory) _context.lookup("connection")).createConnection();
Session producerSession =
producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue _queue = (Queue) _context.lookup("queue");
MessageProducer producer =
producerSession.createProducer(_queue);
producerConnection.start();
for (int msg = 0; msg < 10; msg++) {
Document doc = createXmlDoc();
ByteArrayOutputStream out = new
ByteArrayOutputStream();
XMLWriter writer = new XMLWriter(out,
OutputFormat.createPrettyPrint());
writer.write((new DOMReader().read(doc)));
writer.flush();
writer.close();
BytesMessage message =
producerSession.createBytesMessage();
message.writeBytes(out.toByteArray());
producer.send(message);
}
producer.close();
producerConnection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
How can I solve the problem with the timestamp?
Thanx in advance for your help.
Best regards,
Wai Yang
Wai Yang Yap | quince
------------------------------------------
mobile
e-mail [EMAIL PROTECTED]
web www.quince.nl
------------------------------------------
assumburg 73
1081 gb amsterdam
the netherlands
tel: +31 (0)20 3471000
fax:+31 (0)20 3471005
Nederlands:
Dit bericht kan vertrouwelijke informatie bevatten. Indien u niet de
geadresseerde van dit bericht bent, verzoeken wij u dit bericht te vernietigen
zonder van de inhoud kennis te nemen en de inhoud ervan niet te gebruiken, niet
te kopieren en niet onder derden te verspreiden. Quince is een handelsnaam die
wordt gevoerd door Quince B.V. te Amsterdam.
Op alle werkzaamheden zijn de algemene voorwaarden van Quince B.V. van
toepassing. Daarin is in artikel 11 een beperking van aansprakelijkheid
opgenomen. De voorwaarden worden op verzoek kosteloos toegezonden. De
voorwaarden zijn ook na te lezen op http://www.quince.nl/algemenevoorwaarden
English:
This message may contain information that is privileged or confidential. If you
are not the named addressee of this message, please destroy it without reading,
using, copying or disclosing its contents to any other person. Quince is a
trade name of Quince B.V. with its office in Amsterdam.
All services are governed by the general terms and conditions of Quince B.V.
which contain a limitation of liability in article 11. A free copy of the
general terms and conditions will be provided upon request. The conditions can
also be read on http://www.quince.nl/termsandconditions