I've pushed recoco.consumer, which has a couple of generic consumer classes,
to the carp branch. Using those, you can write a simple component like:

from pox.core import core
from pox.lib.recoco.consumer import BaseConsumer
import pox.openflow.libopenflow_01 as of

class MyConsumer (BaseConsumer):
  def _do_work (self, work):
    # Work is (dpid,packet_in)
    dpid,packet_in = work
    po = of.ofp_packet_out(data = packet_in)
    po.actions.append(of.ofp_action_output(port = of.OFPP_FLOOD))

    connection = core.openflow.getConnection(dpid)
    if connection is None:
      self.log.warn("Connection lost before sending packet")
      return
    connection.send(po)

def handle_PacketIn (event):
  consumer.add_work((event.dpid,event.ofp))

def launch ():
  global consumer
  consumer = MyConsumer()
  core.openflow.addListenerByName("PacketIn", handle_PacketIn)

Here, the normal OpenFlow event Task is acting like a producer, pushing work
items (which in this case are a DPID and a packet_in) to a simple consumer
which just floods the packet back out. So it's a dumb hub, but written
producer-consumer style. BaseConsumer's initializer has some parameters for
maximum batch size and Task priority.
More comments below.
On Mar 23, 2013, at 3:59 PM, Tmusic wrote:
> I'm still a bit confused about the work producing and consuming as it's
> implemented now. So there is the main task loop in OpenFlow_01_Task which
> loops over the connections and calls the read function on each of them. This
> read function calls the appropriate handler, which in turn fires the
> appropriate event on the pox core (which are then further handled). So
> everything would be processed connection by connection...
>
> But if I understand you correctly, the handlers called by the read function
> put the jobs in a queue, which is then emptied by a separate task loop (which
> I can't find at the moment). Can you give a hint where (in the code) the task
> loop runs that empties the queue and where the filling of the queue exactly
> happens?
Ah, we have miscommunicated. There's no queue in general. The OpenFlow events
are entirely (or almost entirely?) raised by a single Task with a select loop
(OpenFlow_01_Task or whatever). It raises them directly.
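To make "raises them directly" concrete, here's a toy sketch in plain Python.
It is not POX code: ToyEventLoop, add_listener and run_once are made-up names,
and the real Task sits on a select loop rather than a list. The point it shows
is only the ordering: each handler runs inline, so the next message isn't read
until the handler returns.

```python
# Toy model only: none of these names exist in POX.
class ToyEventLoop (object):
  """Stands in for a single Task that reads connections and raises events."""
  def __init__ (self):
    self.handlers = []
    self.trace = []  # Records the order in which things happen

  def add_listener (self, handler):
    self.handlers.append(handler)

  def run_once (self, connections):
    # Visit each connection in turn. Handlers are called directly,
    # so nothing else happens until they return.
    for dpid, messages in connections:
      for msg in messages:
        self.trace.append(("read", dpid, msg))
        for handler in self.handlers:
          handler(dpid, msg)

loop = ToyEventLoop()

def slow_handler (dpid, msg):
  # Pretend this is expensive; it runs inside the loop's own turn.
  loop.trace.append(("handled", dpid, msg))

loop.add_listener(slow_handler)
loop.run_once([(1, ["a", "b"]), (2, ["c"])])
```

In the trace, every "read" is immediately followed by its "handled", which is
why expensive work in a handler holds up everything behind it.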
The miscommunication, I believe, stems from me saying, "The OpenFlow event
handlers are producers that fill a work queue and then you have a consumer in
the form of a recoco Task that tries to drain the queue." I wasn't describing
how it works now. I was describing the solution to your problem.
The example above *does* implement this. The idea is that rather than doing
expensive processing directly in the handlers, you're better off handling
events quickly by just shoving them off to a work queue, which can deal with
them later (and perhaps with more flexible priorities). The new
BaseConsumer/FlexConsumer classes are meant to simplify this pattern. (They're
based on the producer/consumer example I posted a few days ago, but now
generic.)
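If it helps to see the pattern with POX stripped away, here's a rough
standalone sketch. ToyConsumer is a hypothetical stand-in and not the
BaseConsumer implementation; max_batch is an assumed name for the batch limit,
and a plain generator stands in for a recoco Task. The handler only enqueues,
and the consumer drains a bounded batch each time it gets a turn.

```python
from collections import deque

# Toy model only: ToyConsumer is a hypothetical stand-in, not the
# BaseConsumer API, and max_batch is an assumed parameter name.
class ToyConsumer (object):
  def __init__ (self, max_batch = 2):
    self.queue = deque()
    self.max_batch = max_batch
    self.done = []

  def add_work (self, work):
    # Called from the event handler: cheap, returns immediately.
    self.queue.append(work)

  def _do_work (self, work):
    # The expensive part lives here, outside the handler.
    self.done.append(work)

  def run (self):
    # Generator standing in for a cooperative task: process at most
    # max_batch items, then yield so other tasks get a turn.
    while True:
      for _ in range(min(self.max_batch, len(self.queue))):
        self._do_work(self.queue.popleft())
      yield len(self.queue)

consumer = ToyConsumer(max_batch = 2)

def handle_packet_in (dpid, packet):
  # Producer side: just hand the work off.
  consumer.add_work((dpid, packet))

for n in range(5):
  handle_packet_in(1, "pkt%d" % n)

task = consumer.run()
# Each next() is one turn of the consumer; the value is the backlog left.
remaining = [next(task) for _ in range(3)]
```

Capping the batch is what keeps the consumer from hogging the scheduler when
the queue is long; a smaller cap trades throughput for responsiveness.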
-- Murphy