User guide

Installing

To install the library without any extras:

pip install mqttproto

If you need websockets connectivity, you will want to include the websockets extra:

pip install mqttproto[websockets]

Using the async client

The asynchronous client needs to be used as an asynchronous context manager to manage its life cycle, as per the principles of structured concurrency. Exiting the context manager will clean up any resources used, like terminating active connections any active subscriptions and cancelling pending publish operations.

Connecting to the broker

This connects to localhost:1883:

from __future__ import annotations

import asyncio

from mqttproto.async_client import AsyncMQTTClient


async def main() -> None:
    async with AsyncMQTTClient(host_or_path="localhost") as client:
        ...


asyncio.run(main())

When you need to use TLS (formerly SSL) for securing your connections, you can simply pass ssl=True to AsyncMQTTClient, or you can provide your own SSLContext if you need to provide a client certificate or add a custom root certificate.

Note

Enabling TLS changes the default ports as follows:

  • raw MQTT: default port changes to 8883

  • websockets: default port changes to 443

This connects to localhost:8883:

from __future__ import annotations

import asyncio

from mqttproto.async_client import AsyncMQTTClient


async def main() -> None:
    async with AsyncMQTTClient(host_or_path="localhost", ssl=True) as client:
        ...


asyncio.run(main())

Subscribing to topics

Subscribing is also done via context managers. Here’s an example of how to subscribe to the my/sample/topic topic:

from __future__ import annotations

import asyncio

from mqttproto.async_client import AsyncMQTTClient


async def main() -> None:
    async with AsyncMQTTClient() as client:
        async with client.subscribe("topic") as sub:
            async for message in sub:
                print(f"Received a message: {message.payload!r}")


asyncio.run(main())

You can subscribe to multiple topics or topic patterns at once:

from __future__ import annotations

import asyncio

from mqttproto.async_client import AsyncMQTTClient


async def main() -> None:
    async with AsyncMQTTClient() as client:
        async with client.subscribe("topic", "more/+/topics", "other/#") as sub:
            async for message in sub:
                print(f"Received a message from {message.topic}: {message.payload!r}")


asyncio.run(main())

Often, you will find yourself needing to subscribe from several tasks at once. This is fine too:

from __future__ import annotations

import asyncio

from anyio import create_task_group
from anyio.abc import TaskStatus

from mqttproto.async_client import AsyncMQTTClient


async def subscriber1(
    client: AsyncMQTTClient, *, task_status: TaskStatus[None]
) -> None:
    async with client.subscribe("more/+/topics") as sub:
        task_status.started()
        async for message in sub:
            print(
                f"subscriber1: received a message from {message.topic}: "
                f"{message.payload!r}"
            )


async def subscriber2(
    client: AsyncMQTTClient, *, task_status: TaskStatus[None]
) -> None:
    async with client.subscribe("other/#") as sub:
        task_status.started()
        async for message in sub:
            print(
                f"subscriber2: received a message from {message.topic}: "
                f"{message.payload!r}"
            )


async def main() -> None:
    async with AsyncMQTTClient() as client, create_task_group() as task_group:
        await task_group.start(subscriber1, client)
        await task_group.start(subscriber2, client)


asyncio.run(main())

Tip

If you find yourself needing so many nested context managers that this actually hurts code readability, try using AsyncExitStack to manage the context managers.

See also

subscribe()

Publishing messages

To publish a message to the broker, call the publish() method:

from __future__ import annotations

import asyncio

from mqttproto.async_client import AsyncMQTTClient


async def main() -> None:
    async with AsyncMQTTClient() as client:
        await client.publish("topic", "message")


asyncio.run(main())

Using the sync client

The synchronous client (MQTTClient) is a wrapper for the asynchronous client, allowing users to not care about the details of the asynchronous implementation. Otherwise, the same principles apply to the synchronous client.

Connecting to the broker:

This connects to localhost:1883:

from __future__ import annotations

from mqttproto.sync_client import MQTTClient

with MQTTClient(host_or_path="localhost") as client:
    ...

Connecting with TLS:

This connects to localhost:8883:

from __future__ import annotations

from mqttproto.sync_client import MQTTClient

with MQTTClient(host_or_path="localhost", ssl=True) as client:
    ...