Hi there,

I hope this e-mail still reaches you in time. I have implemented this architecture a few times, and all of those implementations work fine today. However, your question made me review it and create a small gist.

I suggest you create a thread for every input and output connection. This makes it easier to focus on reading or writing inside a given object. For example, in your project, I would separate it into IotReader, IotWriter, MqttReader, and MqttWriter.

Another thing I do to avoid juggling multiple locks, semaphores, and so on is to create a central event loop. Every message that comes from IotReader or MqttReader is packed into an event and sent to a central thread. This is the main gateway thread, and I would call it Gateway.

I don't know if you have ever programmed for Android, but the Android framework uses a similar approach to processing data. Whenever you need to process something, you start a new thread, and when you need to present the result in the interface, you dispatch an event to the main thread, which then updates the corresponding Views. The point here is: never do any extensive processing in the main thread, as it delays every other event waiting behind it. You will probably not need it now, but if you ever do, make a pool of workers to handle the heavy processing and keep the Gateway free. If that processing is CPU-bound, use multiprocessing.Process instead of threads for those workers as well, since CPython's global interpreter lock prevents threads from running Python code in parallel.

Regarding thread/process communication, I like to implement it using Queues. The Gateway class has a main queue (queue_master in the code) to receive events from IotReader and MqttReader. IotWriter and MqttWriter each have their own queue as well. Whenever the Gateway needs to send something to either of them, it only needs to put it on the respective queue, which I wrap inside a method (send) for simplicity.

Another benefit of this architecture is that it scales to more connections easily. In the past, I have used this strategy to schedule tasks for up to about 20 devices (each with an input and an output thread). I believe it could go higher, but I haven't needed to. There are fully distributed architectures better suited to hundreds or thousands of connections, but this is likely not what you need now.

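On the worker-pool point above, here is a minimal sketch of how the central loop could hand heavy work to a pool and receive the result back as just another event. It is not part of the gist: gateway_loop, slow_checksum, and the 'on_processed' event name are illustrative assumptions, and a thread pool is used only to keep the example short (concurrent.futures.ProcessPoolExecutor offers the same submit interface for CPU-bound work).

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor


def slow_checksum(data):
    """Hypothetical stand-in for any heavy processing step."""
    return sum(data) % 256


def gateway_loop(main_queue, results, workers=2):
    """Central event loop that never blocks on heavy work.

    Heavy work is submitted to the pool; when it finishes, the result is
    posted back to main_queue as an 'on_processed' event.
    """
    pending = 0        # jobs submitted but not yet reported back
    stopping = False   # set once a 'terminate' event has been seen

    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Keep looping until termination was requested AND all jobs returned.
        while not (stopping and pending == 0):
            action, data = main_queue.get()

            if action == 'terminate':
                stopping = True
            elif action == 'on_iot_event':
                pending += 1
                future = pool.submit(slow_checksum, data)
                # The callback runs outside the gateway loop and only
                # enqueues the result. This sketch assumes the job does
                # not raise; a real version should handle that case.
                future.add_done_callback(
                    lambda f: main_queue.put(('on_processed', f.result())))
            elif action == 'on_processed':
                pending -= 1
                results.append(data)
```

To try it, run gateway_loop in a thread, put a few ('on_iot_event', [...]) tuples on the queue followed by ('terminate', None), and join the thread. Only the gateway thread touches pending and results, so no lock is needed.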
The following is a possible implementation for the IotReader. Replace the AnySerialReader class and its read method with your own Bus wrapper. The read method must accept a timeout parameter so that the loop can periodically check the done flag and exit when terminate is called. The terminate method asks the thread to stop so the program can shut down cleanly.

    import logging
    import sys
    import traceback
    from queue import Queue
    from threading import Thread

    # Assumed logger setup; adapt it to your own logging configuration.
    log = logging.getLogger(__name__)

    class IotReader(Thread):

        def __init__(self, queue_master, name='IotReader'):
            super().__init__()
            self.queue_master = queue_master
            self.queue = Queue()
            self.done = False
            self.name = name
            self.start()

        def terminate(self):
            self.done = True

        def run(self):
            log.info(f"Starting thread for {self.name}")
            serial_reader = AnySerialReader('Serial' + self.name)
            log.info(f"Serial reader for {self.name} initialized")

            while not self.done:
                try:
                    # The timeout lets the loop re-check self.done regularly.
                    data = serial_reader.read(timeout=1)
                    if data is None:
                        continue
                    self.queue_master.put(('on_iot_event', data))
                except Exception:
                    traceback.print_exc(file=sys.stdout)

            log.warning("Terminating IotReader")
            serial_reader.terminate()

The following is a possible implementation for IotWriter. It adds a method named send that puts new tasks on the queue. The main loop, running inside the thread, waits for these events and calls write on AnySerialWriter. Writing may be slow: the connection may be down, we may need to reconnect, and so on. This is why the output side needs a thread of its own as well.

    class IotWriter(Thread):

        def __init__(self, name='IotWriter'):
            super().__init__()
            self.queue = Queue()
            self.done = False
            self.name = name
            self.start()

        def terminate(self):
            self.done = True
            # Wake the thread up in case it is blocked on queue.get().
            self.queue.put(('terminate', None))

        def send(self, data):
            self.queue.put(('write_message', data))

        def run(self):
            log.info(f"Starting thread for {self.name}")
            serial_writer = AnySerialWriter('Serial' + self.name)
            log.info(f"Serial writer for {self.name} initialized")

            while not self.done:
                try:
                    action, data = self.queue.get()

                    if action == 'terminate':
                        break
                    elif action == 'write_message':
                        serial_writer.write(data)
                    else:
                        log.error(f'Unknown action for IotWriter - action={action}, data={data}')
                except Exception:
                    traceback.print_exc(file=sys.stdout)

            log.warning("Terminating IotWriter")
            serial_writer.terminate()

I do not include the source for MqttReader and MqttWriter, as they are very similar in structure to IotReader and IotWriter.

The code below is for the Gateway class. It initializes all readers and writers. After that, it waits for incoming messages and processes them accordingly. You can add as many event types as you need; I used two to illustrate.

    class Gateway(Thread):

        def __init__(self):
            super().__init__()
            self.queue_master = Queue()
            self.done = False
            self.start()

        def run(self):
            log.info("Starting Gateway")

            while not self.done:
                try:
                    self.iot_reader = IotReader(self.queue_master)
                    self.iot_writer = IotWriter()
                    self.mqtt_reader = MqttReader(self.queue_master)
                    self.mqtt_writer = MqttWriter()
                    log.info(f"Starting {self.__class__.__name__}")

                    while not self.done:
                        try:
                            action, data = self.queue_master.get()

                            if action == 'on_mqtt_event':
                                self.on_mqtt_event(data)
                            elif action == 'on_iot_event':
                                self.on_iot_event(data)
                            elif action == 'terminate':
                                break
                            else:
                                log.error(f'Unknown action, action={action}, data={data}')
                        except Exception:
                            log.error("Error during message parsing")
                            traceback.print_exc(file=sys.stdout)

                except Exception:
                    log.error("Error during gateway configuration")
                    traceback.print_exc(file=sys.stdout)

            # Ask every worker thread to stop, then wait for all of them.
            self.iot_reader.terminate()
            self.iot_writer.terminate()
            self.mqtt_reader.terminate()
            self.mqtt_writer.terminate()

            self.iot_reader.join()
            self.iot_writer.join()
            self.mqtt_reader.join()
            self.mqtt_writer.join()

            log.warning('Terminating Gateway')

        def terminate(self):
            self.done = True
            self.queue_master.put(('terminate', None))

        def on_iot_event(self, data):
            log.info(f'Event from iot device, forwarding to mqtt, data={data}')
            self.mqtt_writer.send(data)

        def on_mqtt_event(self, data):
            log.info(f'Event from mqtt, forwarding to iot, data={data}')
            self.iot_writer.send(data)

I started the Gateway using the code below. It calls the terminate method when I press Ctrl+C. This event could also come from the MQTT server or anywhere else.

    gateway = Gateway()

    # Your main thread is free here. You could start a webserver and display
    # a dashboard. Or wait, like below.

    try:
        gateway.join()
    except KeyboardInterrupt:
        log.info("Sending terminate command...")
        gateway.terminate()

    # A second Ctrl+C while waiting for a clean shutdown exits right away.
    try:
        gateway.join()
    except KeyboardInterrupt:
        log.info("Killing the app...")
        sys.exit(0)

    log.info("Bye!")

If you want to check the full code, there is a small gist at the link below:

https://gist.github.com/diegofps/87945a0c3e800c747f3af07833ff6b7e

You also mentioned discovering the device status and sending it back through MQTT. I can see two approaches to this.

The first approach is to cache the status emitted by the device. This is fine if the data is small enough to keep in the gateway's memory. When a request arrives, I would answer it through MQTT immediately from the cache.

The second approach is to forward the request to the device. The device later replies with both the original question and its response. You will likely need the question echoed back, because that is how the gateway knows what to do with the response when it arrives; the gateway itself stays stateless.

You could also mix these two approaches and cache the state for a certain amount of time. After that, it would expire, and you would ask the device again.

This is an overview of how I implement it nowadays. I am sure other people have different strategies and ideas to improve it.

Best,
Diego

On Fri, Apr 28, 2023 at 1:10 PM pozz <pozzu...@gmail.com> wrote:

> I need to develop a Python application that is a sort of gateway between
> to networks: a "serial bus" network (think of a serial port or a USB
> connection) and a MQTT connection (in my case, it's AWS IoT service).
>
> On the bus, a proprietary protocol is implemented. From the bus, the app
> knows the status of the system (think of a simple ON/OFF status).
> The user can retrieve the status of the system through MQTT: it sends a
> message to read/status MQTT topic and receive back a message with the
> current status on read/status/reply MQTT topic.
> Of course, they are just
> examples.
>
> On the contrary, when the app detects a status change reported from the
> serial bus (think of a transition from ON to OFF), it sends a MQTT message.
>
> I'm thinking to split the application in three classes: Bus, App and
> IoT. Bus and IoT are also threads.
> The thread of Bus manages the serial protocol, while the thread of IoT
> manages MQTT connection with the broker (the server).
>
> However I don't know if it could be a good architecture. Suppone Bus
> thread receives a new status from the system. In the context of
> ThreadBus, the object Bus could call a method of App object:
>
>     app.set_status(new_status_from_the_bus)
>
> In the App I have:
>
>     class App():
>         ..
>         def set_status(new_status): # Could be called from ThreadBus
>             if new_status != self.new_status:
>                 self.new_status = new_status
>                 # Do some actions on status change
>
>         def get_status(): # Could be called from ThreadIoT
>             return self.status
>
> Of course, IoT object needs to know the current status of the system
> when a message is received from MQTT. So ThreadIoT could call
> app.get_status().
>
> I think this architecture has some problems with race conditions or
> threads synchronization. What happens if IoT calls get_status() exactly
> when set_status() called by ThreadBus is executing? If status is a big
> data structure, set_status() could be interrupted by get_status() that
> could get a completely corrupted status, because it was only partly
> updated by set_status().
>
> I know I can use locks or semaphores in get_status() and set_status(),
> but I don't know if this is a good approach. Consider that the system is
> complex, it isn't composed by a simple single status. It has many many
> parameters that are collected from the serial bus. Should I use a lock
> for every [get|set]_status(), [get|set]_dimensions(),
> [get|set]_battery_level(), [get|set]_mains_present(), and so on?
>
>
> Another possibility is to use a Queue for Bus and a Queue for IoT. So
> the set_status(new_status) called from Bus object will be transformed in
> a put in the queue:
>
>     app_queue.put({"type": "new_status", "data": ...})
>
> However how could be transformed the get_status() from IoT? How the
> return value (the current status) is real retrieved?
>
>     class IoT():
>         ..
>         def status_request_from_MQTT():
>             app_queue.put({"type": "get_status"})
>             # How to get the status to return?
>             return current_status
>
> Should the app put the status on the same queue and should IoT waits for
> a new message in the Queue?
>
>     def status_request_from_MQTT():
>         app_queue.put({"type": "get_status"})
>         try:
>             current_status = app_queue.get(timeout=10)
>         except Empty:
>             # What to do?
>         return current_status
>
>
> Again another approach is to avoid multi-threading at all and create a
> single "main loop" function that waits at the same time for incoming
> events on the serial bus and MQTT (how?). I don't know if this could be
> done in my case, because I'm using awscrt Python module and it works
> through callbacks that I think is called from another thread.
>
>
> Any suggestions on this architecture?
> --
> https://mail.python.org/mailman/listinfo/python-list
>

--
Diego Souza
Wespa Intelligent Systems
Rio de Janeiro - Brasil