API Reference

Enumerated types

enum mqttproto.PropertyType(value)
Member Type:

int

Valid values are as follows:

PAYLOAD_FORMAT_INDICATOR = <PropertyType.PAYLOAD_FORMAT_INDICATOR: 1>
MESSAGE_EXPIRY_INTERVAL = <PropertyType.MESSAGE_EXPIRY_INTERVAL: 2>
CONTENT_TYPE = <PropertyType.CONTENT_TYPE: 3>
RESPONSE_TOPIC = <PropertyType.RESPONSE_TOPIC: 8>
CORRELATION_DATA = <PropertyType.CORRELATION_DATA: 9>
SUBSCRIPTION_IDENTIFIER = <PropertyType.SUBSCRIPTION_IDENTIFIER: 11>
SESSION_EXPIRY_INTERVAL = <PropertyType.SESSION_EXPIRY_INTERVAL: 17>
ASSIGNED_CLIENT_IDENTIFIER = <PropertyType.ASSIGNED_CLIENT_IDENTIFIER: 18>
SERVER_KEEP_ALIVE = <PropertyType.SERVER_KEEP_ALIVE: 19>
AUTHENTICATION_METHOD = <PropertyType.AUTHENTICATION_METHOD: 21>
AUTHENTICATION_DATA = <PropertyType.AUTHENTICATION_DATA: 22>
REQUEST_PROBLEM_INFORMATION = <PropertyType.REQUEST_PROBLEM_INFORMATION: 23>
WILL_DELAY_INTERVAL = <PropertyType.WILL_DELAY_INTERVAL: 24>
REQUEST_RESPONSE_INFORMATION = <PropertyType.REQUEST_RESPONSE_INFORMATION: 25>
RESPONSE_INFORMATION = <PropertyType.RESPONSE_INFORMATION: 26>
SERVER_REFERENCE = <PropertyType.SERVER_REFERENCE: 28>
REASON_STRING = <PropertyType.REASON_STRING: 31>
RECEIVE_MAXIMUM = <PropertyType.RECEIVE_MAXIMUM: 33>
TOPIC_ALIAS_MAXIMUM = <PropertyType.TOPIC_ALIAS_MAXIMUM: 34>
TOPIC_ALIAS = <PropertyType.TOPIC_ALIAS: 35>
MAXIMUM_QOS = <PropertyType.MAXIMUM_QOS: 36>
RETAIN_AVAILABLE = <PropertyType.RETAIN_AVAILABLE: 37>
USER_PROPERTY = <PropertyType.USER_PROPERTY: 38>
MAXIMUM_PACKET_SIZE = <PropertyType.MAXIMUM_PACKET_SIZE: 39>
WILDCARD_SUBSCRIPTION_AVAILABLE = <PropertyType.WILDCARD_SUBSCRIPTION_AVAILABLE: 40>
SUBSCRIPTION_IDENTIFIER_AVAILABLE = <PropertyType.SUBSCRIPTION_IDENTIFIER_AVAILABLE: 41>
SHARED_SUBSCRIPTION_AVAILABLE = <PropertyType.SHARED_SUBSCRIPTION_AVAILABLE: 42>
enum mqttproto.QoS(value)
Member Type:

int

Valid values are as follows:

AT_MOST_ONCE = <QoS.AT_MOST_ONCE: 0>
AT_LEAST_ONCE = <QoS.AT_LEAST_ONCE: 1>
EXACTLY_ONCE = <QoS.EXACTLY_ONCE: 2>
enum mqttproto.ReasonCode(value)
Member Type:

int

Valid values are as follows:

