User guide
Installing
To install the library without any extras:
pip install mqttproto
If you need websockets connectivity, you will want to include the websockets extra:
pip install mqttproto[websockets]
Using the async client
The asynchronous client needs to be used as an asynchronous context manager to manage its life cycle, as per the principles of structured concurrency. Exiting the context manager cleans up any resources in use: it terminates active connections, ends any active subscriptions and cancels pending publish operations.
Connecting to the broker
This connects to localhost:1883:
from __future__ import annotations
import asyncio
from mqttproto.async_client import AsyncMQTTClient
async def main() -> None:
async with AsyncMQTTClient(host_or_path="localhost") as client:
...
asyncio.run(main())
This connects to a UNIX domain socket at /path/to/broker.sock:
from __future__ import annotations
import asyncio
from mqttproto.async_client import AsyncMQTTClient
async def main() -> None:
async with AsyncMQTTClient(
host_or_path="/path/to/broker.sock", transport="unix"
) as client:
...
asyncio.run(main())
This connects to http://localhost/ws:
from __future__ import annotations
import asyncio
from mqttproto.async_client import AsyncMQTTClient
async def main() -> None:
async with AsyncMQTTClient(
host_or_path="localhost", websocket_path="/ws"
) as client:
...
asyncio.run(main())
When you need to use TLS (formerly SSL) for securing your connections, you can simply
pass ssl=True to AsyncMQTTClient, or you can provide your own
SSLContext if you need to provide a client certificate or add a custom
root certificate.
Note
Enabling TLS changes the default ports as follows:
raw MQTT: default port changes to 8883
websockets: default port changes to 443
This connects to localhost:8883:
from __future__ import annotations
import asyncio
from mqttproto.async_client import AsyncMQTTClient
async def main() -> None:
async with AsyncMQTTClient(host_or_path="localhost", ssl=True) as client:
...
asyncio.run(main())
This connects to a UNIX domain socket at /path/to/broker.sock, using TLS:
from __future__ import annotations
import asyncio
from mqttproto.async_client import AsyncMQTTClient
async def main() -> None:
async with AsyncMQTTClient(
host_or_path="/path/to/broker.sock", transport="unix", ssl=True
) as client:
...
asyncio.run(main())
This connects to https://localhost/ws:
from __future__ import annotations
import asyncio
from mqttproto.async_client import AsyncMQTTClient
async def main() -> None:
async with AsyncMQTTClient(
host_or_path="localhost", websocket_path="/ws", ssl=True
) as client:
...
asyncio.run(main())
Subscribing to topics
Subscribing is also done via context managers. Here’s an example of how to subscribe to
a single topic (named topic in this example):
from __future__ import annotations
import asyncio
from mqttproto.async_client import AsyncMQTTClient
async def main() -> None:
async with AsyncMQTTClient() as client:
async with client.subscribe("topic") as sub:
async for message in sub:
print(f"Received a message: {message.payload!r}")
asyncio.run(main())
You can subscribe to multiple topics or topic patterns at once:
from __future__ import annotations
import asyncio
from mqttproto.async_client import AsyncMQTTClient
async def main() -> None:
async with AsyncMQTTClient() as client:
async with client.subscribe("topic", "more/+/topics", "other/#") as sub:
async for message in sub:
print(f"Received a message from {message.topic}: {message.payload!r}")
asyncio.run(main())
Often, you will find yourself needing to subscribe from several tasks at once. This is fine too:
from __future__ import annotations
import asyncio
from anyio import create_task_group
from anyio.abc import TaskStatus
from mqttproto.async_client import AsyncMQTTClient
async def subscriber1(
client: AsyncMQTTClient, *, task_status: TaskStatus[None]
) -> None:
async with client.subscribe("more/+/topics") as sub:
task_status.started()
async for message in sub:
print(
f"subscriber1: received a message from {message.topic}: "
f"{message.payload!r}"
)
async def subscriber2(
client: AsyncMQTTClient, *, task_status: TaskStatus[None]
) -> None:
async with client.subscribe("other/#") as sub:
task_status.started()
async for message in sub:
print(
f"subscriber2: received a message from {message.topic}: "
f"{message.payload!r}"
)
async def main() -> None:
async with AsyncMQTTClient() as client, create_task_group() as task_group:
await task_group.start(subscriber1, client)
await task_group.start(subscriber2, client)
asyncio.run(main())
Tip
If you find yourself needing so many nested context managers that this actually
hurts code readability, try using AsyncExitStack to manage the
context managers.
See also
Publishing messages
To publish a message to the broker, call the publish() method:
from __future__ import annotations
import asyncio
from mqttproto.async_client import AsyncMQTTClient
async def main() -> None:
async with AsyncMQTTClient() as client:
await client.publish("topic", "message")
asyncio.run(main())
Using the sync client
The synchronous client (MQTTClient) is a wrapper for the asynchronous client,
allowing users to not care about the details of the asynchronous implementation.
Apart from that, the same principles apply as with the asynchronous client.
Connecting to the broker:
This connects to localhost:1883:
from __future__ import annotations
from mqttproto.sync_client import MQTTClient
with MQTTClient(host_or_path="localhost") as client:
...
This connects to a UNIX domain socket at /path/to/broker.sock:
from __future__ import annotations
from mqttproto.sync_client import MQTTClient
with MQTTClient(host_or_path="/path/to/broker.sock", transport="unix") as client:
...
This connects to http://localhost/ws:
from __future__ import annotations
from mqttproto.sync_client import MQTTClient
with MQTTClient(host_or_path="localhost", websocket_path="/ws") as client:
...
Connecting with TLS:
This connects to localhost:8883:
from __future__ import annotations
from mqttproto.sync_client import MQTTClient
with MQTTClient(host_or_path="localhost", ssl=True) as client:
...
This connects to a UNIX domain socket at /path/to/broker.sock, using TLS:
from __future__ import annotations
from mqttproto.sync_client import MQTTClient
with MQTTClient(
host_or_path="/path/to/broker.sock", transport="unix", ssl=True
) as client:
...
This connects to https://localhost/ws:
from __future__ import annotations
from mqttproto.sync_client import MQTTClient
with MQTTClient(host_or_path="localhost", websocket_path="/ws", ssl=True) as client:
...