Hi all,

This patch tries to make PGM play well with the new subscription forwarding mechanism.

However, I have no infrastructure to actually test it. It would be great if anyone could give it a go.

Martin
From c7542981d18b13b251d5a3129f1ec7ba24aeb9a1 Mon Sep 17 00:00:00 2001
From: Martin Sustrik <[email protected]>
Date: Mon, 11 Jul 2011 10:18:30 +0200
Subject: [PATCH] PGM transport reconciled with subscription forwarding

As PGM is not capable of passing subscriptions upstream,
subscriptions are dropped on the sub side and the engine
subscribes to all messages on the pub side.

Signed-off-by: Martin Sustrik <[email protected]>
---
 src/pgm_receiver.cpp |   12 +++++++++++-
 src/pgm_receiver.hpp |    4 ++++
 src/pgm_sender.cpp   |    9 +++++++++
 3 files changed, 24 insertions(+), 1 deletions(-)

diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index 1fd687a..b859241 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -68,6 +68,9 @@ void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
     set_pollin (socket_handle);
 
     sink = sink_;
+
+    //  If there are any subscriptions already queued in the session, drop them.
+    drop_subscriptions ();
 }
 
 void zmq::pgm_receiver_t::unplug ()
@@ -101,7 +104,7 @@ void zmq::pgm_receiver_t::terminate ()
 
 void zmq::pgm_receiver_t::activate_out ()
 {
-    zmq_assert (false);
+    drop_subscriptions ();
 }
 
 void zmq::pgm_receiver_t::activate_in ()
@@ -255,5 +258,12 @@ void zmq::pgm_receiver_t::timer_event (int token)
     in_event ();
 }
 
+void zmq::pgm_receiver_t::drop_subscriptions ()
+{
+    msg_t msg;
+    while (sink->read (&msg))
+        msg.close ();
+}
+
 #endif
 
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index 825e0c1..aa010dd 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -64,6 +64,10 @@ namespace zmq
 
     private:
 
+        //  PGM is not able to move subscriptions upstream. Thus, drop all
+        //  the pending subscriptions.
+        void drop_subscriptions ();
+
         //  RX timeout timer ID.
         enum {rx_timer_id = 0xa1};
 
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 314a0b4..9b1e215 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -88,6 +88,15 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
 
     //  Set POLLOUT for downlink_socket_handle.
     set_pollout (handle);
+
+    //  PGM is not able to pass subscriptions upstream, thus we have no idea
+    //  what messages the peers are interested in. Because of that we have to
+    //  subscribe to all the messages.
+    msg_t msg;
+    msg.init ();
+    bool ok = sink_->write (&msg);
+    zmq_assert (ok);
+    sink_->flush ();
 }
 
 void zmq::pgm_sender_t::unplug ()
-- 
1.7.0.4
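
For anyone reviewing without a PGM setup, the two behaviours in the patch can be sketched outside libzmq. This is only a toy model, not the real engine code: toy_msg_t, toy_sink_t and subscribe_to_all are hypothetical names invented for the illustration, and the idea that an empty message stands for "subscribe to everything" is an assumption taken from the msg.init () call in pgm_sender.cpp.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

//  Hypothetical stand-in for a message. Not the libzmq msg_t.
struct toy_msg_t
{
    std::string body;
};

//  Hypothetical stand-in for the sink the engine is plugged into
//  (i_engine_sink in the patch). 'to_engine' holds what the socket has
//  queued for the engine, 'to_socket' holds what the engine passed back.
struct toy_sink_t
{
    std::deque <toy_msg_t> to_engine;
    std::deque <toy_msg_t> to_socket;
    bool flushed = false;

    //  Returns false once nothing is left to read.
    bool read (toy_msg_t *msg_)
    {
        if (to_engine.empty ())
            return false;
        *msg_ = to_engine.front ();
        to_engine.pop_front ();
        return true;
    }

    //  The toy sink never applies back-pressure, so write always succeeds.
    bool write (toy_msg_t *msg_)
    {
        to_socket.push_back (*msg_);
        return true;
    }

    void flush ()
    {
        flushed = true;
    }
};

//  Sub side: the transport cannot carry subscriptions upstream, so drain
//  whatever is queued and throw it away. Returns the number dropped.
std::size_t drop_subscriptions (toy_sink_t &sink_)
{
    std::size_t dropped = 0;
    toy_msg_t msg;
    while (sink_.read (&msg))
        ++dropped;
    return dropped;
}

//  Pub side: the engine cannot learn what peers want, so it hands the
//  socket a single empty message (assumed to mean "subscribe to all").
void subscribe_to_all (toy_sink_t &sink_)
{
    toy_msg_t msg;
    bool ok = sink_.write (&msg);
    assert (ok);
    (void) ok;
    sink_.flush ();
}
```

In this model, calling drop_subscriptions on a sink with two queued subscriptions leaves the queue empty, and calling subscribe_to_all leaves exactly one empty message on the socket side with the sink flushed. Whether the real pub socket interprets an empty message that way is exactly the part that needs testing against a live PGM setup.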
