~/Projects/mochi-mqtt
git clone https://code.lsong.org/mochi-mqtt
Commit
- Commit
- c8c0a5a094758250372cf9d37fa0f99d44bb48fd
- Author
- JB <28275108+[email protected]>
- Date
- 2022-02-24 21:10:39 +0000 +0000
- Diffstat
examples/redis/main.go | 60 --- server/internal/topics/trie.go | 4 server/persistence/redis/redis.go | 216 ------------- server/persistence/redis/redis_test.go | 454 ---------------------------- server/server.go | 4
Revert "added redis persistence mode"
diff --git a/examples/redis/main.go b/examples/redis/main.go deleted file mode 100644 index 3e50451154e85c2bfe191f114ebec8e087261935..0000000000000000000000000000000000000000 --- a/examples/redis/main.go +++ /dev/null @@ -1,60 +0,0 @@ -package main - -import ( - "fmt" - "github.com/go-redis/redis/v8" - "github.com/logrusorgru/aurora" - mqtt "github.com/mochi-co/mqtt/server" - "github.com/mochi-co/mqtt/server/listeners" - "github.com/mochi-co/mqtt/server/listeners/auth" - red "github.com/mochi-co/mqtt/server/persistence/redis" - "log" - "os" - "os/signal" - "syscall" -) - -func main() { - sigs := make(chan os.Signal, 1) - done := make(chan bool, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - go func() { - <-sigs - done <- true - }() - - fmt.Println(aurora.Magenta("Mochi MQTT Server initializing..."), aurora.Cyan("Persistence Redis")) - - server := mqtt.New() - tcp := listeners.NewTCP("t1", ":1883") - err := server.AddListener(tcp, &listeners.Config{ - Auth: new(auth.Allow), - }) - if err != nil { - log.Fatal(err) - } - - err = server.AddStore(red.New(&redis.Options{ - Addr: "localhost:6379", - Password: "", // no password set - DB: 0, // use default DB - })) - - if err != nil { - log.Fatal(err) - } - - go func() { - err := server.Serve() - if err != nil { - log.Fatal(err) - } - }() - fmt.Println(aurora.BgMagenta(" Started! ")) - - <-done - fmt.Println(aurora.BgRed(" Caught Signal ")) - - server.Close() - fmt.Println(aurora.BgGreen(" Finished ")) -} diff --git a/server/internal/topics/trie.go b/server/internal/topics/trie.go index 9bf79041bc0f26856a442f9fa176762214deb289..df3cb170ec8e5f22e42a806e7775cc3fc67f448e 100644 --- a/server/internal/topics/trie.go +++ b/server/internal/topics/trie.go @@ -36,8 +36,10 @@ x.mu.Lock() defer x.mu.Unlock() n := x.poperate(msg.TopicName) if len(msg.Payload) > 0 { -import ( + if n.Message.FixedHeader.Retain == false { +// RetainMessage saves a message payload to the end of a topic branch. Returns + } n.Message = msg } else { if n.Message.FixedHeader.Retain == true { diff --git a/server/persistence/redis/redis.go b/server/persistence/redis/redis.go deleted file mode 100644 index ceef338462b6fcfdf33f31f499eaa3deeaee271e..0000000000000000000000000000000000000000 --- a/server/persistence/redis/redis.go +++ /dev/null @@ -1,216 +0,0 @@ -package redis - -import ( - "context" - "encoding/json" - "errors" - "github.com/go-redis/redis/v8" - "github.com/mochi-co/mqtt/server/persistence" - "github.com/mochi-co/mqtt/server/system" -) - -var ( - ErrNotConnected = errors.New("redis not connected") - ErrNotFound = errors.New("not found") - ErrEmptyStruct = errors.New("the structure cannot be empty") -) - -const ( - KSubscription = "mqtt:" + persistence.KSubscription - KServerInfo = "mqtt:" + persistence.KServerInfo - KRetained = "mqtt:" + persistence.KRetained - KInflight = "mqtt:" + persistence.KInflight - KClient = "mqtt:" + persistence.KClient -) - -type Store struct { - opts *redis.Options - db *redis.Client -} - -func New(opts *redis.Options) *Store { - if opts == nil { - opts = &redis.Options{ - Password: "", // no password set - DB: 0, // use default DB - } - } - return &Store{ - opts: opts, - } -} - -func (s *Store) Open() error { - //if s.opts.Addr == "" { - // return errors.New("addr cannot be empty") - //} - db := redis.NewClient(s.opts) - _, err := db.Ping(context.TODO()).Result() - if err != nil { - return err - } - s.db = db - return nil -} - -// Close closes the redis instance. -func (s *Store) Close() { - s.db.Close() -} - -func (s *Store) HSet(key string, id string, v interface{}) error { - if s.db == nil { - return ErrNotConnected - } - val, _ := json.Marshal(v) - return s.db.HSet(context.Background(), key, id, val).Err() -} - -// WriteServerInfo writes the server info to the redis instance. -func (s *Store) WriteServerInfo(v persistence.ServerInfo) error { - if v.ID == "" || v.Info == (system.Info{}) { - return ErrEmptyStruct - } - val, _ := json.Marshal(v) - return s.db.Set(context.Background(), KServerInfo, val, 0).Err() -} - -// WriteSubscription writes a single subscription to the redis instance. -func (s *Store) WriteSubscription(v persistence.Subscription) error { - if v.ID == "" || v.Client == "" || v.Filter == "" { - return ErrEmptyStruct - } - return s.HSet(KSubscription, v.ID, v) -} - -// WriteInflight writes a single inflight message to the redis instance. -func (s *Store) WriteInflight(v persistence.Message) error { - if v.ID == "" || v.TopicName == "" { - return ErrEmptyStruct - } - return s.HSet(KInflight, v.ID, v) -} - -// WriteRetained writes a single retained message to the redis instance. -func (s *Store) WriteRetained(v persistence.Message) error { - if v.ID == "" || v.TopicName == "" { - return ErrEmptyStruct - } - return s.HSet(KRetained, v.ID, v) -} - -// WriteClient writes a single client to the redis instance. -func (s *Store) WriteClient(v persistence.Client) error { - if v.ClientID == "" { - return ErrEmptyStruct - } - return s.HSet(KClient, v.ID, v) -} - -func (s *Store ) Del(key, id string) error { - if s.db == nil { - return ErrNotConnected - } - - return s.db.HDel(context.Background(), key, id).Err() -} - -// DeleteSubscription deletes a subscription from the redis instance. -func (s *Store) DeleteSubscription(id string) error { - return s.Del(KSubscription, id) -} - -// DeleteClient deletes a client from the redis instance. -func (s *Store) DeleteClient(id string) error { - return s.Del(KClient, id) -} - -// DeleteInflight deletes an inflight message from the redis instance. -func (s *Store) DeleteInflight(id string) error { - return s.Del(KInflight, id) -} - -// DeleteRetained deletes a retained message from the redis instance. -func (s *Store) DeleteRetained(id string) error { - return s.Del(KRetained, id) -} - -// ReadSubscriptions loads all the subscriptions from the redis instance. -func (s *Store) ReadSubscriptions() (v []persistence.Subscription, err error) { - if s.db == nil { - return v, ErrNotConnected - } - - res, err := s.db.HGetAll(context.Background(), KSubscription).Result() - for _, val := range res { - sub := persistence.Subscription{} - json.Unmarshal([]byte(val), &sub) - v = append(v, sub) - } - - return -} - -// ReadClients loads all the clients from the redis instance. -func (s *Store) ReadClients() (v []persistence.Client, err error) { - if s.db == nil { - return v, ErrNotConnected - } - - res, err := s.db.HGetAll(context.Background(), KClient).Result() - for _, val := range res { - cli := persistence.Client{} - json.Unmarshal([]byte(val), &cli) - v = append(v, cli) - } - - return -} - -// ReadInflight loads all the inflight messages from the redis instance. -func (s *Store) ReadInflight() (v []persistence.Message, err error) { - if s.db == nil { - return v, ErrNotConnected - } - - res, err := s.db.HGetAll(context.Background(), KInflight).Result() - for _, val := range res { - msg := persistence.Message{} - json.Unmarshal([]byte(val), &msg) - v = append(v, msg) - //log.Printf("result %s, %v", k, msg) - } - - return -} - -// ReadRetained loads all the retained messages from the redis instance. -func (s *Store) ReadRetained() (v []persistence.Message, err error) { - if s.db == nil { - return v, ErrNotConnected - } - - res, err := s.db.HGetAll(context.Background(), KRetained).Result() - for _, val := range res { - msg := persistence.Message{} - json.Unmarshal([]byte(val), &msg) - v = append(v, msg) - } - - return -} - -//ReadServerInfo loads the server info from the redis instance. -func (s *Store) ReadServerInfo() (v persistence.ServerInfo, err error) { - if s.db == nil { - return v, ErrNotConnected - } - - res, _ := s.db.Get(context.Background(), KServerInfo).Result() - if res != "" { - json.Unmarshal([]byte(res), &v) - } - - return -} - diff --git a/server/persistence/redis/redis_test.go b/server/persistence/redis/redis_test.go deleted file mode 100644 index 9c36e1e6d1fd51a9f97257880b5b287d98cf75cd..0000000000000000000000000000000000000000 --- a/server/persistence/redis/redis_test.go +++ /dev/null @@ -1,454 +0,0 @@ -package redis - -import ( - "github.com/go-redis/redis/v8" - "github.com/mochi-co/mqtt/server/persistence" - "github.com/mochi-co/mqtt/server/system" - "github.com/stretchr/testify/require" - "testing" -) - -var opts = &redis.Options{ - Addr: "localhost:6379", - Password: "", // no password set - DB: 0, // use default DB -} - -func teardown(s *Store, t *testing.T) { - s.Close() -} - -func TestNew(t *testing.T) { - s := New(opts) - require.NotNil(t, s) -} - -func TestNewNoOpts(t *testing.T) { - s := New(nil) - require.NotNil(t, s) - require.Equal(t, "", s.opts.Addr) -} - -func TestOpen(t *testing.T) { - s := New(nil) - err := s.Open() - require.NoError(t, err) - require.Equal(t, opts.Addr, s.opts.Addr) - defer teardown(s, t) - require.NotNil(t, s.db) -} - -func TestWriteAndRetrieveServerInfo(t *testing.T) { - s := New(nil) - err := s.Open() - require.NoError(t, err) - defer teardown(s, t) - - v := system.Info{ - Version: "test", - Started: 100, - } - err = s.WriteServerInfo(persistence.ServerInfo{ - Info: v, - ID: persistence.KServerInfo, - }) - require.NoError(t, err) - - r, err := s.ReadServerInfo() - require.NoError(t, err) - require.NotNil(t, r) - require.Equal(t, v.Version, r.Version) - require.Equal(t, v.Started, r.Started) -} - -func TestWriteServerInfoNoDB(t *testing.T) { - s := New(nil) - err := s.WriteServerInfo(persistence.ServerInfo{}) - require.Error(t, err) -} - -func TestWriteServerInfoFail(t *testing.T) { - s := New(nil) - err := s.Open() - require.NoError(t, err) - err = s.WriteServerInfo(persistence.ServerInfo{}) - require.Error(t, err) -} - -func TestReadServerInfoNoDB(t *testing.T) { - s := New(nil) - _, err := s.ReadServerInfo() - require.Error(t, err) -} - -func TestReadServerInfoFail(t *testing.T) { - s := New(nil) - err := s.Open() - require.NoError(t, err) - s.Close() - _, err = s.ReadServerInfo() - require.Error(t, err) -} - -func TestWriteRetrieveDeleteSubscription(t *testing.T) { - s := New(nil) - err := s.Open() - require.NoError(t, err) - defer teardown(s, t) - - v := persistence.Subscription{ - ID: "test:a/b/c", - Client: "test", - Filter: "a/b/c", - QoS: 1, - T: persistence.KSubscription, - } - err = s.WriteSubscription(v) - require.NoError(t, err) - - v2 := persistence.Subscription{ - ID: "test:d/e/f", - Client: "test", - Filter: "d/e/f", - QoS: 2, - T: persistence.KSubscription, - } - err = s.WriteSubscription(v2) - require.NoError(t, err) - - subs, err := s.ReadSubscriptions() - require.NoError(t, err) - require.Equal(t, persistence.KSubscription, subs[0].T) - require.Equal(t, 2, len(subs)) - - err = s.DeleteSubscription("test:d/e/f") - require.NoError(t, err) - - subs, err = s.ReadSubscriptions() - require.NoError(t, err) - require.Equal(t, 1, len(subs)) -} - -func TestWriteSubscriptionNoDB(t *testing.T) { - s := New(nil) - err := s.WriteSubscription(persistence.Subscription{}) - require.Error(t, err) -} - -func TestWriteSubscriptionFail(t *testing.T) { - s := New(nil) - err := s.Open() - require.NoError(t, err) - s.Close() - err = s.WriteSubscription(persistence.Subscription{}) - require.Error(t, err) -} - -func TestReadSubscriptionNoDB(t *testing.T) { - s := New(nil) - _, err := s.ReadSubscriptions() - require.Error(t, err) -} - -func TestReadSubscriptionFail(t *testing.T) { - s := New(nil) - err := s.Open() - require.NoError(t, err) - s.Close() - _, err = s.ReadSubscriptions() - require.Error(t, err) -} - -func TestWriteRetrieveDeleteInflight(t *testing.T) { - s := New(nil) - err := s.Open() - require.NoError(t, err) - defer teardown(s, t) - - v := persistence.Message{ - ID: "client1_if_0", - T: persistence.KInflight, - PacketID: 0, - TopicName: "a/b/c", - Payload: []byte{'h', 'e', 'l', 'l', 'o'}, - Sent: 100, - Resends: 0, - } - err = s.WriteInflight(v) - require.NoError(t, err) - - v2 := persistence.Message{ - ID: "client1_if_100", - T: persistence.KInflight, - PacketID: 100, - TopicName: "d/e/f", - Payload: []byte{'y', 'e', 's'}, - Sent: 200, - Resends: 1, - } - err = s.WriteInflight(v2) - require.NoError(t, err) - - msgs, err := s.ReadInflight() - require.NoError(t, err) - require.Equal(t, persistence.KInflight, msgs[0].T) - require.Equal(t, 2, len(msgs)) - - err = s.DeleteInflight("client1_if_100") - require.NoError(t, err) - - msgs, err = s.ReadInflight() - require.NoError(t, err) - require.Equal(t, 1, len(msgs)) - -} - -func TestWriteInflightNoDB(t *testing.T) { - s := New(nil) - err := s.WriteInflight(persistence.Message{}) - require.Error(t, err) -} - -func TestWriteInflightFail(t *testing.T) { - s := New(nil) - err := s.Open() - require.NoError(t, err) - s.Close() - err = s.WriteInflight(persistence.Message{}) - require.Error(t, err) -} - -func TestReadInflightNoDB(t *testing.T) { - s := New(nil) - _, err := s.ReadInflight() - require.Error(t, err) -} - -func TestReadInflightFail(t *testing.T) { - s := New(nil) - err := s.Open() - require.NoError(t, err) - s.Close() - _, err = s.ReadInflight() - require.Error(t, err) -} - -func TestWriteRetrieveDeleteRetained(t *testing.T) { - s := New(nil) - err := s.Open() - require.NoError(t, err) - defer teardown(s, t) - - v := persistence.Message{ - ID: "client1_ret_200", - T: persistence.KRetained, - FixedHeader: persistence.FixedHeader{ - Retain: true, - }, - PacketID: 200, - TopicName: "a/b/c", - Payload: []byte{'h', 'e', 'l', 'l', 'o'}, - Sent: 100, - Resends: 0, - } - err = s.WriteRetained(v) - require.NoError(t, err) - - v2 := persistence.Message{ - ID: "client1_ret_300", - T: persistence.KRetained, - FixedHeader: persistence.FixedHeader{ - Retain: true, - }, - PacketID: 100, - TopicName: "d/e/f", - Payload: []byte{'y', 'e', 's'}, - Sent: 200, - Resends: 1, - } - err = s.WriteRetained(v2) - require.NoError(t, err) - - msgs, err := s.ReadRetained() - require.NoError(t, err) - require.Equal(t, persistence.KRetained, msgs[0].T) - require.Equal(t, true, msgs[0].FixedHeader.Retain) - require.Equal(t, 2, len(msgs)) - - err = s.DeleteRetained("client1_ret_300") - require.NoError(t, err) - - msgs, err = s.ReadRetained() - require.NoError(t, err) - require.Equal(t, 1, len(msgs)) -} - -func TestWriteRetainedNoDB(t *testing.T) { - s := New(nil) - err := s.WriteRetained(persistence.Message{}) - require.Error(t, err) -} - -func TestWriteRetainedFail(t *testing.T) { - s := New(nil) - err := s.Open() - require.NoError(t, err) - - err = s.WriteRetained(persistence.Message{}) - require.Error(t, err) -} - -func TestReadRetainedNoDB(t *testing.T) { - s := New(nil) - _, err := s.ReadRetained() - require.Error(t, err) -} - -func TestReadRetainedFail(t *testing.T) { - s := New(nil) - err := s.Open() - require.NoError(t, err) - s.Close() - _, err = s.ReadRetained() - require.Error(t, err) -} - -func TestWriteRetrieveDeleteClients(t *testing.T) { - s := New(nil) - err := s.Open() - require.NoError(t, err) - defer teardown(s, t) - - v := persistence.Client{ - ID: "cl_client1", - ClientID: "client1", - T: persistence.KClient, - Listener: "tcp1", - Username: []byte{'m', 'o', 'c', 'h', 'i'}, - LWT: persistence.LWT{ - Topic: "a/b/c", - Message: []byte{'h', 'e', 'l', 'l', 'o'}, - Qos: 1, - Retain: true, - }, - } - err = s.WriteClient(v) - require.NoError(t, err) - - clients, err := s.ReadClients() - require.NoError(t, err) - - require.Equal(t, []byte{'m', 'o', 'c', 'h', 'i'}, clients[0].Username) - require.Equal(t, "a/b/c", clients[0].LWT.Topic) - - v2 := persistence.Client{ - ID: "cl_client2", - ClientID: "client2", - T: persistence.KClient, - Listener: "tcp1", - } - err = s.WriteClient(v2) - require.NoError(t, err) - - clients, err = s.ReadClients() - require.NoError(t, err) - require.Equal(t, persistence.KClient, clients[0].T) - require.Equal(t, 2, len(clients)) - - err = s.DeleteClient("cl_client2") - require.NoError(t, err) - - clients, err = s.ReadClients() - require.NoError(t, err) - require.Equal(t, 1, len(clients)) -} - -func TestWriteClientNoDB(t *testing.T) { - s := New(nil) - err := s.WriteClient(persistence.Client{}) - require.Error(t, err) -} - -func TestWriteClientFail(t *testing.T) { - s := New(nil) - err := s.Open() - require.NoError(t, err) - s.Close() - err = s.WriteClient(persistence.Client{}) - require.Error(t, err) -} - -func TestReadClientNoDB(t *testing.T) { - s := New(nil) - _, err := s.ReadClients() - require.Error(t, err) -} - -func TestReadClientFail(t *testing.T) { - s := New(nil) - err := s.Open() - require.NoError(t, err) - s.Close() - _, err = s.ReadClients() - require.Error(t, err) -} - -func TestDeleteSubscriptionNoDB(t *testing.T) { - s := New(nil) - err := s.DeleteSubscription("a") - require.Error(t, err) -} - -func TestDeleteSubscriptionFail(t *testing.T) { - s := New(nil) - err := s.Open() - require.NoError(t, err) - s.Close() - err = s.DeleteSubscription("a") - require.Error(t, err) -} - -func TestDeleteClientNoDB(t *testing.T) { - s := New(nil) - err := s.DeleteClient("a") - require.Error(t, err) -} - -func TestDeleteClientFail(t *testing.T) { - s := New(nil) - err := s.Open() - require.NoError(t, err) - s.Close() - err = s.DeleteClient("a") - require.Error(t, err) -} - -func TestDeleteInflightNoDB(t *testing.T) { - s := New(nil) - err := s.DeleteInflight("a") - require.Error(t, err) -} - -func TestDeleteInflightFail(t *testing.T) { - s := New(nil) - err := s.Open() - require.NoError(t, err) - s.Close() - err = s.DeleteInflight("a") - require.Error(t, err) -} - -func TestDeleteRetainedNoDB(t *testing.T) { - s := New(nil) - err := s.DeleteRetained("a") - require.Error(t, err) -} - -func TestDeleteRetainedFail(t *testing.T) { - s := New(nil) - err := s.Open() - require.NoError(t, err) - s.Close() - err = s.DeleteRetained("a") - require.Error(t, err) -} diff --git a/server/server.go b/server/server.go index a1a76045de182a32242c6b231b939adbc1bf2bbc..e9e5d10b315ec430c675ed66473125ed8e99a8ba 100644 --- a/server/server.go +++ b/server/server.go @@ -492,7 +492,7 @@ if s.Store != nil { s.Store.WriteInflight(persistence.Message{ ID: "if_" + client.ID + "_" + strconv.Itoa(int(out.PacketID)), - T: persistence.KInflight, + T: persistence.KRetained, FixedHeader: persistence.FixedHeader(out.FixedHeader), TopicName: out.TopicName, Payload: out.Payload, @@ -734,7 +734,7 @@ if s.Store != nil { s.Store.WriteInflight(persistence.Message{ ID: "if_" + cl.ID + "_" + strconv.Itoa(int(tk.Packet.PacketID)), - T: persistence.KInflight, + T: persistence.KRetained, FixedHeader: persistence.FixedHeader(tk.Packet.FixedHeader), TopicName: tk.Packet.TopicName, Payload: tk.Packet.Payload,