SUCCESS = <ReasonCode.SUCCESS: 0>
GRANTED_QOS_1 = <ReasonCode.GRANTED_QOS_1: 1>
GRANTED_QOS_2 = <ReasonCode.GRANTED_QOS_2: 2>
DISCONNECT_WITH_WILL_MESSAGE = <ReasonCode.DISCONNECT_WITH_WILL_MESSAGE: 4>
NO_MATCHING_SUBSCRIBERS = <ReasonCode.NO_MATCHING_SUBSCRIBERS: 16>
NO_SUBSCRIPTION_EXISTED = <ReasonCode.NO_SUBSCRIPTION_EXISTED: 17>
CONTINUE_AUTHENTICATION = <ReasonCode.CONTINUE_AUTHENTICATION: 24>
REAUTHENTICATE = <ReasonCode.REAUTHENTICATE: 25>
UNSPECIFIED_ERROR = <ReasonCode.UNSPECIFIED_ERROR: 128>
MALFORMED_PACKET = <ReasonCode.MALFORMED_PACKET: 129>
PROTOCOL_ERROR = <ReasonCode.PROTOCOL_ERROR: 130>
IMPLEMENTATION_SPECIFIC_ERROR = <ReasonCode.IMPLEMENTATION_SPECIFIC_ERROR: 131>
UNSUPPORTED_PROTOCOL_VERSION = <ReasonCode.UNSUPPORTED_PROTOCOL_VERSION: 132>
CLIENT_IDENTIFIER_NOT_VALID = <ReasonCode.CLIENT_IDENTIFIER_NOT_VALID: 133>
BAD_USERNAME_OR_PASSWORD = <ReasonCode.BAD_USERNAME_OR_PASSWORD: 134>
NOT_AUTHORIZED = <ReasonCode.NOT_AUTHORIZED: 135>
SERVER_UNAVAILABLE = <ReasonCode.SERVER_UNAVAILABLE: 136>
SERVER_BUSY = <ReasonCode.SERVER_BUSY: 137>
BANNED = <ReasonCode.BANNED: 138>
SERVER_SHUTTING_DOWN = <ReasonCode.SERVER_SHUTTING_DOWN: 139>
BAD_AUTHENTICATION_METHOD = <ReasonCode.BAD_AUTHENTICATION_METHOD: 140>
KEEP_ALIVE_TIMEOUT = <ReasonCode.KEEP_ALIVE_TIMEOUT: 141>
SESSION_TAKEN_OVER = <ReasonCode.SESSION_TAKEN_OVER: 142>
TOPIC_FILTER_INVALID = <ReasonCode.TOPIC_FILTER_INVALID: 143>
TOPIC_NAME_INVALID = <ReasonCode.TOPIC_NAME_INVALID: 144>
PACKET_IDENTIFIER_IN_USE = <ReasonCode.PACKET_IDENTIFIER_IN_USE: 145>
PACKET_IDENTIFIER_NOT_FOUND = <ReasonCode.PACKET_IDENTIFIER_NOT_FOUND: 146>
RECEIVE_MAXIMUM_EXCEEDED = <ReasonCode.RECEIVE_MAXIMUM_EXCEEDED: 147>
TOPIC_ALIAS_INVALID = <ReasonCode.TOPIC_ALIAS_INVALID: 148>
PACKET_TOO_LARGE = <ReasonCode.PACKET_TOO_LARGE: 149>
MESSAGE_RATE_TOO_HIGH = <ReasonCode.MESSAGE_RATE_TOO_HIGH: 150>
QUOTA_EXCEEDED = <ReasonCode.QUOTA_EXCEEDED: 151>
ADMINISTRATIVE_ACTION = <ReasonCode.ADMINISTRATIVE_ACTION: 152>
PAYLOAD_FORMAT_INVALID = <ReasonCode.PAYLOAD_FORMAT_INVALID: 153>
RETAIN_NOT_SUPPORTED = <ReasonCode.RETAIN_NOT_SUPPORTED: 154>
QOS_NOT_SUPPORTED = <ReasonCode.QOS_NOT_SUPPORTED: 155>
USE_ANOTHER_SERVER = <ReasonCode.USE_ANOTHER_SERVER: 156>
SERVER_MOVED = <ReasonCode.SERVER_MOVED: 157>
SHARED_SUBSCRIPTIONS_NOT_SUPPORTED = <ReasonCode.SHARED_SUBSCRIPTIONS_NOT_SUPPORTED: 158>
CONNECTION_RATE_EXCEEDED = <ReasonCode.CONNECTION_RATE_EXCEEDED: 159>
MAXIMUM_CONNECT_TIME = <ReasonCode.MAXIMUM_CONNECT_TIME: 160>
SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED = <ReasonCode.SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED: 161>
WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED = <ReasonCode.WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED: 162>
enum mqttproto.RetainHandling(value)
Member Type:

