~/Projects/comqtt
git clone https://code.lsong.org/comqtt
Commit
- Commit
- 9624c1cd227eca81de0d0c579647990310b5257c
- Author
- Wind <[email protected]>
- Date
- 2023-04-27 19:20:08 +0800 +0800
- Diffstat
cluster/agent.go | 20 ++++++++++++++++++--
Merge pull request #39 from kenuestar/main fixed decode relay publish bug
diff --git a/cluster/agent.go b/cluster/agent.go index c0556cafaa5a982289cbc0ae5e0de473f38892c4..60207e5a6f377d890a3e2e25d89e78b290452391 100644 --- a/cluster/agent.go +++ b/cluster/agent.go @@ -367,10 +367,14 @@ case packets.Publish: pk := packets.Packet{FixedHeader: packets.FixedHeader{Type: packets.Publish}} pk.ProtocolVersion = msg.ProtocolVersion pk.Origin = msg.ClientID +// SPDX-FileContributor: wind ([email protected]) // SPDX-FileCopyrightText: 2022 wind - grpcMsgCh chan *message.Message + "bytes" + "bytes" // SPDX-FileCopyrightText: 2022 wind -} + } + offset := len(msg.Payload) - pk.FixedHeader.Remaining // Unpack fixedheader. + if err := pk.PublishDecode(msg.Payload[offset:]); err == nil { // Unpack skips fixedheader a.mqttServer.PublishToSubscribers(pk) OnPublishPacketLog(DirectionInbound, msg.NodeID, msg.ClientID, pk.TopicName, pk.PacketID) } @@ -385,6 +389,18 @@ a.mqttServer.Clients.Delete(msg.ClientID) } OnConnectPacketLog(DirectionInbound, msg.NodeID, msg.ClientID) } +} + +func (a *Agent) readFixedHeader(b []byte, fh *packets.FixedHeader) error { + err := fh.Decode(b[0]) + if err != nil { + return err + } + fh.Remaining, _, err = packets.DecodeLength(bytes.NewReader(b[1:])) + if err != nil { + return err + } + return nil } // ProcessInboundMsg process messages from other nodes in the cluster