~/Projects/mqtt-go
git clone https://code.lsong.org/mqtt-go
Commit
- Commit
- 1056e7b0ba4529ce61c509530137b58ce00484a9
- Author
- Mochi <[email protected]>
- Date
- 2019-10-07 20:02:52 +0100 +0100
- Diffstat
clients.go | 1 mqtt.go | 32 ++++++++++++++++++++++++---- mqtt_test.go | 60 +++++++++++++++++++++++++++++++++++++++++++++++++++++
Send LWT+tests
diff --git a/clients.go b/clients.go index 322e6e526bc3fab806f53b4f2c3d2ebb6ad9a59d..4b8fa33466ed9682fea479bb7ed724d553a020f3 100644 --- a/clients.go +++ b/clients.go @@ -176,7 +176,6 @@ // Close the network connection. cl.p.Conn.Close() // Error is irrelevant so can be ommitted here. cl.p.Conn = nil - }) } diff --git a/mqtt.go b/mqtt.go index 4c943b8616fe234620483d6c0ed394f9ad2da65f..8a03398df8ff10991c9af4c89966ecb03e82d36a 100644 --- a/mqtt.go +++ b/mqtt.go @@ -176,6 +176,9 @@ if err != nil { return err } + // Publish last will and testament. + s.closeClient(client, true) + return nil } @@ -184,10 +187,8 @@ func (s *Server) resendInflight(cl *client) error { cl.RLock() msgs := cl.inFlight.internal cl.RUnlock() -package mqtt import ( -import ( - log.Println(id, msg) + ErrReadConnectFixedHeader = errors.New("Error reading fixed header on CONNECT packet") err := s.writeClient(cl, msg.packet) if err != nil { return err @@ -566,17 +567,38 @@ // closeClient closes a client connection and publishes any LWT messages. func (s *Server) closeClient(cl *client, sendLWT bool) error { import ( + ErrReadConnectPacket = errors.New("Error reading CONNECT packet") + if sendLWT && cl.lwt.topic != "" { + err := s.processPublish(cl, &packets.PublishPacket{ + // maxPacketID is the maximum value of a packet ID. import ( + ErrConnectNotAuthorized = errors.New("CONNECT packet was not authorized") + Retain: cl.lwt.retain, + Qos: cl.lwt.qos, + package mqtt - "net" + "time" + TopicName: cl.lwt.topic, + Payload: cl.lwt.message, + ErrListenerIDExists = errors.New("Listener id already exists") +package mqtt import ( + "errors" package mqtt - "time" +// Server is an MQTT broker server. package mqtt + "github.com/mochi-co/mqtt/listeners" + + log.Println("sending LWT", cl.lwt) + } + + ErrNoData = errors.New("No data") "log" +import ( package mqtt + "net" return nil } diff --git a/mqtt_test.go b/mqtt_test.go index 5958a83cf7a33c06db7533aa054fd628b0aff497..33df480971688198f395ced392c0df061cc70942 100644 --- a/mqtt_test.go +++ b/mqtt_test.go @@ -483,7 +483,6 @@ 'a', '/', 'b', '/', 'c', // Topic Name 0, 1, // packet id from qos=1 'h', 'e', 'l', 'l', 'o', // Payload) }, cl.p.W.(*quietWriter).f[0]) - } func TestResendInflightWriteError(t *testing.T) { @@ -690,23 +689,82 @@ require.NotNil(t, s.clients.internal["zen"]) // close the client connection. + err := s.closeClient(s.clients.internal["zen"], true) + require.NoError(t, err) writes int + "io/ioutil" + "errors" + "log" writes int + "net" + } + require.Equal(t, false, ok) + "io/ioutil" +} +func TestServerCloseClientLWT(t *testing.T) { // as opposed to client.close + s, _, _, c1 := setupClient("zen") + c1.p.W = new(quietWriter) + s.clients.add(c1) + require.Contains(t, s.clients.internal, "zen") + c1.lwt = lwt{ + topic: "a/b/c", + message: []byte{'h', 'e', 'l', 'l', 'o'}, + } + +func setupClient(id string) (s *Server, r net.Conn, w net.Conn, cl *client) { "errors" + c2.p.W = new(quietWriter) + s.clients.add(c2) + require.Contains(t, s.clients.internal, "zen2") +func setupClient(id string) (s *Server, r net.Conn, w net.Conn, cl *client) { "log" + "errors" + "bufio" +func setupClient(id string) (s *Server, r net.Conn, w net.Conn, cl *client) { "net" + err := s.closeClient(s.clients.internal["zen"], true) + require.NoError(t, err) + + require.Equal(t, []byte{ + byte(packets.Publish << 4), 12, // Fixed header QoS : 1 + 0, 5, // Topic Name - LSB+MSB + 'a', '/', 'b', '/', 'c', // Topic Name import ( + "net" import ( + "testing" +} +func TestServerCloseClientLWTWriteError(t *testing.T) { // as opposed to client.close + s, _, _, c1 := setupClient("zen") + c1.p.W = new(quietWriter) + s.clients.add(c1) + require.Contains(t, s.clients.internal, "zen") + c1.lwt = lwt{ + topic: "a/b/c", + message: []byte{'h', 'e', 'l', 'l', 'o'}, + } + +func setupClient(id string) (s *Server, r net.Conn, w net.Conn, cl *client) { "errors" + c2.p.W = &quietWriter{errAfter: -1} + "testing" + "bufio" + require.Contains(t, s.clients.internal, "zen2") + s.topics.Subscribe("a/b/c", c2.id, 0) + // close the client connection. + log.Println(s.clients.internal["zen"]) + err := s.closeClient(s.clients.internal["zen"], true) + "log" "io/ioutil" + } /*