int

Valid values are as follows:

SEND_RETAINED = <RetainHandling.SEND_RETAINED: 0>
SEND_RETAINED_IF_NOT_SUBSCRIBED = <RetainHandling.SEND_RETAINED_IF_NOT_SUBSCRIBED: 1>
NO_RETAINED = <RetainHandling.NO_RETAINED: 2>
enum mqttproto.MQTTClientState(value)

Valid values are as follows:

DISCONNECTED = <MQTTClientState.DISCONNECTED: 1>
CONNECTING = <MQTTClientState.CONNECTING: 2>
CONNECTED = <MQTTClientState.CONNECTED: 3>

Supporting classes

class mqttproto.Subscription(pattern, *, max_qos=QoS.EXACTLY_ONCE, no_local=False, retain_as_published=True, retain_handling=RetainHandling.SEND_RETAINED)
matches(publish)

Check if a published message matches this subscription.

Parameters:

publish (MQTTPublishPacket) – an MQTT PUBLISH packet

Return type:

bool

Returns:

True if the published message matches this pattern, False if not

class mqttproto.Will(*, properties=NOTHING, user_properties=NOTHING, topic, payload, retain=False, qos=QoS.AT_MOST_ONCE)

MQTT packet classes

class mqttproto.MQTTPacket

Abstract base class for all MQTT packets

class mqttproto.MQTTConnectPacket(*, properties=NOTHING, user_properties=NOTHING, client_id, will=None, username=None, password=None, clean_start=False, keep_alive=0)

Connection request

class mqttproto.MQTTConnAckPacket(*, properties=NOTHING, user_properties=NOTHING, reason_code, session_present)

Connection acknowledgment

class mqttproto.MQTTPublishPacket(*, properties=NOTHING, user_properties=NOTHING, topic, payload, packet_id=None, retain=False, qos=QoS.AT_MOST_ONCE, duplicate=False, ack_state=PublishAckState.UNACKNOWLEDGED)

Publish message

class mqttproto.MQTTPublishAckPacket(*, properties=NOTHING, user_properties=NOTHING, packet_id, reason_code)

Publish acknowledgment (QoS 1)

class mqttproto.MQTTPublishReceivePacket(*, properties=NOTHING, user_properties=NOTHING, packet_id, reason_code)

Publish received (QoS 2 delivery part 1)

class mqttproto.MQTTPublishReleasePacket(*, properties=NOTHING, user_properties=NOTHING, packet_id, reason_code)

Publish release (QoS 2 delivery part 2)

class mqttproto.MQTTPublishCompletePacket(*, properties=NOTHING, user_properties=NOTHING, packet_id, reason_code)

Publish complete (QoS 2 delivery part 3)

class mqttproto.MQTTSubscribePacket(*, properties=NOTHING, user_properties=NOTHING, packet_id, subscriptions)

Subscribe request

class mqttproto.MQTTSubscribeAckPacket(*, properties=NOTHING, user_properties=NOTHING, packet_id, reason_codes)

Subscribe acknowledgment

class mqttproto.MQTTUnsubscribePacket(*, properties=NOTHING, user_properties=NOTHING, packet_id, patterns)

Unsubscribe request

class mqttproto.MQTTUnsubscribeAckPacket(*, properties=NOTHING, user_properties=NOTHING, packet_id, reason_codes)

Unsubscribe acknowledgment

class mqttproto.MQTTPingRequestPacket

PING request

class mqttproto.MQTTPingResponsePacket

PING response

class mqttproto.MQTTAuthPacket(*, properties=NOTHING, user_properties=NOTHING, reason_code)

Authentication exchange

class mqttproto.MQTTDisconnectPacket(*, properties=NOTHING, user_properties=NOTHING, reason_code)

Disconnect notification

Client-side state machine

class mqttproto.client_state_machine.MQTTClientStateMachine(client_id=None)

State machine for a client’s session with an MQTT broker.

property auth_method: str | None

The authentication method chosen by the client.

feed_bytes(data)

Input bytes received from the transport stream to the state machine.

The state machine parses and validates any packets found in the stream and alters its state accordingly. If the last packet is incomplete, it will remain in the inbound data buffer until a later call which completes the packet.

