On Tue, 2012-07-17 at 09:28 +0200, Toralf Lund wrote:
> Hi
> 
> Does anyone here receive "published" messages in an application using a 
> something like a GLib main loop? Or Gtk or Qt? If you do, how exactly do 
> you integrate the message availability check with the main loop? I was 
> hoping there would be filedescriptor that had data available whenever 
> messages have arrived, but can't find anything. So I'm experimenting 
> with a dual thread setup, where one thread just checks for messages and 
> notifies the other via a pipe. I'm not quite sure what the best way of 
> checking is, though... I've also tried doing the check in an idle 
> handler, which does work, but this approach has certain limitations.

There's no FD that you can add to a poll or select loop so a
multi-threaded design is probably the only way to go right now. (I would
like to see something like that in the future.)

You have a couple of options for how you wait for messages. You can have
a thread-per-receiver in a loop calling Receiver::fetch(), or have a
thread per session in a loop calling Session::nextReceiver() and then
calling fetch() on that receiver. At present there's no way to have a
single thread handle all the traffic on a connection, or to have single
global thread handling all the Qpid traffic. (Those are both interesting
avenues to develop)

As to delivering your messages from the listening thread to the active
thread, one way would be to use a qpid::sys::PollableQueue. This is an
in-memory queue (not corresponding to a Qpid queue) that has an
associated FD that you *can* integrate with poll or select. So the
architecture I imagine is: multiple threads doing blocking receive and
putting the messages on a PollableQueue (maybe more than one) that has
its FD registered with your main loop.

You could use some other inter-thread communication, PollableQueue is
just one I know well and find convenient.

Cheers,
Alan.
Index: src/qpid/framing/IsInSequenceSet.h
===================================================================
--- src/qpid/framing/IsInSequenceSet.h	(revision 1356952)
+++ src/qpid/framing/IsInSequenceSet.h	(working copy)
@@ -41,6 +41,10 @@
         return i != set.rangesEnd() && i->begin() <= n;
     }
 
+    bool completed() {
+        return i == set.rangesEnd();
+    }
+
   private:
     const SequenceSet& set;
     SequenceSet::RangeIterator i;
Index: src/qpid/broker/SemanticState.cpp
===================================================================
--- src/qpid/broker/SemanticState.cpp	(revision 1356952)
+++ src/qpid/broker/SemanticState.cpp	(working copy)
@@ -780,6 +780,14 @@
     }
 };
 
+template <class F> void forEachInSequenceSet(const SequenceSet& commands, DeliveryRecords& unacked, F functor)
+{
+    IsInSequenceSet test(commands);
+    for (DeliveryRecords::iterator i = unacked.begin(); i != unacked.end() && !test.completed(); ++i) {
+        if (test(i->getId())) functor(*i);
+    }
+}
+
 template<class Predicate> IsInSequenceSetAnd<Predicate>
 isInSequenceSetAnd(const SequenceSet& s, Predicate p) {
     return IsInSequenceSetAnd<Predicate>(s,p);
@@ -802,29 +810,21 @@
             //mark the relevant messages as 'ended' in unacked
             //if the messages are already completed, they can be
             //removed from the record
-            DeliveryRecords::iterator removed =
-                remove_if(unacked.begin(), unacked.end(),
-                          isInSequenceSetAnd(commands,
-                                             bind(&DeliveryRecord::setEnded, _1)));
-            unacked.erase(removed, unacked.end());
+            for_each(unacked.begin(), unacked.end(), isInSequenceSetAnd(commands, bind(&DeliveryRecord::setEnded, _1)));
+            while (!unacked.empty() && unacked.front().isRedundant()) unacked.pop_front();
         }
     } else {
-        DeliveryRecords::iterator removed =
-            remove_if(unacked.begin(), unacked.end(),
-                      isInSequenceSetAnd(commands,
-                                         bind(&DeliveryRecord::accept, _1,
-                                              (TransactionContext*) 0)));
-        unacked.erase(removed, unacked.end());
+        for_each(unacked.begin(), unacked.end(), isInSequenceSetAnd(commands, bind(&DeliveryRecord::accept, _1,(TransactionContext*) 0)));
+        while (!unacked.empty() && unacked.front().isRedundant()) unacked.pop_front();
     }
     getSession().setUnackedCount(unacked.size());
 }
 
 void SemanticState::completed(const SequenceSet& commands) {
-    DeliveryRecords::iterator removed =
-        remove_if(unacked.begin(), unacked.end(),
-                  isInSequenceSetAnd(commands,
-                                     bind(&SemanticState::complete, this, _1)));
-    unacked.erase(removed, unacked.end());
+    for_each(unacked.begin(), unacked.end(), isInSequenceSetAnd(commands, bind(&SemanticState::complete, this, _1)));
+    forEachInSequenceSet(commands, unacked, bind(&SemanticState::complete, this, _1));
+    while (!unacked.empty() && unacked.front().isRedundant()) unacked.pop_front();
+
     requestDispatch();
     getSession().setUnackedCount(unacked.size());
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to