service/library/melody/hub.go

127 lines
2.6 KiB
Go
Raw Normal View History

2023-12-21 22:17:40 +08:00
package melody
import (
2024-10-12 16:28:53 +08:00
"fmt"
2023-12-21 22:17:40 +08:00
"sync"
)
type hub struct {
sessions map[*Session]bool
2024-10-12 16:28:53 +08:00
sess map[string]*Session // sid: session
2023-12-21 22:17:40 +08:00
midsessions map[int64]*Session // mid: session
didsessions map[string]*Session // did: session
roomsessions map[int64]map[int64]bool // roomid: {sid: true}
2024-10-12 16:28:53 +08:00
broadcast chan *CMsg
2023-12-21 22:17:40 +08:00
register chan *Session
unregister chan *Session
2024-10-12 16:28:53 +08:00
exit chan *CMsg
2023-12-21 22:17:40 +08:00
open bool
rwmutex *sync.RWMutex
}
func newHub() *hub {
return &hub{
sessions: make(map[*Session]bool),
2024-10-12 16:28:53 +08:00
sess: make(map[string]*Session),
2023-12-21 22:17:40 +08:00
midsessions: make(map[int64]*Session),
didsessions: make(map[string]*Session),
roomsessions: make(map[int64]map[int64]bool),
2024-10-12 16:28:53 +08:00
broadcast: make(chan *CMsg),
2023-12-21 22:17:40 +08:00
register: make(chan *Session),
unregister: make(chan *Session),
2024-10-12 16:28:53 +08:00
exit: make(chan *CMsg),
2023-12-21 22:17:40 +08:00
open: true,
rwmutex: &sync.RWMutex{},
}
}
// run is the hub's event loop. It blocks, handling one event at a time from
// the register, unregister, broadcast and exit channels, and returns after
// an exit message has been processed.
//
//   - register:   adds the session to the set and to the sid/mid/did indexes.
//   - unregister: removes the session and the index entries that still refer
//     to it.
//   - broadcast:  writes the message to every session accepted by the
//     message's filter (or to all sessions when the filter is nil).
//   - exit:       writes the message to every session, closes it, drops it
//     from all indexes, marks the hub closed and stops the loop.
func (h *hub) run() {
loop:
	for {
		select {
		case s := <-h.register:
			h.rwmutex.Lock()
			h.sessions[s] = true
			h.sess[s.Sid] = s
			h.midsessions[s.Mid] = s
			h.didsessions[s.Did] = s
			h.rwmutex.Unlock()
		case s := <-h.unregister:
			// Reading without the lock: within this file h.sessions is only
			// written by this goroutine.
			if _, ok := h.sessions[s]; ok {
				h.rwmutex.Lock()
				delete(h.sessions, s)
				// A later registration with the same Sid/Mid/Did overwrites
				// the index entry, so only drop entries that still point at
				// the session being removed; otherwise a stale session would
				// unindex its replacement.
				if h.sess[s.Sid] == s {
					delete(h.sess, s.Sid)
				}
				if h.midsessions[s.Mid] == s {
					delete(h.midsessions, s.Mid)
				}
				if h.didsessions[s.Did] == s {
					delete(h.didsessions, s.Did)
				}
				h.rwmutex.Unlock()
			}
		case m := <-h.broadcast:
			h.rwmutex.RLock()
			for s := range h.sessions {
				if m.filter == nil || m.filter(s) {
					s.writeMessage(m)
				}
			}
			h.rwmutex.RUnlock()
		case m := <-h.exit:
			h.rwmutex.Lock()
			for s := range h.sessions {
				s.writeMessage(m)
				delete(h.sessions, s)
				// Drop the lookup entries as well so that sid/mid/did
				// lookups do not return closed sessions after shutdown.
				delete(h.sess, s.Sid)
				delete(h.midsessions, s.Mid)
				delete(h.didsessions, s.Did)
				s.Close()
			}
			h.open = false
			h.rwmutex.Unlock()
			break loop
		}
	}
}
// closed reports whether the hub is no longer open. The flag is read under
// the read lock.
func (h *hub) closed() bool {
	h.rwmutex.RLock()
	isOpen := h.open
	h.rwmutex.RUnlock()
	return !isOpen
}
// len returns the number of sessions currently held in the session set,
// read under the read lock.
func (h *hub) len() int {
	h.rwmutex.RLock()
	count := len(h.sessions)
	h.rwmutex.RUnlock()
	return count
}
// all returns a freshly allocated slice holding every session in the session
// set at the time of the call. The order follows map iteration and is
// therefore unspecified.
func (h *hub) all() []*Session {
	h.rwmutex.RLock()
	defer h.rwmutex.RUnlock()

	snapshot := make([]*Session, len(h.sessions))
	i := 0
	for sess := range h.sessions {
		snapshot[i] = sess
		i++
	}
	return snapshot
}
2024-10-12 16:28:53 +08:00
// getSessionBySid returns the session indexed under sid, or nil when there
// is none. The lookup holds the read lock because run mutates the index
// under the write lock; an unguarded map read concurrent with a write is a
// data race.
func (h *hub) getSessionBySid(sid string) *Session {
	h.rwmutex.RLock()
	defer h.rwmutex.RUnlock()
	return h.sess[sid]
}
// getSessionByMid returns the session indexed under mid, or nil when there
// is none. The lookup holds the read lock because run mutates the index
// under the write lock; an unguarded map read concurrent with a write is a
// data race.
func (h *hub) getSessionByMid(mid int64) *Session {
	h.rwmutex.RLock()
	defer h.rwmutex.RUnlock()
	return h.midsessions[mid]
}
// mustGetSession looks a session up by sid first and falls back to mid.
// It returns the first non-nil match, or an error when neither lookup finds
// a session.
func (h *hub) mustGetSession(sid string, mid int64) (*Session, error) {
	if bySid := h.getSessionBySid(sid); bySid != nil {
		return bySid, nil
	}
	if byMid := h.getSessionByMid(mid); byMid != nil {
		return byMid, nil
	}
	return nil, fmt.Errorf("get session fail")
}