Parameters:

data (bytes) – bytes received from the transport stream

Return type:

Sequence[MQTTPacket]

Returns:

a sequence of complete packets parsed from the bytes

get_outbound_data()

Retrieve any bytes to be sent to the peer.

This method is not idempotent, as the returned bytes are removed from the outbound data buffer.

Return type:

bytes

property maximum_qos: QoS

Returns the maximum QoS level that the broker supports.

property may_retain: bool

Does the server support RETAINed messages?

publish(topic, payload, *, qos=QoS.AT_MOST_ONCE, retain=False)

Send a PUBLISH request.

Parameters:
  • topic (str) – topic to publish the message on

  • payload (str | bytes) – the actual message to publish

  • qos (QoS)

  • retain (bool) – True to send the message to any future subscribers of the topic too

Return type:

int | None

Returns:

the packet ID if qos was higher than 0

A QoS that’s not supported by the server is silently downgraded. If Retain is not supported, the message is sent as-is because the server is free to accept it anyway.

property state: MQTTClientState

The current state of the client.

subscribe(subscriptions)

Subscribe to one or more topic patterns.

If any of the given subscription was Send a SUBSCRIBE request, containing one of more subscriptions.

Parameters:

subscriptions (Sequence[Subscription]) – a sequence of subscriptions

Return type:

int | None

Returns:

packet ID of the SUBSCRIBE request, or None if a request did not need to be sent

unsubscribe(patterns)

Unsubscribe from one or more topic patterns.

If the internal subscription for any of the given patterns goes down to 0, an UNSUBSCRIBE request is sent for those patterns.

Parameters:

patterns (Sequence[str]) – topic patterns to unsubscribe from

Return type:

int | None

Returns:

packet ID of the UNSUBSCRIBE request, or None if a request did not need to be sent

Broker-side state machines

class mqttproto.broker_state_machine.MQTTBrokerStateMachine

State machine for an MQTT broker.

client_disconnected(client_id, packet)

Handle a client disconnection.

Parameters:
  • client_id (str) – ID of the client that disconnected

  • packet (MQTTDisconnectPacket | None) – the DISCONNECT packet sent by the client, or None if the transport stream was closed

Return type:

None

publish(source_client_id, packet)

Publish a message from the given client to all the appropriate subscribers.

Parameters:
  • source_client_id (str) – ID of the client that published the message

  • packet (MQTTPublishPacket) – the PUBLISH packet sent by the client

Return type:

Collection[str]

Returns:

a collection of client IDs to send the publish packet to

subscribe(client_id, packet)

Add a new subscription from the given client.

Return type:

None

class mqttproto.broker_state_machine.MQTTBrokerClientStateMachine

State machine for the MQTT broker’s view of a client session.

acknowledge_connect(reason_code, username, session_present)

Respond to a CONNECT request by the client.

Parameters:
  • reason_code (ReasonCode) – the reason code indicating either success or failure

  • username (str | None) – the username the client authenticated as

  • session_present (bool) – True if a previously existing session was resumed, False if not

Return type:

None

acknowledge_subscribe(packet_id, reason_codes)

Respond to a SUBSCRIBE request by the client.

Parameters:
  • packet_id (int) – the packet ID from the SUBSCRIBE packet

  • reason_codes (Sequence[ReasonCode]) – the reason code indicating either success or failure for the corresponding subscriptions in the original request (MUST be in the same order to be matched against the correct subscriptions)

Return type:

None

property auth_method: str | None

The authentication method chosen by the client.

deliver_publish(source_client_id, packet)

Deliver a PUBLISH message to this client if the current state allows it.

Parameters:
  • source_client_id (str) – ID of the client that sent the message

  • packet (MQTTPublishPacket) – the PUBLISH packet sent by the client

Return type:

bool

Returns:

True if this client accepted the message for delivery, False if not

feed_bytes(data)

Input bytes received from the transport stream to the state machine.

The state machine parses and validates any packets found in the stream and alters its state accordingly. If the last packet is incomplete, it will remain in the inbound data buffer until a later call which completes the packet.

Parameters:

data (bytes) – bytes received from the transport stream

Return type:

