API Reference
Enumerated types
- enum mqttproto.PropertyType(value)
- Member Type:
Valid values are as follows:
- PAYLOAD_FORMAT_INDICATOR = <PropertyType.PAYLOAD_FORMAT_INDICATOR: 1>
- MESSAGE_EXPIRY_INTERVAL = <PropertyType.MESSAGE_EXPIRY_INTERVAL: 2>
- CONTENT_TYPE = <PropertyType.CONTENT_TYPE: 3>
- RESPONSE_TOPIC = <PropertyType.RESPONSE_TOPIC: 8>
- CORRELATION_DATA = <PropertyType.CORRELATION_DATA: 9>
- SUBSCRIPTION_IDENTIFIER = <PropertyType.SUBSCRIPTION_IDENTIFIER: 11>
- SESSION_EXPIRY_INTERVAL = <PropertyType.SESSION_EXPIRY_INTERVAL: 17>
- ASSIGNED_CLIENT_IDENTIFIER = <PropertyType.ASSIGNED_CLIENT_IDENTIFIER: 18>
- SERVER_KEEP_ALIVE = <PropertyType.SERVER_KEEP_ALIVE: 19>
- AUTHENTICATION_METHOD = <PropertyType.AUTHENTICATION_METHOD: 21>
- AUTHENTICATION_DATA = <PropertyType.AUTHENTICATION_DATA: 22>
- REQUEST_PROBLEM_INFORMATION = <PropertyType.REQUEST_PROBLEM_INFORMATION: 23>
- WILL_DELAY_INTERVAL = <PropertyType.WILL_DELAY_INTERVAL: 24>
- REQUEST_RESPONSE_INFORMATION = <PropertyType.REQUEST_RESPONSE_INFORMATION: 25>
- RESPONSE_INFORMATION = <PropertyType.RESPONSE_INFORMATION: 26>
- SERVER_REFERENCE = <PropertyType.SERVER_REFERENCE: 28>
- REASON_STRING = <PropertyType.REASON_STRING: 31>
- RECEIVE_MAXIMUM = <PropertyType.RECEIVE_MAXIMUM: 33>
- TOPIC_ALIAS_MAXIMUM = <PropertyType.TOPIC_ALIAS_MAXIMUM: 34>
- TOPIC_ALIAS = <PropertyType.TOPIC_ALIAS: 35>
- MAXIMUM_QOS = <PropertyType.MAXIMUM_QOS: 36>
- RETAIN_AVAILABLE = <PropertyType.RETAIN_AVAILABLE: 37>
- USER_PROPERTY = <PropertyType.USER_PROPERTY: 38>
- MAXIMUM_PACKET_SIZE = <PropertyType.MAXIMUM_PACKET_SIZE: 39>
- WILDCARD_SUBSCRIPTION_AVAILABLE = <PropertyType.WILDCARD_SUBSCRIPTION_AVAILABLE: 40>
- SUBSCRIPTION_IDENTIFIER_AVAILABLE = <PropertyType.SUBSCRIPTION_IDENTIFIER_AVAILABLE: 41>
- SHARED_SUBSCRIPTION_AVAILABLE = <PropertyType.SHARED_SUBSCRIPTION_AVAILABLE: 42>
- enum mqttproto.QoS(value)
- Member Type:
Valid values are as follows:
- AT_MOST_ONCE = <QoS.AT_MOST_ONCE: 0>
- AT_LEAST_ONCE = <QoS.AT_LEAST_ONCE: 1>
- EXACTLY_ONCE = <QoS.EXACTLY_ONCE: 2>
- enum mqttproto.ReasonCode(value)
- Member Type:
Valid values are as follows:
- SUCCESS = <ReasonCode.SUCCESS: 0>
- GRANTED_QOS_1 = <ReasonCode.GRANTED_QOS_1: 1>
- GRANTED_QOS_2 = <ReasonCode.GRANTED_QOS_2: 2>
- DISCONNECT_WITH_WILL_MESSAGE = <ReasonCode.DISCONNECT_WITH_WILL_MESSAGE: 4>
- NO_MATCHING_SUBSCRIBERS = <ReasonCode.NO_MATCHING_SUBSCRIBERS: 16>
- NO_SUBSCRIPTION_EXISTED = <ReasonCode.NO_SUBSCRIPTION_EXISTED: 17>
- CONTINUE_AUTHENTICATION = <ReasonCode.CONTINUE_AUTHENTICATION: 24>
- REAUTHENTICATE = <ReasonCode.REAUTHENTICATE: 25>
- UNSPECIFIED_ERROR = <ReasonCode.UNSPECIFIED_ERROR: 128>
- MALFORMED_PACKET = <ReasonCode.MALFORMED_PACKET: 129>
- PROTOCOL_ERROR = <ReasonCode.PROTOCOL_ERROR: 130>
- IMPLEMENTATION_SPECIFIC_ERROR = <ReasonCode.IMPLEMENTATION_SPECIFIC_ERROR: 131>
- UNSUPPORTED_PROTOCOL_VERSION = <ReasonCode.UNSUPPORTED_PROTOCOL_VERSION: 132>
- CLIENT_IDENTIFIER_NOT_VALID = <ReasonCode.CLIENT_IDENTIFIER_NOT_VALID: 133>
- BAD_USERNAME_OR_PASSWORD = <ReasonCode.BAD_USERNAME_OR_PASSWORD: 134>
- NOT_AUTHORIZED = <ReasonCode.NOT_AUTHORIZED: 135>
- SERVER_UNAVAILABLE = <ReasonCode.SERVER_UNAVAILABLE: 136>
- SERVER_BUSY = <ReasonCode.SERVER_BUSY: 137>
- BANNED = <ReasonCode.BANNED: 138>
- SERVER_SHUTTING_DOWN = <ReasonCode.SERVER_SHUTTING_DOWN: 139>
- BAD_AUTHENTICATION_METHOD = <ReasonCode.BAD_AUTHENTICATION_METHOD: 140>
- KEEP_ALIVE_TIMEOUT = <ReasonCode.KEEP_ALIVE_TIMEOUT: 141>
- SESSION_TAKEN_OVER = <ReasonCode.SESSION_TAKEN_OVER: 142>
- TOPIC_FILTER_INVALID = <ReasonCode.TOPIC_FILTER_INVALID: 143>
- TOPIC_NAME_INVALID = <ReasonCode.TOPIC_NAME_INVALID: 144>
- PACKET_IDENTIFIER_IN_USE = <ReasonCode.PACKET_IDENTIFIER_IN_USE: 145>
- PACKET_IDENTIFIER_NOT_FOUND = <ReasonCode.PACKET_IDENTIFIER_NOT_FOUND: 146>
- RECEIVE_MAXIMUM_EXCEEDED = <ReasonCode.RECEIVE_MAXIMUM_EXCEEDED: 147>
- TOPIC_ALIAS_INVALID = <ReasonCode.TOPIC_ALIAS_INVALID: 148>
- PACKET_TOO_LARGE = <ReasonCode.PACKET_TOO_LARGE: 149>
- MESSAGE_RATE_TOO_HIGH = <ReasonCode.MESSAGE_RATE_TOO_HIGH: 150>
- QUOTA_EXCEEDED = <ReasonCode.QUOTA_EXCEEDED: 151>
- ADMINISTRATIVE_ACTION = <ReasonCode.ADMINISTRATIVE_ACTION: 152>
- PAYLOAD_FORMAT_INVALID = <ReasonCode.PAYLOAD_FORMAT_INVALID: 153>
- RETAIN_NOT_SUPPORTED = <ReasonCode.RETAIN_NOT_SUPPORTED: 154>
- QOS_NOT_SUPPORTED = <ReasonCode.QOS_NOT_SUPPORTED: 155>
- USE_ANOTHER_SERVER = <ReasonCode.USE_ANOTHER_SERVER: 156>
- SERVER_MOVED = <ReasonCode.SERVER_MOVED: 157>
- SHARED_SUBSCRIPTIONS_NOT_SUPPORTED = <ReasonCode.SHARED_SUBSCRIPTIONS_NOT_SUPPORTED: 158>
- CONNECTION_RATE_EXCEEDED = <ReasonCode.CONNECTION_RATE_EXCEEDED: 159>
- MAXIMUM_CONNECT_TIME = <ReasonCode.MAXIMUM_CONNECT_TIME: 160>
- SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED = <ReasonCode.SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED: 161>
- WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED = <ReasonCode.WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED: 162>
Supporting classes
- class mqttproto.Subscription(pattern, *, max_qos=QoS.EXACTLY_ONCE, no_local=False, retain_as_published=True, retain_handling=RetainHandling.SEND_RETAINED)
- matches(publish)
Check if a published message matches this subscription.
- Parameters:
publish (
MQTTPublishPacket) – an MQTTPUBLISHpacket- Return type:
- Returns:
Trueif the published message matches this pattern,Falseif not
- class mqttproto.Will(*, properties=NOTHING, user_properties=NOTHING, topic, payload, retain=False, qos=QoS.AT_MOST_ONCE)
MQTT packet classes
- class mqttproto.MQTTPacket
Abstract base class for all MQTT packets
- class mqttproto.MQTTConnectPacket(*, properties=NOTHING, user_properties=NOTHING, client_id, will=None, username=None, password=None, clean_start=False, keep_alive=0)
Connection request
- class mqttproto.MQTTConnAckPacket(*, properties=NOTHING, user_properties=NOTHING, reason_code, session_present)
Connection acknowledgment
- class mqttproto.MQTTPublishPacket(*, properties=NOTHING, user_properties=NOTHING, topic, payload, packet_id=None, retain=False, qos=QoS.AT_MOST_ONCE, duplicate=False, ack_state=PublishAckState.UNACKNOWLEDGED)
Publish message
- class mqttproto.MQTTPublishAckPacket(*, properties=NOTHING, user_properties=NOTHING, packet_id, reason_code)
Publish acknowledgment (QoS 1)
- class mqttproto.MQTTPublishReceivePacket(*, properties=NOTHING, user_properties=NOTHING, packet_id, reason_code)
Publish received (QoS 2 delivery part 1)
- class mqttproto.MQTTPublishReleasePacket(*, properties=NOTHING, user_properties=NOTHING, packet_id, reason_code)
Publish release (QoS 2 delivery part 2)
- class mqttproto.MQTTPublishCompletePacket(*, properties=NOTHING, user_properties=NOTHING, packet_id, reason_code)
Publish complete (QoS 2 delivery part 3)
- class mqttproto.MQTTSubscribePacket(*, properties=NOTHING, user_properties=NOTHING, packet_id, subscriptions)
Subscribe request
- class mqttproto.MQTTSubscribeAckPacket(*, properties=NOTHING, user_properties=NOTHING, packet_id, reason_codes)
Subscribe acknowledgment
- class mqttproto.MQTTUnsubscribePacket(*, properties=NOTHING, user_properties=NOTHING, packet_id, patterns)
Unsubscribe request
- class mqttproto.MQTTUnsubscribeAckPacket(*, properties=NOTHING, user_properties=NOTHING, packet_id, reason_codes)
Unsubscribe acknowledgment
- class mqttproto.MQTTPingRequestPacket
PING request
- class mqttproto.MQTTPingResponsePacket
PING response
- class mqttproto.MQTTAuthPacket(*, properties=NOTHING, user_properties=NOTHING, reason_code)
Authentication exchange
- class mqttproto.MQTTDisconnectPacket(*, properties=NOTHING, user_properties=NOTHING, reason_code)
Disconnect notification
Client-side state machine
- class mqttproto.client_state_machine.MQTTClientStateMachine(client_id=None)
State machine for a client’s session with an MQTT broker.
- feed_bytes(data)
Input bytes received from the transport stream to the state machine.
The state machine parses and validates any packets found in the stream and alters its state accordingly. If the last packet is incomplete, it will remain in the inbound data buffer until a later call which completes the packet.
- Parameters:
data (
bytes) – bytes received from the transport stream- Return type:
- Returns:
a sequence of complete packets parsed from the bytes
- get_outbound_data()
Retrieve any bytes to be sent to the peer.
This method is not idempotent, as the returned bytes are removed from the outbound data buffer.
- Return type:
- publish(topic, payload, *, qos=QoS.AT_MOST_ONCE, retain=False)
Send a
PUBLISHrequest.- Parameters:
- Return type:
- Returns:
the packet ID if
qoswas higher than 0
A QoS that’s not supported by the server is silently downgraded. If Retain is not supported, the message is sent as-is because the server is free to accept it anyway.
- property state: MQTTClientState
The current state of the client.
- subscribe(subscriptions)
Subscribe to one or more topic patterns.
If any of the given subscription was Send a
SUBSCRIBErequest, containing one of more subscriptions.- Parameters:
subscriptions (
Sequence[Subscription]) – a sequence of subscriptions- Return type:
- Returns:
packet ID of the
SUBSCRIBErequest, orNoneif a request did not need to be sent
- unsubscribe(patterns)
Unsubscribe from one or more topic patterns.
If the internal subscription for any of the given patterns goes down to 0, an
UNSUBSCRIBErequest is sent for those patterns.
Broker-side state machines
- class mqttproto.broker_state_machine.MQTTBrokerStateMachine
State machine for an MQTT broker.
- client_disconnected(client_id, packet)
Handle a client disconnection.
- Parameters:
client_id (
str) – ID of the client that disconnectedpacket (
MQTTDisconnectPacket|None) – theDISCONNECTpacket sent by the client, orNoneif the transport stream was closed
- Return type:
- publish(source_client_id, packet)
Publish a message from the given client to all the appropriate subscribers.
- Parameters:
source_client_id (
str) – ID of the client that published the messagepacket (
MQTTPublishPacket) – thePUBLISHpacket sent by the client
- Return type:
- Returns:
a collection of client IDs to send the publish packet to
- class mqttproto.broker_state_machine.MQTTBrokerClientStateMachine
State machine for the MQTT broker’s view of a client session.
- acknowledge_connect(reason_code, username, session_present)
Respond to a
CONNECTrequest by the client.- Parameters:
reason_code (
ReasonCode) – the reason code indicating either success or failureusername (
str|None) – the username the client authenticated assession_present (
bool) –Trueif a previously existing session was resumed,Falseif not
- Return type:
- acknowledge_subscribe(packet_id, reason_codes)
Respond to a
SUBSCRIBErequest by the client.- Parameters:
packet_id (
int) – the packet ID from theSUBSCRIBEpacketreason_codes (
Sequence[ReasonCode]) – the reason code indicating either success or failure for the corresponding subscriptions in the original request (MUST be in the same order to be matched against the correct subscriptions)
- Return type:
- deliver_publish(source_client_id, packet)
Deliver a
PUBLISHmessage to this client if the current state allows it.- Parameters:
source_client_id (
str) – ID of the client that sent the messagepacket (
MQTTPublishPacket) – thePUBLISHpacket sent by the client
- Return type:
- Returns:
Trueif this client accepted the message for delivery,Falseif not
- feed_bytes(data)
Input bytes received from the transport stream to the state machine.
The state machine parses and validates any packets found in the stream and alters its state accordingly. If the last packet is incomplete, it will remain in the inbound data buffer until a later call which completes the packet.
- Parameters:
data (
bytes) – bytes received from the transport stream- Return type:
- Returns:
a sequence of complete packets parsed from the bytes
- get_outbound_data()
Retrieve any bytes to be sent to the peer.
This method is not idempotent, as the returned bytes are removed from the outbound data buffer.
- Return type:
- property state: MQTTClientState
The current state of the client.
Concrete client implementations
- class mqttproto.async_client.AsyncMQTTClient(host_or_path=None, port=None, *, transport='tcp', websocket_path=None, ssl=False, client_id=None, username=None, password=None, clean_start=True, receive_maximum=65535, max_packet_size=None, will=None)
An asynchronous MQTT client.
Must be used as an asynchronous context manager.
- Parameters:
host_or_path (
str|None) – host name or UNIX socket path on the file system (defaults tolocalhostfor TCP based connections; must be provided for UNIX socket connections)port (
int|None) – port number to connect to in case of thetcptransport (defaults to 1883 or 8883 for direct MQTT depending on thesslparameter; 80 or 443 for websocket connections)transport (
Literal['tcp','unix']) – eithertcp(TCP) orunix(UNIX domain sockets)websocket_path (
str|None) – the URL path in the HTTP request when using websockets to connect to the MQTT broker (must be set to use websockets instead of direct MQTT)ssl (
bool|SSLContext) – to use TLS (SSL), set to eitherTrue(to use the default SSL context), or provide your own custom SSL context hereclient_id (
str|None) – custom client ID to use when connecting to the MQTT broker (needs to stay constant across restarts for session resumption to work)username (
str|None) – user name to use for authenticating against the MQTT brokerpassword (
str|None) – password to use for authenticating against the MQTT brokerclean_start (
bool) – ifFalse, try to resume a previous session (tied to the client ID)receive_maximum (
int) – number of unconfirmed QoS 1/2 messages that the client is willing to store at oncemax_packet_size (
int|None) – maximum packet size in bytes, orNonefor no limitwill (
Will|None) – message that will be published by the broker on the client’s behalf if the client disconnects unexpectedly or fails to communicate within the keepalive time
- async publish(topic, payload, *, qos=QoS.AT_MOST_ONCE, retain=False)
Publish a message to the given topic.
- subscribe(*patterns, maximum_qos=QoS.EXACTLY_ONCE, no_local=False, retain_as_published=True, retain_handling=RetainHandling.SEND_RETAINED)
Subscribe to the given topics or topic patterns.
- Parameters:
patterns (
str) – either exact topic names, or patterns containing wildcards (+or#)maximum_qos (
QoS) – maximum QoS to allow (messages matching the given patterns but with higher QoS will be downgraded to that QoS)no_local (
bool) – ifTrue, messages published by this client will not be sent back to it via this subscriptionretain_as_published (
bool) – ifFalse, the broker will clear theretainedflag of all published messages sent to the clientretain_handling (
RetainHandling) –If set to
SEND_RETAINED(the default), the broker will send any retained messages matching this subscription to this clientIf set to
SEND_RETAINED_IF_NOT_SUBSCRIBED, then the broker will only send retained messages if this subscription did not already existIf set to
NO_RETAINED, then retained messages will not be sent to this client
- Return type:
- Returns:
an async context manager that will yield messages matching the subscribed topics/patterns
- class mqttproto.async_client.AsyncMQTTSubscription(subscriptions)
- class mqttproto.sync_client.MQTTClient(host_or_path=None, port=None, *, transport='tcp', websocket_path=None, ssl=False, client_id=None, username=None, password=None, clean_start=True, receive_maximum=65535, max_packet_size=None, will=None)
- publish(topic, payload, *, qos=QoS.AT_MOST_ONCE, retain=False)
Publish a message to the given topic.
- subscribe(*patterns, maximum_qos=QoS.EXACTLY_ONCE, no_local=False, retain_as_published=True, retain_handling=RetainHandling.SEND_RETAINED)
Subscribe to the given topics or topic patterns.
- Parameters:
patterns (
str) – either exact topic names, or patterns containing wildcards (+or#)maximum_qos (
QoS) – maximum QoS to allow (messages matching the given patterns but with higher QoS will be downgraded to that QoS)no_local (
bool) – ifTrue, messages published by this client will not be sent back to it via this subscriptionretain_as_published (
bool) – ifFalse, the broker will clear theretainedflag of all published messages sent to the clientretain_handling (
RetainHandling) –If set to
SEND_RETAINED(the default), the broker will send any retained messages matching this subscription to this clientIf set to
SEND_RETAINED_IF_NOT_SUBSCRIBED, then the broker will only send retained messages if this subscription did not already existIf set to
NO_RETAINED, then retained messages will not be sent to this client
- Return type:
- Returns:
an async context manager that will yield messages matching the subscribed topics/patterns
- class mqttproto.sync_client.MQTTSubscription(async_subscription, portal)
Concrete broker implementation
- class mqttproto.async_broker.AsyncMQTTBroker(bind_address=('127.0.0.1', 1883), ssl_context=None, authenticators=NOTHING, authorizers=NOTHING)
A simple MQTT broker implementation.
- Parameters:
bind_address (
tuple[str,int] |str|bytes|PathLike[str] |PathLike[bytes]) – either a tuple of (address, port) or a path to a local UNIX socket to bind tossl_context (
SSLContext|None) – an SSL context which, if given, will be used to do the TLS handshake with incoming connectionsauthenticators (
Sequence[MQTTAuthenticator]) – a sequence of authenticators that will be called, starting from the first one, to authenticate a login attemptauthorizers (
Sequence[MQTTAuthorizer]) – a sequence of authorizers that will be called, starting from the first one, to authenticatePUBLISHandSUBSCRIBErequests
- async handle_packet(packet, client_state_machine, stream)
Called by
serve_client()to handle an MQTT packet received from the client.- Parameters:
packet (
MQTTPacket) – the received packetclient_state_machine (
MQTTBrokerClientStateMachine) – the client session state machinestream (
ByteStream) – the client’s transport stream
- Return type:
- async serve_client(stream)
Called to handle a connected client.
- Parameters:
stream (
ByteStream) – the byte stream for the client.- Return type:
- class mqttproto.async_broker.MQTTAuthenticator
- abstractmethod async authenticate(client_id, username, password, stream)
Determine whether the given client should be authenticated.
- Parameters:
client_id (
str) – the client IDusername (
str|None) – the username presented in theCONNECTpacketpassword (
str|None) – the password presented in theCONNECTpacketstream (
ByteStream) –the client’s transport stream; could be used, for example, for:
IP whitelist/blacklist checking
strong UNIX authentication
TLS client certificate checking
- Return type:
- Returns:
a reason code to either allow or deny the login, or
Noneto continue with the next authenticator
- class mqttproto.async_broker.MQTTAuthorizer
- abstractmethod async authorize_publish(topic, client_id, username, stream)
Determine whether the given client should be authenticated.
- Parameters:
topic (
str) – the topic in thePUBLISHpacketclient_id (
str) – the client IDusername (
str|None) – the username presented in theCONNECTpacketstream (
ByteStream) – the client’s transport stream
- Return type:
- Returns:
a reason code to either allow or deny the publish, or
Noneto continue with the next authorizer
- abstractmethod async authorize_subscribe(pattern, client_id, username, stream)
Determine whether the given client should be allowed to subscribe to the given topic pattern.
- Parameters:
pattern (
str) – the topic filterclient_id (
str) – the client IDusername (
str|None) – the username presented in theCONNECTpacketstream (
ByteStream) – the client’s transport stream
- Return type:
- Returns:
a reason code to either allow or deny the subscription, or
Noneto continue with the next authorizer