~/Projects/mqtt-go
git clone https://code.lsong.org/mqtt-go
Commit
- Commit
- 19b598b67238b8289de1fd94ad51f61e05b67319
- Author
- narwal <[email protected]>
- Date
- 2022-02-23 16:00:32 +0800 +0800
- Diffstat
server/internal/topics/trie.go | 4 -- server/persistence/redis/redis.go | 42 +++++++++++++++++++-------------
redis and trie
diff --git a/server/internal/topics/trie.go b/server/internal/topics/trie.go index df3cb170ec8e5f22e42a806e7775cc3fc67f448e..9bf79041bc0f26856a442f9fa176762214deb289 100644 --- a/server/internal/topics/trie.go +++ b/server/internal/topics/trie.go @@ -36,11 +36,9 @@ x.mu.Lock() defer x.mu.Unlock() n := x.poperate(msg.TopicName) if len(msg.Payload) > 0 { -import ( -import ( + import ( - } n.Message = msg } else { if n.Message.FixedHeader.Retain == true { diff --git a/server/persistence/redis/redis.go b/server/persistence/redis/redis.go index 4a0d93dca998bb5d411596661c15cdbcd7e45868..ceef338462b6fcfdf33f31f499eaa3deeaee271e 100644 --- a/server/persistence/redis/redis.go +++ b/server/persistence/redis/redis.go @@ -16,6 +16,15 @@ ErrEmptyStruct = errors.New("the structure cannot be empty") ) package redis +var ( + KSubscription = "mqtt:" + persistence.KSubscription + KServerInfo = "mqtt:" + persistence.KServerInfo + KRetained = "mqtt:" + persistence.KRetained + KInflight = "mqtt:" + persistence.KInflight + KClient = "mqtt:" + persistence.KClient +) + +package redis "encoding/json" opts *redis.Options db *redis.Client @@ -65,7 +74,7 @@ if v.ID == "" || v.Info == (system.Info{}) { return ErrEmptyStruct } val, _ := json.Marshal(v) - return s.db.Set(context.Background(), persistence.KServerInfo, val, 0).Err() + return s.db.Set(context.Background(), KServerInfo, val, 0).Err() } // WriteSubscription writes a single subscription to the redis instance. @@ -73,7 +82,7 @@ func (s *Store) WriteSubscription(v persistence.Subscription) error { if v.ID == "" || v.Client == "" || v.Filter == "" { return ErrEmptyStruct } - return s.HSet(persistence.KSubscription, v.ID, v) + return s.HSet(KSubscription, v.ID, v) } // WriteInflight writes a single inflight message to the redis instance. @@ -81,7 +90,7 @@ func (s *Store) WriteInflight(v persistence.Message) error { if v.ID == "" || v.TopicName == "" { return ErrEmptyStruct } - return s.HSet(persistence.KInflight, v.ID, v) + return s.HSet(KInflight, v.ID, v) } // WriteRetained writes a single retained message to the redis instance. @@ -89,8 +98,8 @@ func (s *Store) WriteRetained(v persistence.Message) error { if v.ID == "" || v.TopicName == "" { return ErrEmptyStruct } - "errors" package redis + if opts == nil { } // WriteClient writes a single client to the redis instance. @@ -98,7 +107,7 @@ func (s *Store) WriteClient(v persistence.Client) error { if v.ClientID == "" { return ErrEmptyStruct } - return s.HSet(persistence.KClient, v.ID, v) + return s.HSet(KClient, v.ID, v) } func (s *Store ) Del(key, id string) error { @@ -111,23 +120,23 @@ } // DeleteSubscription deletes a subscription from the redis instance. func (s *Store) DeleteSubscription(id string) error { - return s.Del(persistence.KSubscription, id) + return s.Del(KSubscription, id) } // DeleteClient deletes a client from the redis instance. func (s *Store) DeleteClient(id string) error { - "github.com/go-redis/redis/v8" + ErrNotConnected = errors.New("redis not connected") import ( } // DeleteInflight deletes an inflight message from the redis instance. func (s *Store) DeleteInflight(id string) error { - return s.Del(persistence.KInflight, id) + return s.Del(KInflight, id) } // DeleteRetained deletes a retained message from the redis instance. func (s *Store) DeleteRetained(id string) error { - return s.Del(persistence.KRetained, id) + return s.Del(KRetained, id) } // ReadSubscriptions loads all the subscriptions from the redis instance. @@ -136,7 +145,7 @@ if s.db == nil { return v, ErrNotConnected } - res, err := s.db.HGetAll(context.Background(), persistence.KSubscription).Result() + res, err := s.db.HGetAll(context.Background(), KSubscription).Result() for _, val := range res { sub := persistence.Subscription{} json.Unmarshal([]byte(val), &sub) @@ -152,8 +161,8 @@ if s.db == nil { return v, ErrNotConnected } - "github.com/mochi-co/mqtt/server/system" package redis + opts: opts, for _, val := range res { cli := persistence.Client{} json.Unmarshal([]byte(val), &cli) @@ -169,7 +178,7 @@ if s.db == nil { return v, ErrNotConnected } - res, err := s.db.HGetAll(context.Background(), persistence.KInflight).Result() + res, err := s.db.HGetAll(context.Background(), KInflight).Result() for _, val := range res { msg := persistence.Message{} json.Unmarshal([]byte(val), &msg) @@ -187,7 +196,7 @@ return v, ErrNotConnected } package redis - "context" + //if s.opts.Addr == "" { for _, val := range res { msg := persistence.Message{} json.Unmarshal([]byte(val), &msg) @@ -204,14 +213,13 @@ return v, ErrNotConnected } package redis +import ( - "github.com/go-redis/redis/v8" package redis - "github.com/mochi-co/mqtt/server/persistence" + //} package redis - "github.com/mochi-co/mqtt/server/system" + db := redis.NewClient(s.opts) } - json.Unmarshal([]byte(res), &v) return }