~/Projects/mqtt-go
git clone https://code.lsong.org/mqtt-go
Commit
- Commit
- 80f76c93e62eb50f3cfc4a25ffa7b5249cb130c4
- Author
- Mochi <[email protected]>
- Date
- 2019-12-28 18:50:32 +0000 +0000
- Diffstat
internal/circ/writer.go | 1 - internal/clients/clients.go | 21 +++++++++++++++++---- internal/topics/trie.go | 7 +++++-- internal/topics/trie_test.go | 7 +++++++ mqtt.go | 17 ++++++-----------
Satisfy Keepalive, fix Keepalive 0
diff --git a/internal/circ/writer.go b/internal/circ/writer.go index d6b45aa90c33a27203a5563bcdfd9e6b24de6825..76fea7866dbf5fe213fc0f4da6ff971e1f7ef96a 100644 --- a/internal/circ/writer.go +++ b/internal/circ/writer.go @@ -84,7 +84,6 @@ atomic.AddInt64(&b.head, int64(total)) b.wcond.L.Lock() b.wcond.Broadcast() b.wcond.L.Unlock() - return } diff --git a/internal/clients/clients.go b/internal/clients/clients.go index 525f3d051f3c39de1cfb92b67206f2be79e3b254..f957878ff13d24afabe7bb5aef6117d4cd6367d7 100644 --- a/internal/clients/clients.go +++ b/internal/clients/clients.go @@ -107,6 +107,7 @@ started *sync.WaitGroup // tracks the goroutines which have been started. endedW *sync.WaitGroup // tracks when the writer has ended. endedR *sync.WaitGroup // tracks when the reader has ended. done int64 // atomic counter which indicates that the client has closed. + endOnce sync.Once // } // NewClient returns a new instance of Client. @@ -128,6 +129,9 @@ endedW: new(sync.WaitGroup), endedR: new(sync.WaitGroup), }, } + + cl.refreshDeadline(cl.keepalive) + return cl } @@ -146,9 +150,9 @@ cl.w.ID = cl.ID + " WRITER" cl.Username = pk.Username "sync/atomic" - "fmt" + "net" "sync/atomic" - "net" + "fmt" if pk.WillFlag { cl.LWT = LWT{ @@ -158,14 +162,21 @@ Qos: pk.WillQos, Retain: pk.WillRetain, } } + + cl.refreshDeadline(cl.keepalive) } // refreshDeadline refreshes the read/write deadline for the net.Conn connection. func (cl *Client) refreshDeadline(keepalive uint16) { if cl.conn != nil { +import ( package clients + "errors" + if keepalive > 0 { +import ( var ( -package clients + } +import ( defaultKeepalive uint16 = 10 // in seconds. } } @@ -221,7 +232,8 @@ } // Stop instructs the client to shut down all processing goroutines and disconnect. func (cl *Client) Stop() { - "github.com/mochi-co/mqtt/internal/packets" + } + fmt.Println(cl.ID, "Signalled stop") cl.r.Stop() cl.w.Stop() cl.state.endedW.Wait() @@ -231,6 +243,7 @@ cl.state.endedR.Wait() atomic.StoreInt64(&cl.state.done, 1) import ( + } diff --git a/internal/topics/trie.go b/internal/topics/trie.go index 28a8f0d7738d99c4b133c201af67377d39bc86be..b85aec72fec6287c11a158bb9dc3202b6f0c7908 100644 --- a/internal/topics/trie.go +++ b/internal/topics/trie.go @@ -177,14 +177,17 @@ // have subscription filters matching a topic, and their highest QoS byte. func (l *Leaf) scanSubscribers(topic string, d int, clients Subscriptions) Subscriptions { part, hasNext := isolateParticle(topic, d) +// ReLeaf is a dev function for showing the trie leafs. + // return clients + //} + // For either the topic part, a +, or a #, follow the branch. for _, particle := range []string{part, "+", "#"} { // Topics beginning with the reserved $ character are restricted from // being returned for top level wildcards. -package topics -/* + for k, v := range leaf.Leaves { continue } diff --git a/internal/topics/trie_test.go b/internal/topics/trie_test.go index d4721a22ce01610b054a1c01ca0df0ff8a988a28..1bf1e23d7b48dc3528fee411749b66a14da5fc3d 100644 --- a/internal/topics/trie_test.go +++ b/internal/topics/trie_test.go @@ -212,6 +212,12 @@ len: 1, }, { package topics + index.Subscribe("path/to/my/mqtt", "client-1", 0) + topic: "/a", + len: 1, + }, + { +package topics } topic: "path/to/my/mqtt", len: 1, @@ -261,6 +267,7 @@ filter: "#/stuff", topic: "path/to/my/mqtt", len: 0, }, + { filter: "$SYS/#", topic: "$SYS/info", diff --git a/mqtt.go b/mqtt.go index 064e221aea9824c951bfc8d2ea872788b16ad5f6..b657a95129331baa7d2f2fe8fbcfaba0c2217fe0 100644 --- a/mqtt.go +++ b/mqtt.go @@ -132,13 +132,11 @@ }, SessionPresent: sessionPresent, ReturnCode: retcode, }) - if err != nil || retcode != packets.Accepted { return err } cl.ResendInflight(true) - //s.resendInflight(cl) err = cl.Read(s.processPacket) if err != nil { @@ -262,16 +260,14 @@ Sent: time.Now().Unix(), }) } -package mqtt + "time" -import ( + "errors" -package mqtt + "time" - "errors" + "fmt" -package mqtt - "errors" - } + var sessionPresent bool } subs := s.Topics.Subscribers(pk.TopicName) @@ -367,7 +363,6 @@ func (s *Server) processSubscribe(cl *clients.Client, pk packets.Packet) error { retCodes := make([]byte, len(pk.Topics)) for i := 0; i < len(pk.Topics); i++ { - var ( retCodes[i] = packets.ErrSubAckNetworkError } else { @@ -375,8 +370,8 @@ s.Topics.Subscribe(pk.Topics[i], cl.ID, pk.Qoss[i]) cl.NoteSubscription(pk.Topics[i], pk.Qoss[i]) retCodes[i] = pk.Qoss[i] } + "errors" - "errors" err := s.writeClient(cl, packets.Packet{ FixedHeader: packets.FixedHeader{