Introduction
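MQTT is a messaging protocol (pub/sub) designed for IoT. It is lightweight and efficient, and a client implementation needs fewer resources than an AMQP one. MQTT itself does not support authorization (this has to be implemented on the server side).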
Install client command
The mqtt tool for Node.js
npm install mqtt --save
npm install mqtt -g
Command
Subscribe
mqtt sub -h my-mqtt-server.com -t my_topic -u mqtt_username -P mqtt_password
Publish
mqtt pub -h my-mqtt-server.com -t my_topic -u mqtt_username -P mqtt_password -m 'Hello world'
Via SSL
mqtt pub -h my-mqtt-server.com -p 8883 -C mqtts ... (remaining options omitted)
Install broker (mqtt server)
Pull docker image - eclipse-mosquitto
docker pull eclipse-mosquitto
Create a folder for persistent data
Create a config: mosquitto.conf
persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log
Run container
docker run -it -p 1883:1883 -p 9001:9001 -v /{ABSOLUTE_PATH}/mosquitto.conf:/mosquitto/config/mosquitto.conf -v /{ABSOLUTE_PATH}/mosquitto/data:/mosquitto/data eclipse-mosquitto
Connect to tcp://localhost:1883; no username/password is needed.
QoS
- QoS 0 : received at most once : The packet is sent, and that’s it. There is no validation about whether it has been received.
- QoS 1 : received at least once : The packet is sent and stored as long as the client has not received a confirmation from the server. MQTT ensures that it will be received, but there can be duplicates.
- QoS 2 : received exactly once : Same as QoS 1, but there are no duplicates.
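The difference between the three levels can be illustrated with a toy simulation. This is only a sketch of the delivery guarantees, not of the MQTT wire protocol or of any broker: real QoS 2 uses a four-part handshake (PUBLISH, PUBREC, PUBREL, PUBCOMP), which is reduced here to de-duplication by packet ID. The names `receiver`, `send`, `outcome`, and the scripted link behaviour are all hypothetical.

```go
package main

import "fmt"

// outcome describes what the simulated link does on one transmission attempt.
type outcome int

const (
	ok       outcome = iota // packet and acknowledgement both arrive
	dataLost                // packet never reaches the receiver
	ackLost                 // packet arrives, but the acknowledgement is lost
)

// receiver records every payload it hands to the application.
// seen is only consulted for QoS 2.
type receiver struct {
	got  []string
	seen map[int]bool
}

func newReceiver() *receiver {
	return &receiver{seen: map[int]bool{}}
}

// accept delivers a packet to the application, dropping QoS 2 duplicates.
func (r *receiver) accept(qos, id int, payload string) {
	if qos == 2 {
		if r.seen[id] {
			return // already delivered once: ignore the retransmission
		}
		r.seen[id] = true
	}
	r.got = append(r.got, payload)
}

// send transmits one packet and returns the number of transmissions used.
// script[i] is the link behaviour on attempt i; later attempts succeed.
func send(r *receiver, qos, id int, payload string, script []outcome) int {
	for attempt := 0; ; attempt++ {
		o := ok
		if attempt < len(script) {
			o = script[attempt]
		}
		if o != dataLost {
			r.accept(qos, id, payload)
		}
		// QoS 0 never waits for an acknowledgement, so it never retries.
		if qos == 0 || o == ok {
			return attempt + 1
		}
	}
}

func main() {
	// First attempt: packet arrives but the ack is lost. Second attempt: ok.
	script := []outcome{ackLost, ok}
	for qos := 0; qos <= 2; qos++ {
		r := newReceiver()
		n := send(r, qos, 1, "hello", script)
		fmt.Printf("QoS %d: %d transmission(s), delivered %d time(s)\n", qos, n, len(r.got))
	}
}
```

With this script, QoS 1 retransmits after the lost acknowledgement and the application sees the payload twice, while QoS 2 retransmits as well but delivers it only once.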
Introduction
This is an MQTT package implemented in Go. It currently provides only a client; there is no broker implementation.
Usage
New & Connect & Close
import (
	"time"

	MQTT "github.com/eclipse/paho.mqtt.golang"
)

// New builds the client options from params, connects to the broker,
// and returns the connected client.
func New(params map[string]string) (MQTT.Client, error) {
	opts := MQTT.NewClientOptions()
	opts.SetKeepAlive(4 * time.Second)
	opts.SetPingTimeout(2 * time.Second)
	opts.AddBroker(params["broker"])
	opts.SetClientID(params["client_id"])
	opts.SetUsername(params["username"])
	opts.SetPassword(params["password"])
	opts.SetAutoReconnect(true)
	client := MQTT.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		return nil, token.Error()
	}
	return client, nil
}
params := map[string]string{
	"broker":    "ssl://mqtt.example.com:8883",
	"client_id": "Client ID",
	"username":  "Username",
	"password":  "Password",
}
mqtt_client, err := New(params)
if err != nil {
	return errors.New("New mqtt err: " + err.Error())
}
// Close
defer mqtt_client.Disconnect(250)
Publish
topic := "test/mqtt"
for {
	// Use the current time in nanoseconds as the message payload.
	msg := strconv.FormatInt(time.Now().UnixNano(), 10)
	if token := mqtt_client.Publish(topic, 0, false, msg); token.Wait() && token.Error() != nil {
		log.Println(token.Error())
	} else {
		log.Printf("Successfully published `%s` to `%s`\n", msg, topic)
	}
}
When publishing, prefer token.WaitTimeout over token.Wait to avoid a deadlock that can occur with very small probability; ref: https://github.com/eclipse/paho.mqtt.golang/issues/185
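A minimal sketch of a bounded wait follows. It assumes only that the token exposes `WaitTimeout(time.Duration) bool` and `Error() error`, as the paho token interface does. The helper `waitFor`, the sentinel `errTimeout`, and the `fakeToken` stand-in (used so the example runs without a broker) are hypothetical names, not part of the library.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// token is the subset of the client library's token interface used here.
type token interface {
	WaitTimeout(time.Duration) bool
	Error() error
}

// errTimeout is a hypothetical sentinel error for this sketch.
var errTimeout = errors.New("mqtt: token timed out")

// waitFor blocks for at most d. It returns errTimeout if the operation did
// not complete in time, otherwise the token's own error (nil on success).
func waitFor(t token, d time.Duration) error {
	if !t.WaitTimeout(d) {
		return errTimeout
	}
	return t.Error()
}

// fakeToken stands in for a real token so the sketch runs without a broker.
type fakeToken struct {
	done chan struct{}
	err  error
}

// shortWait is an arbitrary timeout chosen for the demonstration.
const shortWait = 50 * time.Millisecond

func newFakeToken(completed bool, err error) *fakeToken {
	f := &fakeToken{done: make(chan struct{}), err: err}
	if completed {
		close(f.done)
	}
	return f
}

// WaitTimeout reports whether the token completed before d elapsed.
func (f *fakeToken) WaitTimeout(d time.Duration) bool {
	select {
	case <-f.done:
		return true
	case <-time.After(d):
		return false
	}
}

func (f *fakeToken) Error() error { return f.err }

func main() {
	// A token that has already completed returns its own error (nil here).
	fmt.Println(waitFor(newFakeToken(true, nil), shortWait))
	// A token that never completes yields errTimeout instead of blocking forever.
	fmt.Println(waitFor(newFakeToken(false, nil), shortWait))
}
```

With a real client the call would look like `waitFor(mqtt_client.Publish(topic, 0, false, msg), 2*time.Second)`, where the two-second limit is an arbitrary choice.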
Subscribe
var msg_payload, msg_topic string
// The handler is called by the client library for every message received on the topic.
if token := mqtt_client.Subscribe("test/mqtt", 0, func(client MQTT.Client, msg MQTT.Message) {
	msg_payload = string(msg.Payload())
	msg_topic = msg.Topic()
}); token.Wait() && token.Error() != nil {
	log.Fatal(token.Error())
}
// Poll every 300 ms and print the payload whenever it has changed.
var pre_msg string
for {
	time.Sleep(300 * time.Millisecond)
	if msg_payload != pre_msg {
		fmt.Printf("Successfully received `%s` from `%s`\n", msg_payload, msg_topic)
	}
	pre_msg = msg_payload
}
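The polling loop above reads variables that the callback writes from a goroutine owned by the client library. Without synchronization that is a data race, and two messages arriving within the same 300 ms window, or two identical payloads in a row, are reported only once. A common alternative is to have the callback push each message into a buffered channel. The sketch below uses only the standard library and simulates the callback; `received`, `newHandler`, and the drop-when-full policy are hypothetical choices, and in real use the returned closure would be called from inside the subscribe handler.

```go
package main

import "fmt"

// received is one message handed from the callback to the consumer.
type received struct {
	Topic   string
	Payload string
}

// newHandler returns a callback that forwards every message into ch.
// If ch is full the message is dropped instead of blocking the caller,
// and the callback reports false (an arbitrary policy for this sketch).
func newHandler(ch chan<- received) func(topic string, payload []byte) bool {
	return func(topic string, payload []byte) bool {
		select {
		case ch <- received{Topic: topic, Payload: string(payload)}:
			return true
		default:
			return false
		}
	}
}

func main() {
	ch := make(chan received, 16)
	handle := newHandler(ch)

	// Simulate the library invoking the callback from its own goroutine.
	go func() {
		handle("test/mqtt", []byte("first"))
		handle("test/mqtt", []byte("first")) // identical payloads are kept
		close(ch)
	}()

	// The consumer sees every message, in order, with no shared variables.
	for m := range ch {
		fmt.Printf("Successfully received `%s` from `%s`\n", m.Payload, m.Topic)
	}
}
```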
Unsubscribe
if token := mqtt_client.Unsubscribe(sub_topic); token.Wait() && token.Error() != nil {
	return token.Error()
}