Sequence[MQTTPacket]

Returns:

a sequence of complete packets parsed from the bytes

get_outbound_data()

Retrieve any bytes to be sent to the peer.

This method is not idempotent, as the returned bytes are removed from the outbound data buffer.

Return type:

bytes

property state: MQTTClientState

The current state of the client.

property username: str | None

The username the client authenticated as.

Concrete client implementations

class mqttproto.async_client.AsyncMQTTClient(host_or_path=None, port=None, *, transport='tcp', websocket_path=None, ssl=False, client_id=None, username=None, password=None, clean_start=True, receive_maximum=65535, max_packet_size=None, will=None)

An asynchronous MQTT client.

Must be used as an asynchronous context manager.

Parameters:
  • host_or_path (str | None) – host name or UNIX socket path on the file system (defaults to localhost for TCP based connections; must be provided for UNIX socket connections)

  • port (int | None) – port number to connect to in case of the tcp transport (defaults to 1883 or 8883 for direct MQTT depending on the ssl parameter; 80 or 443 for websocket connections)

  • transport (Literal['tcp', 'unix']) – either tcp (TCP) or unix (UNIX domain sockets)

  • websocket_path (str | None) – the URL path in the HTTP request when using websockets to connect to the MQTT broker (must be set to use websockets instead of direct MQTT)

  • ssl (bool | SSLContext) – to use TLS (SSL), set to either True (to use the default SSL context), or provide your own custom SSL context here

  • client_id (str | None) – custom client ID to use when connecting to the MQTT broker (needs to stay constant across restarts for session resumption to work)

  • username (str | None) – user name to use for authenticating against the MQTT broker

  • password (str | None) – password to use for authenticating against the MQTT broker

  • clean_start (bool) – if False, try to resume a previous session (tied to the client ID)

  • receive_maximum (int) – number of unconfirmed QoS 1/2 messages that the client is willing to store at once

  • max_packet_size (int | None) – maximum packet size in bytes, or None for no limit

  • will (Will | None) – message that will be published by the broker on the client’s behalf if the client disconnects unexpectedly or fails to communicate within the keepalive time

property maximum_qos: QoS

Returns the maximum QoS level that the broker supports.

async publish(topic, payload, *, qos=QoS.AT_MOST_ONCE, retain=False)

Publish a message to the given topic.

Parameters:
  • topic (str) – the topic to publish the message to

  • payload (bytes | str) – the message to publish

  • qos (QoS) – the QoS of the message

  • retain (bool) – if True, the message will be always be sent to clients that subscribe to the given topic even after the message was already published before the subscription happened

Return type:

None

subscribe(*patterns, maximum_qos=QoS.EXACTLY_ONCE, no_local=False, retain_as_published=True, retain_handling=RetainHandling.SEND_RETAINED)

Subscribe to the given topics or topic patterns.

