This commit is contained in:
lwl0608 2024-12-23 14:41:27 +08:00
commit 808aed04d1
6 changed files with 117 additions and 104 deletions

View File

@ -27,7 +27,7 @@ func main() {
// 加载配置
// 1.默认配置
configPath := "/root/liwanglin/service/etc/firenze/firenze-local.yaml"
configPath := "/root/liwanglin/liwanglin/service/etc/firenze/firenze-local.yaml"
//configPath := "/Users/erwin/wishpalv2/service/etc/firenze/firenze-local-lwl.yaml"
//if os.Getenv("PROJECT_ENV") == "production" {

View File

@ -8,5 +8,5 @@ log:
app:
app_name: "firenze-test"
# ip: "127.0.0.1"
ip: "172.31.37.71"
ip: "172.31.37.66"
port: 7890

View File

@ -8,7 +8,7 @@ type Config struct {
PongWait time.Duration // Timeout for waiting on pong.
PingPeriod time.Duration // Milliseconds between pings.
MaxMessageSize int64 // Maximum size in bytes of a message.
MessageBufferSize int // The max amount of messages that can be in a sessions buffer before it starts dropping them.
MessageBufferSize int // The max amount of messages that can be in a sessionMap buffer before it starts dropping them.
}
func newConfig() *Config {

View File

@ -1,37 +1,35 @@
package melody
import (
"fmt"
"service/library/logger"
"sync"
)
type hub struct {
sessions map[*Session]bool
sess map[string]*Session // sid: session
midsessions map[int64]*Session // mid: session
didsessions map[string]*Session // did: session
roomsessions map[int64]map[int64]bool // roomid: {sid: true}
broadcast chan *CMsg
register chan *Session
unregister chan *Session
exit chan *CMsg
open bool
rwmutex *sync.RWMutex
sidSessionMap map[string]*Session
sessionMap map[*Session]bool
midSessionsMap map[int64]map[*Session]struct{} // mid: map[session]{}
didSessionMap map[string]*Session // did: session
broadcast chan *CMsg
register chan *Session
unregister chan *Session
exit chan *CMsg
open bool
rwmutex *sync.RWMutex
}
func newHub() *hub {
return &hub{
sessions: make(map[*Session]bool),
sess: make(map[string]*Session),
midsessions: make(map[int64]*Session),
didsessions: make(map[string]*Session),
roomsessions: make(map[int64]map[int64]bool),
broadcast: make(chan *CMsg),
register: make(chan *Session),
unregister: make(chan *Session),
exit: make(chan *CMsg),
open: true,
rwmutex: &sync.RWMutex{},
sidSessionMap: make(map[string]*Session),
sessionMap: make(map[*Session]bool),
midSessionsMap: make(map[int64]map[*Session]struct{}, 1024*20),
didSessionMap: make(map[string]*Session),
broadcast: make(chan *CMsg),
register: make(chan *Session),
unregister: make(chan *Session),
exit: make(chan *CMsg),
open: true,
rwmutex: &sync.RWMutex{},
}
}
@ -41,23 +39,38 @@ loop:
select {
case s := <-h.register:
h.rwmutex.Lock()
h.sessions[s] = true
h.sess[s.Sid] = s
h.midsessions[s.Mid] = s
h.didsessions[s.Did] = s
h.sessionMap[s] = true
h.didSessionMap[s.Did] = s
if h.midSessionsMap[s.Mid] == nil {
h.midSessionsMap[s.Mid] = map[*Session]struct{}{s: {}}
} else {
h.midSessionsMap[s.Mid][s] = struct{}{}
}
h.rwmutex.Unlock()
for mid, sMap := range h.midSessionsMap {
logger.Info("reg, mid: %v, len(s): %v", mid, len(sMap))
}
case s := <-h.unregister:
if _, ok := h.sessions[s]; ok {
if _, ok := h.sessionMap[s]; ok {
h.rwmutex.Lock()
delete(h.sessions, s)
delete(h.sess, s.Sid)
delete(h.midsessions, s.Mid)
delete(h.didsessions, s.Did)
delete(h.sessionMap, s)
delete(h.didSessionMap, s.Did)
if len(h.midSessionsMap[s.Mid]) > 0 {
delete(h.midSessionsMap[s.Mid], s)
if len(h.midSessionsMap[s.Mid]) <= 0 {
delete(h.midSessionsMap, s.Mid)
}
}
h.rwmutex.Unlock()
for mid, sMap := range h.midSessionsMap {
logger.Info("unreg, mid: %v, len(s): %v", mid, len(sMap))
}
}
case m := <-h.broadcast:
h.rwmutex.RLock()
for s := range h.sessions {
for s := range h.sessionMap {
if m.filter != nil {
if m.filter(s) {
s.writeMessage(m)
@ -69,9 +82,9 @@ loop:
h.rwmutex.RUnlock()
case m := <-h.exit:
h.rwmutex.Lock()
for s := range h.sessions {
for s := range h.sessionMap {
s.writeMessage(m)
delete(h.sessions, s)
delete(h.sessionMap, s)
s.Close()
}
h.open = false
@ -91,36 +104,53 @@ func (h *hub) len() int {
h.rwmutex.RLock()
defer h.rwmutex.RUnlock()
return len(h.sessions)
return len(h.sessionMap)
}
func (h *hub) all() []*Session {
if h == nil {
return nil
}
h.rwmutex.RLock()
defer h.rwmutex.RUnlock()
s := make([]*Session, 0, len(h.sessions))
for k := range h.sessions {
s := make([]*Session, 0, len(h.sessionMap))
for k := range h.sessionMap {
s = append(s, k)
}
return s
}
func (h *hub) getSessionBySid(sid string) *Session {
return h.sess[sid]
func (h *hub) GetDidSession(did string) *Session {
if h == nil {
return nil
}
if len(did) <= 0 {
return nil
}
h.rwmutex.RLock()
s := h.didSessionMap[did]
h.rwmutex.RUnlock()
return s
}
func (h *hub) getSessionByMid(mid int64) *Session {
return h.midsessions[mid]
}
func (h *hub) updateSession(s *Session) {
if h == nil {
return
}
if s == nil {
return
}
func (h *hub) mustGetSession(sid string, mid int64) (*Session, error) {
sessSid := h.getSessionBySid(sid)
if sessSid != nil {
return sessSid, nil
if len(s.Sid) > 0 {
h.rwmutex.Lock()
h.sidSessionMap[s.Sid] = s
h.rwmutex.Unlock()
}
sessMid := h.getSessionByMid(mid)
if sessMid != nil {
return sessMid, nil
for i, v := range h.sidSessionMap {
logger.Info("%d, %v", i, v.Sid)
}
return nil, fmt.Errorf("get session fail")
}

View File

@ -198,6 +198,16 @@ func (m *Melody) HandleRequestWithKeys(w http.ResponseWriter, r *http.Request, k
_, _ = w.Write(bs)
return err
}
if len(baseReq.Did) == 0 {
baseReq.Did = baseReq.Channel
}
// 检查设备
oldSession := m.hub.GetDidSession(baseReq.Did)
if oldSession != nil {
logger.Warn("device reconnect: %d - %s", baseReq.Mid, baseReq.Did)
oldSession.close()
}
// 升级协议
conn, err := m.Upgrader.Upgrade(w, r, w.Header())
@ -247,7 +257,7 @@ func (m *Melody) HandleRequestWithKeys(w http.ResponseWriter, r *http.Request, k
return nil
}
// Broadcast broadcasts a text message to all sessions.
// Broadcast broadcasts a text message to all sessionMap.
func (m *Melody) Broadcast(msg []byte) error {
if m.hub.closed() {
return ErrClosed
@ -259,7 +269,7 @@ func (m *Melody) Broadcast(msg []byte) error {
return nil
}
// BroadcastFilter broadcasts a text message to all sessions that fn returns true for.
// BroadcastFilter broadcasts a text message to all sessionMap that fn returns true for.
func (m *Melody) BroadcastFilter(msg []byte, fn func(*Session) bool) error {
if m.hub.closed() {
return ErrClosed
@ -271,14 +281,14 @@ func (m *Melody) BroadcastFilter(msg []byte, fn func(*Session) bool) error {
return nil
}
// BroadcastOthers broadcasts a text message to all sessions except session s.
// BroadcastOthers broadcasts a text message to all sessionMap except session s.
func (m *Melody) BroadcastOthers(msg []byte, s *Session) error {
return m.BroadcastFilter(msg, func(q *Session) bool {
return s != q
})
}
// BroadcastMultiple broadcasts a text message to multiple sessions given in the sessions slice.
// BroadcastMultiple broadcasts a text message to multiple sessionMap given in the sessionMap slice.
func (m *Melody) BroadcastMultiple(msg []byte, sessions []*Session) error {
for _, sess := range sessions {
if writeErr := sess.Write(msg); writeErr != nil {
@ -288,7 +298,7 @@ func (m *Melody) BroadcastMultiple(msg []byte, sessions []*Session) error {
return nil
}
// BroadcastBinary broadcasts a binary message to all sessions.
// BroadcastBinary broadcasts a binary message to all sessionMap.
func (m *Melody) BroadcastBinary(msg []byte) error {
if m.hub.closed() {
return ErrClosed
@ -300,7 +310,7 @@ func (m *Melody) BroadcastBinary(msg []byte) error {
return nil
}
// BroadcastBinaryFilter broadcasts a binary message to all sessions that fn returns true for.
// BroadcastBinaryFilter broadcasts a binary message to all sessionMap that fn returns true for.
func (m *Melody) BroadcastBinaryFilter(msg []byte, fn func(*Session) bool) error {
if m.hub.closed() {
return ErrClosed
@ -312,14 +322,14 @@ func (m *Melody) BroadcastBinaryFilter(msg []byte, fn func(*Session) bool) error
return nil
}
// BroadcastBinaryOthers broadcasts a binary message to all sessions except session s.
// BroadcastBinaryOthers broadcasts a binary message to all sessionMap except session s.
func (m *Melody) BroadcastBinaryOthers(msg []byte, s *Session) error {
return m.BroadcastBinaryFilter(msg, func(q *Session) bool {
return s != q
})
}
// Sessions returns all sessions. An error is returned if the melody session is closed.
// Sessions returns all sessionMap. An error is returned if the melody session is closed.
func (m *Melody) Sessions() ([]*Session, error) {
if m.hub.closed() {
return nil, ErrClosed
@ -327,7 +337,7 @@ func (m *Melody) Sessions() ([]*Session, error) {
return m.hub.all(), nil
}
// Close closes the melody instance and all connected sessions.
// Close closes the melody instance and all connected sessionMap.
func (m *Melody) Close() error {
if m.hub.closed() {
return ErrClosed
@ -338,7 +348,7 @@ func (m *Melody) Close() error {
return nil
}
// CloseWithMsg closes the melody instance with the given close payload and all connected sessions.
// CloseWithMsg closes the melody instance with the given close payload and all connected sessionMap.
// Use the FormatCloseMessage function to format a proper close message payload.
func (m *Melody) CloseWithMsg(msg []byte) error {
if m.hub.closed() {
@ -350,7 +360,7 @@ func (m *Melody) CloseWithMsg(msg []byte) error {
return nil
}
// Len return the number of connected sessions.
// Len return the number of connected sessionMap.
func (m *Melody) Len() int {
return m.hub.len()
}
@ -404,19 +414,17 @@ func parseBaseReq(values url.Values) (*base.BaseRequest, error) {
func (m *Melody) LiveMids() []int64 {
liveMids := make([]int64, 0)
for k := range m.hub.midsessions {
for k := range m.hub.midSessionsMap {
liveMids = append(liveMids, k)
}
return liveMids
}
func (m *Melody) SendBizMsg(req *firenzeproto.SendBizMsgParam) error {
logger.Info("alive mids: %v", m.LiveMids())
sess, err := m.hub.mustGetSession(req.Sid, req.Mid)
if err != nil {
logger.Error("mustGetSession fail, req: %v, err: %v", util.ToJson(req), err)
return err
sessions := m.hub.midSessionsMap[req.Mid]
if len(sessions) <= 0 {
logger.Warn("no sessions for mid: %v", req.Mid)
return fmt.Errorf("no sessions")
}
msg := &CMsg{
@ -425,10 +433,11 @@ func (m *Melody) SendBizMsg(req *firenzeproto.SendBizMsgParam) error {
Msg: req.Msg,
}
bs, _ := json.Marshal(msg)
err = sess.WriteBinary(bs)
if err != nil {
logger.Error("WriteBinary fail, req: %v, err: %v", util.ToJson(req), err)
return err
for sess := range sessions {
_err := sess.WriteBinary(bs)
if _err != nil {
logger.Error("WriteBinary fail, req: %v, err: %v", util.ToJson(req), _err)
}
}
logger.Info("SendBizMsg success, req: %v", util.ToJson(req))
return nil

View File

@ -183,33 +183,7 @@ func (s *Session) readPump() {
logger.Error("failed to init conn: %s - %s", s.String(), err.Error())
break
}
//case CMsgTypeBiz:
// if c.BizMsgHandler != nil {
// err = c.BizMsgHandler(c, &msg)
// if err != nil {
// logger.Error("failed to proc msg, conn: %s, msg: %s, err: %s", c.Str(), msg.String(), err.Error())
// }
// continue
// }
//
// poster := c.Hub.GetPoster(msg.ChannelId)
// if poster == nil {
// logger.Error("channel poster not found: %s", msg.ChannelId)
// continue
// }
// chanmsg = lib.ChanMsg{
// Id: msg.Id,
// SessionId: c.Session,
// Mid: c.Mid,
// Data: msg.Data,
// IP: c.IP,
// }
// err = poster.SendMsg(chanmsg)
// if err != nil {
// logger.Error("failed to Send msg to channel: msg: %s, %s", msg.String(), err.Error())
// }
s.melody.hub.updateSession(s)
default:
// TODO: 处理其他类型消息
}
@ -383,6 +357,6 @@ func (s *Session) String() string {
}
func genSid(s *Session) string {
str := fmt.Sprintf("%s_%d_%s", s.Did, s.Mid, time.Now().Format(time.RFC3339))
str := fmt.Sprintf("%s_%d_%d_%s", s.Did, s.Mid, s.DevType, time.Now().Format(time.RFC3339))
return fmt.Sprintf("%x", md5.Sum([]byte(str)))[:16]
}