add multi online

This commit is contained in:
lwl0608 2024-12-10 14:49:57 +08:00
parent d8f7045bff
commit 2b884bd3fb
5 changed files with 29 additions and 39 deletions

View File

@ -13,7 +13,7 @@ func main() {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
u := url.URL{Scheme: "ws", Host: "192.168.0.105:7890", Path: "/ws"} // 使用公共的 echo WebSocket 服务器进行测试
u := url.URL{Scheme: "ws", Host: "127.0.0.1:7890", Path: "/ws", RawQuery: "b_mid=111&b_did=abc&b_dt=1&b_token=abcd&b_ch=a2"} // 使用公共的 echo WebSocket 服务器进行测试
//u := url.URL{Scheme: "ws", Host: "你的WebSocket服务器地址", Path: "/"} // 替换成你的服务器地址
log.Printf("connecting to %s", u.String())

Binary file not shown.

View File

@ -1,11 +1,12 @@
package melody
import (
"fmt"
"service/library/logger"
"sync"
)
type hub struct {
sidSessionMap map[string]*Session
sessionMap map[*Session]bool
midSessionsMap map[int64]map[*Session]struct{} // mid: map[session]{}
didSessionMap map[string]*Session // did: session
@ -19,6 +20,7 @@ type hub struct {
func newHub() *hub {
return &hub{
sidSessionMap: make(map[string]*Session),
sessionMap: make(map[*Session]bool),
midSessionsMap: make(map[int64]map[*Session]struct{}, 1024*20),
didSessionMap: make(map[string]*Session),
@ -45,6 +47,10 @@ loop:
h.midSessionsMap[s.Mid][s] = struct{}{}
}
h.rwmutex.Unlock()
for mid, sMap := range h.midSessionsMap {
logger.Info("reg, mid: %v, len(s): %v", mid, len(sMap))
}
case s := <-h.unregister:
if _, ok := h.sessionMap[s]; ok {
h.rwmutex.Lock()
@ -57,6 +63,10 @@ loop:
}
}
h.rwmutex.Unlock()
for mid, sMap := range h.midSessionsMap {
logger.Info("unreg, mid: %v, len(s): %v", mid, len(sMap))
}
}
case m := <-h.broadcast:
h.rwmutex.RLock()
@ -108,34 +118,13 @@ func (h *hub) all() []*Session {
return s
}
func (h *hub) getSessionBySid(sid string) *Session {
return h.sidSessionMap[sid]
}
func (h *hub) getSessionByMid(mid int64) *Session {
return h.midSessionsMap[mid]
}
func (h *hub) mustGetSession(sid string, mid int64) (*Session, error) {
sessSid := h.getSessionBySid(sid)
if sessSid != nil {
return sessSid, nil
func (h *hub) updateSession(s *Session) {
if len(s.Sid) > 0 {
h.rwmutex.Lock()
h.sidSessionMap[s.Sid] = s
h.rwmutex.Unlock()
}
sessMid := h.getSessionByMid(mid)
if sessMid != nil {
return sessMid, nil
for i, v := range h.sidSessionMap {
logger.Info("%d, %v", i, v.Sid)
}
return nil, fmt.Errorf("get session fail")
}
func (h *hub) midSessions(mid int64) ([]*Session, error) {
sids := h.midSidsMap[mid]
if len(sids) > 0 {
sessions := make([]*Session, 0)
for sid := range sids {
sessions = append(sessions, h.sidSessionMap[sid])
}
return sessions, nil
}
return nil, fmt.Errorf("get mid sessionMap fail")
}

View File

@ -414,10 +414,10 @@ func (m *Melody) LiveMids() []int64 {
}
func (m *Melody) SendBizMsg(req *firenzeproto.SendBizMsgParam) error {
sess, err := m.hub.mustGetSession(req.Sid, req.Mid)
if err != nil {
logger.Error("mustGetSession fail, req: %v, err: %v", util.ToJson(req), err)
return err
sessions := m.hub.midSessionsMap[req.Mid]
if len(sessions) <= 0 {
logger.Warn("no sessions for mid: %v", req.Mid)
return fmt.Errorf("no sessions")
}
msg := &CMsg{
@ -426,10 +426,11 @@ func (m *Melody) SendBizMsg(req *firenzeproto.SendBizMsgParam) error {
Msg: req.Msg,
}
bs, _ := json.Marshal(msg)
err = sess.WriteBinary(bs)
if err != nil {
logger.Error("WriteBinary fail, req: %v, err: %v", util.ToJson(req), err)
return err
for sess := range sessions {
_err := sess.WriteBinary(bs)
if _err != nil {
logger.Error("WriteBinary fail, req: %v, err: %v", util.ToJson(req), _err)
}
}
logger.Info("SendBizMsg success, req: %v", util.ToJson(req))
return nil

View File

@ -183,7 +183,7 @@ func (s *Session) readPump() {
logger.Error("failed to init conn: %s - %s", s.String(), err.Error())
break
}
s.melody.hub.register <- s
s.melody.hub.updateSession(s)
default:
// TODO: 处理其他类型消息
}