I am running C++ Broker with JAVA clients. I have one client that creates four producer threads with topics of NODE0, NODE1, NODE2, NODE3 each of these send a message. Then I have one java client that consume messages from topic NODE0. I am using the destination format amq.topic. destination.NODE0 = amq.topic destination.NODE1 = amq.topic destination.NODE2 = amq.topic destination.NODE3 = amq.topic
Why does the broker sends message from all producers to one consumer. I know if I use Queue I can send from NODE0 producer to NODE0 consumer. But what do I need to do so this also works. Is this in the Address Format or broker configuration? Thanks for help. 2011-11-15 15:19:42 trace [0x41b4b940] qpid/amqp_0_10/Connection.cpp:57:virtual size_t qpid::amqp_0_10::Connection::decode(const char*, size_t): RECV [10.192.4.220:5672-10.192.4.220:43564]: Frame[Ebe; channel=0; content (23 bytes) BusMsg 3 on topic NODE3] 2011-11-15 15:19:42 trace [0x41b4b940] qpid/SessionState.cpp:200:virtual bool qpid::SessionState::receiverRecord(const qpid::framing::AMQFrame&): [email protected]: recv cmd 11: content (23 bytes) BusMsg 3 on topic NODE3 2011-11-15 15:19:42 debug [0x41b4b940] qpid/broker/Queue.cpp:169:void qpid::broker::Queue::deliver(boost::intrusive_ptr<qpid::broker::Message>): Message 0x10c1b610 enqueued on TempQueue13e9eb5a-2cca-4d13-a4e6-1a624e121be1 2011-11-15 15:19:42 debug [0x41548940] qpid/broker/SemanticState.cpp:398:bool qpid::broker::SemanticState::ConsumerImpl::checkCredit(boost::intrusive_ptr<qpid::broker::Message>&): Sufficient credit for 1 on [email protected], have bytes: 4294967295 msgs: 497, need 134 bytes 2011-11-15 15:19:42 debug [0x41548940] qpid/broker/SemanticState.cpp:387:void qpid::broker::SemanticState::ConsumerImpl::allocateCredit(boost::intrusive_ptr<qpid::broker::Message>&): Credit allocated for 1 on [email protected], was bytes: 4294967295 msgs: 497 now bytes: 4294967295 msgs: 496 2011-11-15 15:19:42 trace [0x41548940] qpid/SessionState.cpp:119:virtual void qpid::SessionState::senderRecord(const qpid::framing::AMQFrame&): [email protected]: sent cmd 6: {MessageTransferBody: destination=1; accept-mode=1; acquire-mode=0; } 2011-11-15 15:19:42 trace [0x41548940] qpid/SessionState.cpp:119:virtual void qpid::SessionState::senderRecord(const qpid::framing::AMQFrame&): [email protected]: sent cmd 6: header (111 bytes); properties={{MessageProperties: content-length=0; message-id=54c21b27-5c31-33dc-942d-070a4533cacc; content-type=text/plain; content-encoding=UTF-8; user-id=guest; application-headers={qpid.subject:V2:1:str16(#)}; }{DeliveryProperties: priority=4; delivery-mode=2; timestamp=1321388382537; exchange=amq.topic; routing-key=#; }} 2011-11-15 15:19:42 trace [0x41548940] qpid/SessionState.cpp:119:virtual void qpid::SessionState::senderRecord(const qpid::framing::AMQFrame&): [email protected]: sent cmd 6: content (23 bytes) BusMsg 3 on topic NODE3 2011-11-15 15:19:42 debug [0x41548940] qpid/broker/DeliveryRecord.cpp:57:bool qpid::broker::DeliveryRecord::setEnded(): DeliveryRecord::setEnded() id=6 2011-11-15 15:19:42 debug [0x41b4b940] qpid/broker/SessionState.cpp:482:virtual void qpid::broker::SessionState::IncompleteIngressMsgXfer::completed(bool): : receive completed for msg seq=11 2011-11-15 15:19:42 debug [0x41b4b940] qpid/SessionState.cpp:214:virtual void qpid::SessionState::receiverCompleted(qpid::framing::SequenceNumber, bool): [email protected]: receiver marked completed: 11 incomplete: { } unknown-completed: { [1,11] } 2011-11-15 15:19:42 trace [0x41548940] qpid/amqp_0_10/Connection.cpp:95:virtual size_t qpid::amqp_0_10::Connection::encode(const char*, size_t): *SENT *[10.192.4.220:5672-10.192.4.220:43563]: Frame[Bbe; channel=0; {MessageTransferBody: destination=1; accept-mode=1; acquire-mode=0; }] 2011-11-15 15:19:42 trace [0x41548940] qpid/amqp_0_10/Connection.cpp:95:virtual size_t qpid::amqp_0_10::Connection::encode(const char*, size_t): *SENT *[10.192.4.220:5672-10.192.4.220:43563]: Frame[be; channel=0; header (111 bytes); properties={{MessageProperties: content-length=0; message-id=54c21b27-5c31-33dc-942d-070a4533cacc; content-type=text/plain; content-encoding=UTF-8; user-id=guest; application-headers={qpid.subject:V2:1:str16(#)}; }{DeliveryProperties: priority=4; delivery-mode=2; timestamp=1321388382537; exchange=amq.topic; routing-key=#; }}] 2011-11-15 15:19:42 trace [0x41548940] qpid/amqp_0_10/Connection.cpp:95:virtual size_t qpid::amqp_0_10::Connection::encode(const char*, size_t): *SENT *[10.192.4.220:5672-10.192.4.220:43563]: Frame[Ebe; channel=0; content (23 bytes) BusMsg 3 on topic NODE3] -- View this message in context: http://apache-qpid-users.2158936.n2.nabble.com/Broker-sends-messages-from-one-producer-topic-to-all-consumers-tp6997882p6997882.html Sent from the Apache Qpid users mailing list archive at Nabble.com. --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:[email protected]