Parameters:
  • patterns (str) – either exact topic names, or patterns containing wildcards (+ or #)

  • maximum_qos (QoS) – maximum QoS to allow (messages matching the given patterns but with higher QoS will be downgraded to that QoS)

  • no_local (bool) – if True, messages published by this client will not be sent back to it via this subscription

  • retain_as_published (bool) – if False, the broker will clear the retained flag of all published messages sent to the client

  • retain_handling (RetainHandling) –

    • If set to SEND_RETAINED (the default), the broker will send any retained messages matching this subscription to this client

    • If set to SEND_RETAINED_IF_NOT_SUBSCRIBED, then the broker will only send retained messages if this subscription did not already exist

    • If set to NO_RETAINED, then retained messages will not be sent to this client

Return type:

AsyncGenerator[AsyncMQTTSubscription, None]

Returns:

an async context manager that will yield messages matching the subscribed topics/patterns

class mqttproto.async_client.AsyncMQTTSubscription(subscriptions)
class mqttproto.sync_client.MQTTClient(host_or_path=None, port=None, *, transport='tcp', websocket_path=None, ssl=False, client_id=None, username=None, password=None, clean_start=True, receive_maximum=65535, max_packet_size=None, will=None)
publish(topic, payload, *, qos=QoS.AT_MOST_ONCE, retain=False)

Publish a message to the given topic.

Parameters:
  • topic (str) – the topic to publish the message to

  • payload (bytes | str) – the message to publish

  • qos (QoS) – the QoS of the message

  • retain (bool) – if True, the message will be always be sent to clients that subscribe to the given topic even after the message was already published before the subscription happened

Return type:

None

subscribe(*patterns, maximum_qos=QoS.EXACTLY_ONCE, no_local=False, retain_as_published=True, retain_handling=RetainHandling.SEND_RETAINED)

Subscribe to the given topics or topic patterns.

Parameters:
  • patterns (str) – either exact topic names, or patterns containing wildcards (+ or #)

  • maximum_qos (QoS) – maximum QoS to allow (messages matching the given patterns but with higher QoS will be downgraded to that QoS)

  • no_local (bool) – if True, messages published by this client will not be sent back to it via this subscription

  • retain_as_published (bool) – if False, the broker will clear the retained flag of all published messages sent to the client

  • retain_handling (RetainHandling) –

    • If set to SEND_RETAINED (the default), the broker will send any retained messages matching this subscription to this client

    • If set to SEND_RETAINED_IF_NOT_SUBSCRIBED, then the broker will only send retained messages if this subscription did not already exist

    • If set to NO_RETAINED, then retained messages will not be sent to this client

Return type:

Generator[MQTTSubscription, None, None]

Returns:

an async context manager that will yield messages matching the subscribed topics/patterns

class mqttproto.sync_client.MQTTSubscription(async_subscription, portal)

Concrete broker implementation

class mqttproto.async_broker.AsyncMQTTBroker(bind_address=('127.0.0.1', 1883), ssl_context=None, authenticators=NOTHING, authorizers=NOTHING)

A simple MQTT broker implementation.

Parameters:
  • bind_address (tuple[str, int] | str | bytes | PathLike[str] | PathLike[bytes]) – either a tuple of (address, port) or a path to a local UNIX socket to bind to

  • ssl_context (SSLContext | None) – an SSL context which, if given, will be used to do the TLS handshake with incoming connections

  • authenticators (Sequence[MQTTAuthenticator]) – a sequence of authenticators that will be called, starting from the first one, to authenticate a login attempt

  • authorizers (Sequence[MQTTAuthorizer]) – a sequence of authorizers that will be called, starting from the first one, to authenticate PUBLISH and SUBSCRIBE requests

async handle_packet(packet, client_state_machine, stream)

Called by serve_client() to handle an MQTT packet received from the client.

Parameters:
Return type:

None

async serve_client(stream)

Called to handle a connected client.

Parameters:

stream (ByteStream) – the byte stream for the client.

Return type:

None

class mqttproto.async_broker.MQTTAuthenticator
abstractmethod async authenticate(client_id, username, password, stream)

Determine whether the given client should be authenticated.

Parameters:
  • client_id (str) – the client ID

  • username (str | None) – the username presented in the CONNECT packet

  • password (str | None) – the password presented in the CONNECT packet

  • stream (ByteStream) –

    the client’s transport stream; could be used, for example, for:

    • IP whitelist/blacklist checking

    • strong UNIX authentication

    • TLS client certificate checking

Return type:

ReasonCode | None

Returns:

a reason code to either allow or deny the login, or None to continue with the next authenticator

class mqttproto.async_broker.MQTTAuthorizer
abstractmethod async authorize_publish(topic, client_id, username, stream)

Determine whether the given client should be authenticated.

Parameters:
  • topic (str) – the topic in the PUBLISH packet

  • client_id (str) – the client ID

  • username (str | None) – the username presented in the CONNECT packet

  • stream (ByteStream) – the client’s transport stream

Return type:

ReasonCode | None

Returns:

a reason code to either allow or deny the publish, or None to continue with the next authorizer

abstractmethod async authorize_subscribe(pattern, client_id, username, stream)

Determine whether the given client should be allowed to subscribe to the given topic pattern.

Parameters:
  • pattern (str) – the topic filter

  • client_id (str) – the client ID

  • username (str | None) – the username presented in the CONNECT packet

  • stream (ByteStream) – the client’s transport stream

Return type:

ReasonCode | None

Returns:

a reason code to either allow or deny the subscription, or None to continue with the next authorizer