Developing new I/O implementations
This project contains both the appropriate state machines implementing the MQTT v5 protocol, and also asynchronous and synchronous implementations of both the MQTT client and broker. While the client implementation is intended for production use, the broker should only be used in very lightweight scenarios where high performance or a broad feature set are not requirements. In most cases, implementations from vendors like EMQX or HiveMQ are recommended instead.
What the state machines handle for you
There are 3 state machines provided by this project:
MQTTClientStateMachine: the MQTT client’s state machineMQTTBrokerStateMachine: the MQTT broker’s state machineMQTTBrokerClientStateMachine: the state machine representing an MQTT client session on the broker
The client and broker state machines handle as much of the MQTT protocol for you as possible without getting in the way:
Parsing inbound packets from bytes
Encoding outbound packets into bytes
Buffering bytes containing any incomplete packets
Raising the appropriate exceptions when malformed packets or protocol violations are detected
Transitioning to different states based on the received packets and the previous state
Sending automatic replies to certain packets (e.g.
PINGREQ), when this does not need a decision from the I/O implementation (this is why you must make sure you always flush the output too after receiving a packet)Sending retained messages to clients when they subscribe to matching topics
Keeping track of used packet IDs
Keeping track of the client’s active subscriptions
Keeping track of pending subscribe, unsubscribe and QoS 1/2 publish requests
Enforcing maximum message sizes
Enforcing the receive maximum (max number of unacknowledged QoS 1/2 messages)
What the state machines will NOT handle for you
The state machines don’t interact with the “real world”, meaning:
They cannot use any form of sockets or networking
They will not deal with timing or clocks
The broker state machine will not do any authentication or authorization
These restrictions also have the following practical consequences:
They will not send periodic
MQTTPingRequestPacketpackets to prevent keepalive-based disconnectionsThey will not send
Willmessages automatically, as this may require waiting for the delay to expireThey will not disconnect clients due to expiring keepalive timers
The I/O implementations will need to provide all this functionality.
Responsibilities of client I/O implementations
Responsibilities for client I/O implementations are:
Continuously read data from the transport stream, and pass it to the state machine using its
feed_bytes()methodAfter receiving any packet from the broker:
Fetch any outbound data to the transport stream using
get_outbound_data(), and send it to the transport stream. This is important, as the protocol might send automatic replies for certain messages.Check the session state. If the session has transitioned to the
DISCONNECTEDstate, then close the transport stream.
Responsibilities of broker I/O implementations
Responsibilities for broker I/O implementations are:
Continuously read data from the transport stream of each connected client, and pass it to the appropriate state machine using its
feed_bytes()methodAfter receiving any packet from a client:
Fetch any outbound data to the transport stream using
get_outbound_data(), and send it to the transport stream. This is important, as the protocol might send automatic replies for certain messages.Check the session state. If the session has transitioned to the
DISCONNECTEDstate, then close the transport stream.
When receiving an
MQTTConnectPacket, perform whatever authentication checks necessary for your implementation, and then useMQTTConnectPacket(respond usingacknowledge_connect())MQTTSubscribePacket(respond withacknowledge_subscribe())
If a message is published to the broker, attempt delivery to all client sessions. The protocol state machine will ensure that only clients with matching subscriptions actually receive the message.
If a client disconnects abrubtly (without sending a
DISCONNECTmessage with code 0x00, and it had aWill, publish that will after its specified delay interval, provided that the client does not resume its session before the delay expires.If a client has
keepalivegreater than 0, close the client’s transport stream after the configured period of inactivity (and remember to send anyWillmessage that was requested by the client
Debugging
The state machines log both inbound and outbound packets using the mqttproto logger,
on the DEBUG level. Configuring logging for
this logger will enable you to see exactly what’s being received and transmitted by the
state machines.
The concrete I/O implementations of client and broker use their respective loggers,
mqttproto.client and mqttproto.broker, respectively, also on the
DEBUG level.