
Fri Dec 27 2024 14 min read

MQTT with Python Web Server like a Charm

A tale of making Eclipse Paho work on a production IoT project.

Cover image: a Python snake with a postal message envelope.

Disclaimer: The article was written in October 2021, but was only published in December 2024. Therefore some of the information might be outdated.


MQTT is a popular choice of protocol, especially for IoT systems. It is a perfect fit for connecting constrained embedded devices to the cloud because of its light footprint, publish-subscribe model, bi-directional connection, and support for secure connections over TLS.

In general, an embedded device connects to an MQTT broker to publish data (mostly sensor readings) and subscribes to a topic to listen for commands from the cloud. The user’s web or app client can send these commands directly to the broker, or the server could send them on behalf of the user. This post covers the latter method in detail; it is an essential piece of the system, especially when you want non-MQTT clients (like a third-party OAuth application) to access the IoT-enabled device.

               ┌───────────────────────────────┐
               │             CLOUD             │
┌──────┐       │  ┌──────┐      ┌────────────┐ │
│DEVICE│◄──────┼─►│      │      │            │ │
│  A   │       │  │      │      │            │ │
└──────┘       │  │      │      │            │ │
┌──────┐       │  │      │      │            │ │
│DEVICE│◄──────┼─►│ MQTT │◄────►│ WEB SERVER │ │
│  B   │       │  │BROKER│      │            │ │
└──────┘       │  │      │      │            │ │
┌──────┐       │  │      │      │            │ │
│DEVICE│◄──────┼─►│      │      │            │ │
│  C   │       │  └───▲──┘      └──────▲─────┘ │
└──────┘       └──────┼────────────────┼───────┘
                      │                │
                  ┌───▼────────────────▼──┐
                  │         USER          │
                  ├──────┬──────┬─────────┤
                  │MOBILE│ WEB  │3rd PARTY│
                  │CLIENT│CLIENT│OAUTH APP│
                  └──────┴──────┴─────────┘

Since our cloud application is written in Python, all the implementation details mentioned here are Python-specific, but the same approaches can be taken in other languages/stacks. Also, this post is limited to MQTT v3.1 and v3.1.1.

A little background

As mentioned above, we had a similar setup in our IoT project - devices connecting to an MQTT broker and users (via mobile apps) connecting to our web server. We wanted to send “commands” to the devices from the web server. The commands could be fire-and-forget or request-response, executed in-process (during a web request) or by background workers, and sent to one or multiple devices at a time.

Considering those use cases, we jotted down the core utilities required:

  • a simple blocking function to publish any message
  • a simple blocking function to subscribe to topics, wait for n number of messages or until timeout, returning the messages received
  • a simple blocking function to achieve request-response kind of functionality - could be a wrapper of the above two

After that, we started looking for an MQTT client to connect the web server to the broker. We decided to go with paho-mqtt - the Python module from the Eclipse Paho project, which also maintains MQTT libraries for other popular languages. The module is stable, feature-rich, and well designed. However, it took us some time and a few iterations to make it work the way we wanted.

This article goes through that journey of making MQTT work with our web server like a charm - it walks through the various iterations, learnings, and know-how we gathered during a year on this project.

I write this post as a series of actions, events, and decisions to give a complete picture. If you want some code to work with, you can find the link to the code snippets below.

MQTT helper functions and classes in Python [Snippets]
GitLab
gitlab.com

[v1] Simple wrapper functions

We thought this would be easy - paho mostly did the heavy lifting and had the blocking APIs we needed (well, almost, as you will see!).

The publish function was simple. It was a thin “wrapper” around the paho.mqtt.publish.single function.

def publish_to_mqtt(
    topic: str, payload: str, qos: int = 0, retain: bool = False
) -> MQTTMessageInfo:
    return publish.single(
        ...
    )
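
Filled in, a minimal runnable version of this wrapper might look roughly like the following; the MQTT_HOST, MQTT_PORT, and MQTT_AUTH settings are placeholders for your own broker configuration, not values from our project.

import paho.mqtt.publish as publish

# placeholder broker settings - replace with your own configuration
MQTT_HOST = "mqtt-broker"
MQTT_PORT = 1883
MQTT_AUTH = {"username": "server", "password": "secret"}

def publish_to_mqtt(
    topic: str, payload: str, qos: int = 0, retain: bool = False
) -> None:
    # publish.single connects, publishes one message, and disconnects;
    # a tls=... argument can also be passed for secure connections
    publish.single(
        topic,
        payload=payload,
        qos=qos,
        retain=retain,
        hostname=MQTT_HOST,
        port=MQTT_PORT,
        auth=MQTT_AUTH,
    )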

Similarly, we found paho.mqtt.subscribe.simple, but this one was not enough. It did not have any “timeout”, meaning it would block forever if no message on that topic was received. We decided to copy its code and add a check for timeout (the waiting loop near the end of the function below). That did the job.

def subscribe_to_mqtt(
    topics: Union[str, List[str]],
    qos: int = 0,
    msg_count: int = 1,
    retained: bool = False,
    timeout: int = 5,
    userdata: dict = None
) -> Union[MQTTMessage, List[MQTTMessage]]:
    """
    modified paho.mqtt.subscribe.simple() method to support timeout
    """
    # callback for on_connect
    def _on_connect(..., userdata: dict, ...):
        # raise Exception if did not connect
        ...
        # subscribe to topics in userdata
        ...

    # callback for on_message
    def _on_message_simple(..., userdata: dict, message: MQTTMessage):
        # ignore if msg_count already 0
        ...
        # ignore if message is "retained" and user does not want it
        ...
        # decrement msg_count and set/add it to messages in userdata
        ...

    # validate msg_count and qos
    ...
    # Set ourselves up to return a single message if msg_count == 1, or a list if > 1
    ...
    # make userdata
    ...
    # create client instance and set callbacks and user credentials
    ...
    # connect to broker, mark start_time and start "threaded network loop" with client
    ...
    # wait till all messages received or timeout
    while True:
        should_timeout = time.time() > start_time + timeout
        if userdata["msg_count"] == 0 or should_timeout:
            client.loop_stop()
            client.disconnect()
            break
        time.sleep(0.2)
    # raise error if no or fewer messages than requested were received
    messages = userdata["messages"]
    if not messages or len(messages) < msg_count:
        raise TimeoutError(...)
    return messages

We naively thought that combining the above two functions would do the job for request-response. But that did not work very well, as client instances were created and connected twice, making it really slow and practically useless - most of the time, our devices would send responses before the server even subscribed.

So instead of using the publish_to_mqtt function, we modified the above subscribe_to_mqtt such that the “publish arguments” are passed as userdata. When the client connects, it publishes this data after subscribing. This optimization makes sure that the publish happens over the same connection as the subscribe, and that the subscription is in place before publishing so that the response is not lost.

def subscribe_to_mqtt(...):
    ...
    # callback for on_connect
    def _on_connect(client: MQTTClient, userdata: dict, flags: int, rc: int):
        ...
        # subscribe to topics in userdata
        ...
        # publish data based on pub_on_connect in userdata
        if "pub_on_connect" in userdata:
            ...

def request_response_to_mqtt(
    topic: str,
    payload: str,
    response_topic: str,
    qos: int = 0,
    response_msg_count: int = 1,
    response_timeout: int = 5,
    retained: bool = False
) -> Union[MQTTMessage, List[MQTTMessage]]:
    # create userdata with `pub_on_connect`
    userdata = {
        "pub_on_connect": {
            "topic": topic, "payload": payload, "qos": qos
        }
    }
    # subscribe with created userdata and
    # wait for messages on response_topic
    return subscribe_to_mqtt(
        userdata=userdata,
        ...
    )

You can find the full code for this in the mqtt_v1.py file in the code snippet.
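
The rpc helper used in the demo below is not part of the utility itself; a hypothetical version built on request_response_to_mqtt (with a made-up topic scheme) could look like this.

import json

# hypothetical helper - the topic layout is an assumption, not the project's real scheme
def rpc(device_id: str, method: str) -> None:
    response = request_response_to_mqtt(
        topic=f"devices/{device_id}/rpc",
        payload=json.dumps({"method": method}),
        response_topic=f"devices/{device_id}/rpc/response",
    )
    print(f"RPC: {method} responses: {response.payload}")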

>>> # simple demo
>>> for i in range(5):
...     start=time.time(); rpc(device, "RPC.Ping"); end=time.time()
...     print(f"[{i}] Time taken: {end - start}")
...
RPC: RPC.Ping responses: b'{"result": "pong"}'
[0] Time taken: 1.220142126083374
RPC: RPC.Ping responses: b'{"result": "pong"}'
[1] Time taken: 0.8168301582336426
RPC: RPC.Ping responses: b'{"result": "pong"}'
[2] Time taken: 1.0094208717346191
RPC: RPC.Ping responses: b'{"result": "pong"}'
[3] Time taken: 1.0069520473480225
RPC: RPC.Ping responses: b'{"result": "pong"}'
[4] Time taken: 1.003695011138916

And that’s it. We were up and running as it passed the basic tests, and that was enough until our users started complaining about latency - more about that in the next section.

[v2] Singleton that re-uses the connection

Once we rolled out the system in beta with about 20–25 devices and their users, we got some feedback from them. They were experiencing noticeable latency (more than 3 seconds) when running commands to control their devices.

After doing some basic profiling, it was clear that the bulk of the latency was due to the TLS handshake and connection establishment with the broker. Each time the server interacted with the broker, it had to establish a new TCP+TLS connection. We decided to optimize this the same way web servers usually handle database connections: persisting the connection and re-using it. Since most web servers (at least the WSGI ones) re-use the processes created for serving web requests, it is possible to have an MQTT connection that outlives the HTTP request-response cycle.

To accomplish this, we decided to use a singleton class that holds the client and its connection. This class exposes the same utilities and, under the hood, uses the persisted connection. Most of the work was re-organizing the existing code from the previous version to support this pattern. The singleton class (we called it MQTTAgent) looked something like this.

class MQTTAgent:
    _instance: "MQTTAgent" = None
    _client: Optional[MQTTClient] = None

    @staticmethod
    def get_instance() -> "MQTTAgent":
        """ Static access method. """
        ...

    def __init__(self):
        """ Virtually private constructor. """
        ...

    def clean_up(self):
        """ Cleanup network connection and client. """
        ...

    def _connect_async(self):
        """ Setup client and connection asynchronously. """
        ...

    def connect(self, timeout: int = 5):
        """ Connect synchronously with timeout. """
        ...

    def disconnect(self):
        """ Disconnect from broker synchronously. """
        ...

    def __on_connect(self,
                     client: MQTTClient,
                     userdata: dict,
                     flags: int,
                     rc: int):
        """ Callback called when client connects. """
        ...

    def __on_message(self,
                     client: MQTTClient,
                     userdata: dict,
                     message: MQTTMessage):
        """ Callback called when message received. """
        ...

    def is_connected(self) -> bool:
        """ Check if connected to broker. """
        ...

    def publish(self,
                topic: str,
                payload: str,
                qos: int = 1,
                retain: bool = False) -> MQTTMessageInfo:
        """ Publish message synchronously. """
        ...

    def subscribe(self,
                  topics: _Topics,
                  qos: int = 1,
                  retained: bool = False,
                  msg_count: int = 1,
                  timeout: int = 5) -> _Messages:
        ...

    def request_response(self,
                         topic: str,
                         payload: str,
                         response_topics: _Topics,
                         qos: int = 1,
                         response_msg_count: int = 1,
                         response_timeout: int = 5,
                         retained=False) -> _Messages:
        ...


# convenient global instance that could be
# directly imported and used.
mqtt_agent = MQTTAgent.get_instance()

You can find the full code for this in the mqtt_v2.py file in the code snippet.
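
As an illustration of the core idea (a sketch, not the exact code from the snippet), the singleton access and the synchronous connect on top of paho’s threaded network loop could look roughly like this, assuming paho-mqtt 1.x and made-up broker host and credentials.

import time
from typing import Optional

from paho.mqtt.client import Client as MQTTClient


class MQTTAgent:
    _instance: Optional["MQTTAgent"] = None

    @staticmethod
    def get_instance() -> "MQTTAgent":
        # create the singleton on first access, re-use it afterwards
        if MQTTAgent._instance is None:
            MQTTAgent()
        return MQTTAgent._instance

    def __init__(self):
        if MQTTAgent._instance is not None:
            raise RuntimeError("MQTTAgent is a singleton, use get_instance()")
        MQTTAgent._instance = self
        self._client: Optional[MQTTClient] = None
        self._connected = False

    def __on_connect(self, client, userdata, flags, rc):
        # rc == 0 means the broker accepted the connection
        self._connected = (rc == 0)

    def _connect_async(self):
        # create the client once and start paho's background network thread
        if self._client is None:
            self._client = MQTTClient()
            self._client.on_connect = self.__on_connect
            self._client.username_pw_set("server", "secret")  # placeholder credentials
            self._client.connect_async("mqtt-broker", 1883)   # placeholder host/port
            self._client.loop_start()

    def connect(self, timeout: int = 5):
        # block until the background network thread reports a live connection
        self._connect_async()
        deadline = time.time() + timeout
        while not self._connected:
            if time.time() > deadline:
                raise TimeoutError("could not connect to MQTT broker")
            time.sleep(0.1)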

>>> # simple demo
>>> for i in range(5):
...     start=time.time(); device.rpc("RPC.Ping"); end=time.time()
...     print(f"[{i}] Time taken: {end - start}")
...
Connection Accepted.[0] mqtt client connected to mqtt-broker:1883.
RPC: RPC.Ping responses: b'{"result": "pong"}'
[0] Time taken: 1.0129196643829346
RPC: RPC.Ping responses: b'{"result": "pong"}'
[1] Time taken: 0.10076355934143066
RPC: RPC.Ping responses: b'{"result": "pong"}'
[2] Time taken: 0.10155487060546875
RPC: RPC.Ping responses: b'{"result": "pong"}'
[3] Time taken: 0.10111713409423828
RPC: RPC.Ping responses: b'{"result": "pong"}'
[4] Time taken: 0.1007685661315918

The MQTT utility was now clean and elegant to use, as well as efficient. The latency improved by a factor of 10 (check the demo output above). Users noticed the difference and appreciated it. A win for the day!

[v3] Making it thread-safe

When we got the requirement to make concurrent command calls to multiple devices, we had two options: execute them in multiple threads, or use a distributed task queue system (we already had a Celery setup with our server).

The former looked like the apt solution, but it would require us to make the v2 solution “thread-safe”, as mqtt_agent.subscribe (and therefore mqtt_agent.request_response) was not designed with concurrency in mind. Instead of doing that, we lazily went with the “task queue” approach: it schedules the concurrent command calls and hands them to distributed background workers for execution. It did the job, but because the tasks were queued and executed by distributed workers, the performance was not consistent. Commands would take longer than expected when the queue was long or the workers were busy, which happened more often as the number of devices per user increased. It looked like we had no other option but to make MQTTAgent thread-safe.

In v2, the userdata (set as an attribute of MQTTClient and passed to the on_message handler) holds the “subscription state”; the key to making MQTTAgent thread-safe is making this state thread-safe. The idea is to have a “store” of all “subscription states”: when a message is received, we check the store, and if any “subscription” has requested messages from that topic, we add the message to that “subscription state”. We also need to handle topic subscriptions smartly: if two threads subscribe to the same topic, internally the client should only subscribe once and give the same message to both.
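
One way to implement the “topic matches” check is paho’s built-in helper topic_matches_sub, which also understands MQTT wildcards (the topic names here are made up for illustration).

from paho.mqtt.client import topic_matches_sub

# '+' matches exactly one topic level, '#' matches any number of trailing levels
print(topic_matches_sub("devices/+/rpc/response", "devices/A/rpc/response"))  # True
print(topic_matches_sub("devices/#", "devices/A/rpc/response"))               # True
print(topic_matches_sub("devices/+/rpc", "devices/A/rpc/response"))           # False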

First, we created an _MQTTSubscription class for storing the “subscription state” instead of using a dict, and moved the logic of adding new messages inside the class.

class _MQTTSubscription:
    def __init__(self,
                 topics: _Topics,
                 msg_count: int = 1,
                 qos: int = 1,
                 retained: bool = False):
        self.thread_id: int = current_thread().ident
        self.topics: List[str] = topics if isinstance(topics, list) else [topics]
        self.messages: _Messages = list() if msg_count > 1 else None
        self.pending_msg_count: int = msg_count
        self._msg_count: int = msg_count
        self.qos: int = qos
        self.retained: bool = retained

    def add_message(self, message: MQTTMessage):
        """ Handle new message for this subscription. """
        # ignore - pending_msg_count <= 0 or message is retained but the request is not for retained
        ...
        # decrement pending_msg_count and add it to messages
        ...

After that, we added “subscription store” and “topic reference count” attributes to MQTTAgent. The “subscription store” is a set of all subscription instances that are waiting for messages. The “topic reference count” is a map from each topic to the number of subscription instances subscribed to it.

class MQTTAgent:
    ...
    _subscriptions: Set[_MQTTSubscription] = set()
    _topics_count: Dict[str, int] = dict()
    ...

    def _disconnect(self):
        ...
        # reset _subscriptions and _topics_count
        self._subscriptions = set()
        self._topics_count = dict()
        ...

When a new “subscription” is requested, a subscription instance is created and added to the _subscriptions set, the “topic reference count” is incremented for its topics, and the _client subscribes to the topics that were not already subscribed (i.e. whose reference count just became 1). The reverse happens when the subscription needs to be “closed or unsubscribed”: the reference counts are decremented, and the _client unsubscribes from the topics that are no longer referenced (reference count back to 0). In the __on_message callback, we go through all _subscriptions and call subscription.add_message for the ones whose topics match the message topic.

class MQTTAgent:
    ...
    def __on_message(self, ...):
        ...
        # iterate through subscriptions and find any that has this topic
        for subscription in tuple(self._subscriptions):
            ...
            # check if any topic matches for this subscription
            ...
            # add message to subscription
            subscription.add_message(message=message)
            ...
        ...

    def _subscribe(self, subscription: _MQTTSubscription):
        """ Subscribe based on subscription. """
        # put it in _subscriptions
        ...
        # increment topic count - make list of topics to subscribe (ones that are not yet subscribed)
        ...
        # subscribe to those topics
        ...

    def _unsubscribe(self, subscription: _MQTTSubscription):
        """ Unsubscribe based on subscription. """
        # pop subscription from _subscriptions
        ...
        # decrement topic count - make list of topics to unsubscribe (ones that are not needed anymore)
        ...
        # unsubscribe from those topics
        ...

    def subscribe(self, ...) -> _Messages:
        ...
        # connect to client
        ...
        # validate msg_count and qos
        ...
        # make subscription object
        subscription = _MQTTSubscription(
            topics=topics,
            msg_count=msg_count,
            qos=qos,
            retained=retained
        )
        # subscribe
        self._subscribe(subscription=subscription)
        # wait till all messages received or timeout
        start_time = time.time()
        while True:
            should_timeout = time.time() > start_time + timeout
            if subscription.pending_msg_count == 0 or should_timeout:
                # unsubscribe
                self._unsubscribe(subscription=subscription)
                break
            # sleep
            time.sleep(0.1)
        return subscription.messages
    ...

You can find the full code for this in the mqtt_v3.py file in the code snippet.
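
To make the reference-counting idea concrete, here is a small self-contained sketch; the _TopicRefCounter name and the lock-based bookkeeping are illustrative, not the exact code from the snippet.

import threading
from typing import Dict, List


class _TopicRefCounter:
    """ Thread-safe reference counts deciding when to really (un)subscribe. """

    def __init__(self):
        self._counts: Dict[str, int] = {}
        self._lock = threading.Lock()

    def acquire(self, topics: List[str]) -> List[str]:
        # returns the topics the client should newly subscribe to
        with self._lock:
            new_topics = [t for t in topics if self._counts.get(t, 0) == 0]
            for t in topics:
                self._counts[t] = self._counts.get(t, 0) + 1
            return new_topics

    def release(self, topics: List[str]) -> List[str]:
        # returns the topics the client can now unsubscribe from
        with self._lock:
            stale_topics = []
            for t in topics:
                self._counts[t] -= 1
                if self._counts[t] == 0:
                    del self._counts[t]
                    stale_topics.append(t)
            return stale_topics


counter = _TopicRefCounter()
print(counter.acquire(["devices/A/rpc/response"]))  # ['devices/A/rpc/response'] -> subscribe
print(counter.acquire(["devices/A/rpc/response"]))  # [] -> already subscribed
print(counter.release(["devices/A/rpc/response"]))  # [] -> another subscriber still waiting
print(counter.release(["devices/A/rpc/response"]))  # ['devices/A/rpc/response'] -> unsubscribe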

>>> # simple demo
>>> print(f"Calling RPC to {len(devices)} device concurrently.")
Calling RPC to 5 device concurrently.
>>> with ThreadPoolExecutor(len(devices)) as executor:
...     start = time.time()
...     # make sure mqtt_agent is connected
...     mqtt_agent.connect()
...     tuple(executor.map(
...         lambda device: rpc(device, "RPC.Ping"),
...         devices
...     ))
...     end = time.time()
...     print(f"Total time taken: {end - start}")
...
Connection Accepted.[0] mqtt client connected to mqtt-broker:1883.
RPC: RPC.Ping responses: b'{"result": "pong"}'
[140217424803584] Time taken: 0.14078974723815918
RPC: RPC.Ping responses: b'{"result": "pong"}'
[140217441588992] Time taken: 0.1967012882232666
RPC: RPC.Ping responses: b'{"result": "pong"}'
[140217433196288] Time taken: 0.19601941108703613
RPC: RPC.Ping responses: b'{"result": "pong"}'
[140217416410880] Time taken: 0.19498801231384277
RPC: RPC.Ping responses: b'{"result": "pong"}'
[140217449981696] Time taken: 0.6972782611846924
Total time taken: 1.314225673675537

It was now possible to call MQTTAgent functionality from multiple threads safely. We started sending commands to multiple devices concurrently with the help of Python’s ThreadPoolExecutor (as shown in the demo above) instead of distributing them to background workers. This not only made things faster and more efficient, but we could now scale to simultaneous calls to a larger number of devices (tested with 100 devices at a time).


What next?

We have some ideas that could take the utility to the next level. Some of them are as follows.

  • “Iter/Stream” like API for getting messages on subscription as they are received. Currently, all messages are “collected” and then returned.
  • “Future/Promise” like API for having both async and sync options at the user level. Currently, all functionality is blocking.
  • Multiple Broker support.

The current implementation is enough for our use cases, so we might not work on these ourselves. But if we get a good response to this post, we could think about building them and releasing the utility as an open-source Python package. While Eclipse Paho is solid and stable, it needs an “elegance layer” on top to make it work like a charm, much as the requests package does for urllib3. This package could be exactly that: MQTT for Humans™!