Fri Dec 27 2024
A tale of making Eclipse Paho work on a production IoT project.
Disclaimer: The article was written in October 2021, but was only published in December 2024. Therefore some of the information might be outdated.
MQTT is a popular choice of protocol, especially for IoT systems. It is a perfect fit for connecting constrained embedded devices to the cloud because of its light footprint, publish-subscribe model, bi-directional connection, and support for secure connections with TLS.
In general, an embedded device connects to an MQTT broker to publish data - mostly sensor readings - and subscribes to a topic to listen for commands from the cloud. The user’s web or app client can send these commands directly to the broker, or the server can send them on behalf of the user. This post covers the latter method in detail; it is an essential piece of the system, especially when you want non-MQTT clients (like an OAuth application) to access the IoT-enabled devices.
Since our cloud application is in Python, all the implementation details mentioned here are Python-specific, but the same approaches can be taken in other languages/stacks. Also, this post is limited to MQTT v3.1 and v3.1.1.
As mentioned above, we had a similar setup in our IoT project - devices connecting to an MQTT broker and users (via mobile apps) connecting to our web server. We wanted to send “commands” to the devices from the web server. The commands could be fire-and-forget or request-response type, executed in-process (within the web request) or by background workers, and sent to one or multiple devices at a time.
Considering those use cases, we jotted down the core utilities required: publishing a message to a topic, subscribing to a topic with a timeout, and a request-response call that combines the two.
After that, we started looking for an MQTT client to connect the web server to the broker. We decided to go with paho-mqtt - the Python module of the Eclipse Paho project, which also maintains MQTT libraries for other popular languages. The module is stable, feature-rich, and well designed. However, it took us some time and a few iterations to make it work for us.
This article goes through that journey of making MQTT work with our web server like a charm - it puts forward the various iterations, learnings, and know-how we gathered during a year on this project.
I write this post as a series of actions, events, and decisions to give a complete picture. If you want some code snippets to work with, you can find the link to the code snippet below.
We thought this would be easy - paho mostly did the heavy lifting and had the blocking APIs we needed (well, almost, as you will see!).
The publish function was simple. It was a “wrapper” around the paho.mqtt.publish.single function.
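Such a wrapper can be as small as the sketch below. The broker host, port, and credentials here are placeholders rather than our actual configuration.

```python
import paho.mqtt.publish as publish


def publish_to_mqtt(topic, payload, qos=1, retain=False,
                    hostname="broker.example.com", port=8883):
    """Fire-and-forget publish using paho's one-shot helper."""
    publish.single(
        topic,
        payload=payload,
        qos=qos,
        retain=retain,
        hostname=hostname,
        port=port,
        auth={"username": "server", "password": "secret"},  # placeholder credentials
        tls={},  # enable TLS with the system's default CA certificates
    )
```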
Similarly, we found paho.mqtt.subscribe.simple, but this one was not enough. It did not have any “timeout”, meaning it would block forever if no message was received on that topic. We decided to copy its code and add a timeout check, which did the job.
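The gist of that modification looks roughly like the sketch below - not the verbatim mqtt_v1.py code, and with placeholder broker settings - where the deadline check in the network loop is the added part.

```python
import time

import paho.mqtt.client as mqtt


def subscribe_to_mqtt(topics, timeout=5, qos=1,
                      hostname="broker.example.com", port=8883):
    """Block until a message arrives on any of the topics, or the timeout expires."""
    messages = []

    def on_connect(client, userdata, flags, rc):
        client.subscribe([(topic, qos) for topic in topics])

    def on_message(client, userdata, msg):
        userdata.append(msg)

    client = mqtt.Client(userdata=messages)
    client.on_connect = on_connect
    client.on_message = on_message
    client.tls_set()
    client.connect(hostname, port)

    # The added timeout: stop looping once the deadline passes, even with no message.
    deadline = time.monotonic() + timeout
    while not messages and time.monotonic() < deadline:
        client.loop(timeout=0.1)

    client.disconnect()
    return messages[0] if messages else None
```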
We naively thought that combining the above two functions would do the job for request-response. But that did not work very well, as the client instance was created and connected twice, making it really slow and practically useless - most of the time, our devices would send their responses before the server had even subscribed.
So instead of using the publish_to_mqtt function, we modified the above subscribe_to_mqtt such that the “publish arguments” are passed as userdata. Once the client is connected, it publishes this data right after subscribing. This made sure that the publish happens on the same connection as the subscribe, and that the subscription is in place before publishing, so the response is not lost.
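A sketch of that request-response helper (simplified from the actual code; the broker settings are again placeholders):

```python
import time

import paho.mqtt.client as mqtt


def request_response(request_topic, payload, response_topic, timeout=5, qos=1,
                     hostname="broker.example.com", port=8883):
    """Publish a request and wait for the response, all over one connection."""
    state = {"request": (request_topic, payload), "responses": []}

    def on_connect(client, userdata, flags, rc):
        client.subscribe(response_topic, qos)

    def on_subscribe(client, userdata, mid, granted_qos):
        # Publish only after the subscription is active, so the response is not lost.
        topic, data = userdata["request"]
        client.publish(topic, data, qos)

    def on_message(client, userdata, msg):
        userdata["responses"].append(msg)

    client = mqtt.Client(userdata=state)
    client.on_connect = on_connect
    client.on_subscribe = on_subscribe
    client.on_message = on_message
    client.tls_set()
    client.connect(hostname, port)

    deadline = time.monotonic() + timeout
    while not state["responses"] and time.monotonic() < deadline:
        client.loop(timeout=0.1)

    client.disconnect()
    return state["responses"][0] if state["responses"] else None
```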
You can find the full code for this under the mqtt_v1.py file in the code snippet.
And that’s it. We were up and running as it passed the basic tests, and that was enough until our users started complaining about latency - more about that in the next section.
Once we rolled out the system in beta with about 20–25 devices and their users, we got some feedback from them. They were experiencing noticeable latency (more than 3 seconds) when running commands to control their devices.
After doing some basic profiling, it was clear that the bulk of the latency came from the TLS handshake and connection establishment with the broker. Each time the server interacted with the broker, it had to establish a new TCP+TLS connection. We decided to optimize this the same way web servers usually handle database connections — persisting the connection and re-using it. Since most web servers (at least the WSGI ones) re-use the processes created for serving web requests, it is possible to have an MQTT connection that outlives the HTTP request-response cycle.
To accomplish this, we decided to use a singleton class that holds the client and its connection. This class would expose the same utilities and, under the hood, use the persisted connection. Most of the work was re-organizing the existing code from the previous version to support this pattern. The singleton class (we called it MQTTAgent) looked something like the below.
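What follows is a simplified sketch rather than the full mqtt_v2.py: the broker settings are placeholders, and the single shared inbox is a stand-in for the real userdata-based subscription state.

```python
import queue

import paho.mqtt.client as mqtt


class MQTTAgent:
    """Process-wide singleton holding one persistent MQTT connection."""

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._setup()
        return cls._instance

    def _setup(self):
        self._inbox = queue.Queue()                        # messages for the current subscription
        self._client = mqtt.Client()
        self._client.on_message = self._on_message
        self._client.tls_set()
        self._client.connect("broker.example.com", 8883)   # placeholder broker
        self._client.loop_start()                          # network loop in a background thread

    def _on_message(self, client, userdata, msg):
        self._inbox.put(msg)

    def publish(self, topic, payload, qos=1):
        self._client.publish(topic, payload, qos)

    def request_response(self, topic, payload, response_topic, timeout=5, qos=1):
        # Subscribe first, then publish, all on the same persistent connection.
        # Note: one shared inbox means this is *not* thread-safe yet.
        self._client.subscribe(response_topic, qos)
        self._client.publish(topic, payload, qos)
        try:
            return self._inbox.get(timeout=timeout)
        except queue.Empty:
            return None
        finally:
            self._client.unsubscribe(response_topic)


mqtt_agent = MQTTAgent()  # import this shared instance wherever it is needed
```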
You can find the full code for this under the mqtt_v2.py file in the code snippet.
The MQTT utility was now clean and elegant to use, as well as efficient. The latency improved by a factor of 10 (check the demo output in the code snippet). Users noticed the difference and appreciated it. A win for the day!
When we got the requirement to make concurrent command calls to multiple devices, we had two options — execute them in various threads or use a distributed task queue system (we already had a Celery setup with our server).
The former looked like the apt solution but would require us to make the v2 solution “thread-safe”, as mqtt_agent.subscribe (and therefore mqtt_agent.request_response) was not designed with concurrency in mind. Instead of doing that, we lazily went with the “task queue” approach. It would schedule the concurrent command calls and hand them to distributed background workers for execution. It did the job, but because the tasks were queued and executed across distributed workers, the performance was not consistent. It would take more than the expected time when the queue was long or the workers were busy, which happened more often as the number of devices per user increased. It looked like we had no other option but to make MQTTAgent thread-safe.
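For context, this is roughly what the task-queue dispatch we were about to replace looked like (a sketch; the task name, module path, and topic scheme are made up for illustration):

```python
from celery import group, shared_task

from myapp.mqtt import mqtt_agent  # hypothetical import of the MQTTAgent singleton


@shared_task
def send_command(device_id, command):
    # Each Celery worker process keeps its own MQTTAgent connection.
    return mqtt_agent.request_response(
        topic=f"devices/{device_id}/commands",          # made-up topic scheme
        payload=command,
        response_topic=f"devices/{device_id}/responses",
    )


def send_command_to_devices(device_ids, command):
    # Fan the command out as parallel tasks and wait for all the results.
    job = group(send_command.s(device_id, command) for device_id in device_ids)
    return job.apply_async().get(timeout=30)
```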
In v2, the userdata (set as an attribute of MQTTClient and passed to the on_message handler) keeps the “subscription state”; the key to making MQTTAgent thread-safe is to make this state handling thread-safe. The idea is to have a “store” of all “subscription states”: when a message is received, we check the store, and if any “subscription” has requested messages from that topic, we add the message to that “subscription state”. We also need to handle topic subscriptions smartly — if two threads subscribe to the same topic, then internally we should subscribe only once and deliver the same messages to both.
Firstly, we created an _MQTTSubscription class for storing the “subscription state” instead of using a dict, and moved the logic of adding new messages inside the class.
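Roughly, the class boils down to something like this (a sketch of the idea, not the exact mqtt_v3.py code):

```python
import queue


class _MQTTSubscription:
    """State of one in-flight subscription: its topics and its pending messages."""

    def __init__(self, topics):
        self.topics = set(topics)
        self._messages = queue.Queue()

    def add_message(self, msg):
        # Called from the paho network thread when a matching message arrives.
        self._messages.put(msg)

    def get_message(self, timeout=None):
        # Called by the thread that opened the subscription; blocks until a
        # message arrives or the timeout expires.
        try:
            return self._messages.get(timeout=timeout)
        except queue.Empty:
            return None
```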
After that, we added the “subscription store” and “topic reference count” attributes to MQTTAgent. The “subscription store” is a set of all subscription instances that are waiting for messages. The “topic reference count” is a map from a topic to the number of subscription instances subscribed to it.
When a new “subscription” is requested, a subscription instance is created and added to the _subscriptions set, the “topic reference count” is incremented for its topics, and the _client subscribes to any topics that were not already subscribed (those with a reference count of 1). The reverse happens when the subscription needs to be “closed”, or unsubscribed: the reference count is decremented, and the _client unsubscribes from the topics that are no longer referenced (reference count of 0). In the __on_message callback, we check all _subscriptions and call subscription.add_message for the ones where the subscription topic and message topic match.
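Put together, the bookkeeping looks roughly like the sketch below. It is a simplification of the mqtt_v3.py logic: the broker settings are placeholders, topic matching is exact-match only, and the publish/request-response methods from v2 are omitted.

```python
import threading
from collections import Counter

import paho.mqtt.client as mqtt


class MQTTAgent:
    """v3: the singleton from before, with thread-safe subscription bookkeeping."""

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._setup()
        return cls._instance

    def _setup(self):
        self._subscriptions = set()        # live _MQTTSubscription instances
        self._topic_refcount = Counter()   # topic -> number of subscriptions using it
        self._lock = threading.Lock()      # guards the two structures above
        self._client = mqtt.Client()
        self._client.on_message = self.__on_message
        self._client.tls_set()
        self._client.connect("broker.example.com", 8883)  # placeholder broker
        self._client.loop_start()

    def subscribe(self, topics):
        subscription = _MQTTSubscription(topics)
        with self._lock:
            self._subscriptions.add(subscription)
            for topic in topics:
                self._topic_refcount[topic] += 1
                if self._topic_refcount[topic] == 1:
                    self._client.subscribe(topic)    # first subscriber for this topic
        return subscription

    def unsubscribe(self, subscription):
        with self._lock:
            self._subscriptions.discard(subscription)
            for topic in subscription.topics:
                self._topic_refcount[topic] -= 1
                if self._topic_refcount[topic] == 0:
                    del self._topic_refcount[topic]
                    self._client.unsubscribe(topic)  # no subscribers left for this topic

    def __on_message(self, client, userdata, msg):
        with self._lock:
            subscriptions = list(self._subscriptions)
        for subscription in subscriptions:
            # Exact match only; wildcard topics would need mqtt.topic_matches_sub().
            if msg.topic in subscription.topics:
                subscription.add_message(msg)
```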
You can find the full code for this under the mqtt_v3.py file in the code snippet.
It was now possible to call MQTTAgent functionality from multiple threads safely. We started sending commands to multiple devices concurrently with the help of Python’s ThreadPoolExecutor (as shown in the demo code in the snippet) instead of distributing the work to background workers. It not only made things faster and more efficient, but we could now scale it to make simultaneous calls to a larger number of devices (tested with 100 devices at a time).
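The fan-out then boils down to something like this (an illustrative sketch; the module path and topic scheme are made up, and it assumes MQTTAgent still exposes a request_response helper built on the new subscribe/unsubscribe):

```python
from concurrent.futures import ThreadPoolExecutor

from myapp.mqtt import mqtt_agent  # hypothetical import of the thread-safe MQTTAgent


def reboot_device(device_id):
    return mqtt_agent.request_response(
        topic=f"devices/{device_id}/commands",          # made-up topic scheme
        payload="reboot",
        response_topic=f"devices/{device_id}/responses",
        timeout=5,
    )


device_ids = [f"device-{i}" for i in range(1, 101)]
with ThreadPoolExecutor(max_workers=20) as executor:
    # All threads share the single persistent connection held by MQTTAgent.
    responses = list(executor.map(reboot_device, device_ids))
```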
We have some ideas that could take the utility to the next level. Some of them are as follows.
The current implementation is enough for our use cases, so we might not work on it. But if we get a good response to this post, we could think of working on it and releasing it as an open-source Python package. While Eclipse Paho is solid and stable, it needs an “elegance layer” on top of it to make it work like a charm, the way the requests package does for urllib3. This package could be exactly that — MQTT for Humans™!