websocket 链接问题
老师好! 我尝试着用websocket 实现了类似于直播间的一个小demo 目前可以正常的广播消息,我做了一个这样的校验: 当我自己的链接上发送消息的时候 不给自己发送,只会广播给其他的链接,但是此时出现了一个奇怪的bug, 当一条链接连上时,给自己发送消息 是校验成功的,但是当第二次的链接上的时候,此时校验就会失效,我怀疑是多个goroutine的问题,我尝试着做了几次修复,但是还是会有这样的问题。能麻烦老师抽时间帮忙给看一下吗? 感谢老师!
项目地址:https://github.com/fangsinan/muke_goods/blob/main/mukeGoodsApi/live_ws/api
部分代码如下:
global.go
type ClientT map[*websocket.Conn]*forms.Client var ( Ping = make(chan struct{}, 1024) PushMsg = make(chan struct{}, 1024) WsClients = make(map[int]ClientT) )
forms.go
// Msg read msg type Msg struct { Action string `json:"action"` Data interface{} `json:"data"` } // Client 用户client type Client struct { Id string Ws *websocket.Conn IsValid bool Mux *sync.Mutex }
live_ws.go
package api import ( "encoding/json" "fmt" "github.com/gorilla/websocket" uuid "github.com/satori/go.uuid" "strconv" "sync" "time" "webApi/live_ws/api/handler" "webApi/live_ws/forms" "webApi/live_ws/global" "github.com/gin-gonic/gin" "go.uber.org/zap" ) var Msg = forms.Msg{} // WsPushInter 定义ws接口 type WsPushInter interface { WsPing() WsPushMsg(forms.Msg) } // WsService 定义ws接口 type WsService struct { WsPush WsPushInter RC *forms.Client //Mux sync.RWMutex } // WsHandler websocket应用 func WsHandler(c *gin.Context) { // 处理参数 liveIdStr := c.DefaultQuery("live_id", "0") liveId, _ := strconv.Atoi(liveIdStr) if liveId <= 0 { zap.S().Errorf("直播间不存在") return } //初始化liveId if _, ok := global.WsClients[liveId]; !ok { global.WsClients[liveId] = make(global.ClientT) } // 升级net 为websocket var ( conn *websocket.Conn err error ) conn, err = global.WsUpgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { zap.S().Errorf("ws upgrader error: %v", err) return } // 用户链接基础信息 userC := &forms.Client{ Id: fmt.Sprintf("%s_%v", liveIdStr, uuid.NewV4()), Ws: conn, IsValid: true, Mux: &sync.Mutex{}, } // 配置推送服务的接口 Ws := WsService{ WsPush: &handler.Clients{ Fc: userC, LiveId: liveId, }, RC: userC, } defer func() { userC.IsValid = false userC.Ws.Close() zap.S().Infof("退出 =====关闭连接") }() // 写入连接池进行广播 global.WsClients[liveId][conn] = userC zap.S().Infof("[用户池] :%v", global.WsClients) for k, v := range global.WsClients { zap.S().Infof("直播间%d有 %d链接", k, len(v)) for _, client := range v { zap.S().Infof("直播间id: %d ", client.Id) } } // 监控发送「 go Ws.Send() // 监控接收消息 Ws.Receive() } // Receive 循环接收消息 func (s *WsService) Receive() { for { //校验当前链接是否有效 if s.RC.IsValid == true { s.RC.Mux.Lock() _, b, err := s.RC.Ws.ReadMessage() s.RC.Mux.Unlock() if err != nil { //zap.S().Infof("info read error %v", err) if !websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived, ) { zap.S().Infof("unexpected read error %v", err) } break } err = json.Unmarshal(b, &Msg) if err != nil { zap.S().Infof("unexpected read unmarshal error %v", err) return } // 转发调用不同动作 switch Msg.Action { case "Ping": global.Ping <- struct{}{} case "Comment": global.PushMsg <- struct{}{} } } } } func (s *WsService) Send() { // 监控client发送的 json 消息 // Ping消息体 { action: "Ping", data: {id: 123}} // Ping消息体 controller: "Push", action: "Comment", data: {content: "你好",} for { select { case <-global.Ping: //每5秒发一次ping s.WsPush.WsPing() case <-global.PushMsg: //每收到一次评论 广播一次 s.WsPush.WsPushMsg(Msg) case <-time.After(3600 * time.Second): // 设置10秒超时机制 未收到任何消息 关闭当前链接 zap.S().Infof("==10===关闭连接") s.RC.IsValid = false s.RC.Ws.Close() default: } } }
handler/ws.go
package handler import ( "encoding/json" "github.com/gorilla/websocket" "webApi/live_ws/forms" "webApi/live_ws/global" "go.uber.org/zap" ) type Clients struct { Fc *forms.Client LiveId int } func (wsC *Clients) WsPing() { PingStruct := struct { Controller string `json:"controller"` Action string `json:"action"` Data map[string]int `json:"data"` }{ Controller: "Index", Action: "Ping", Data: map[string]int{ "live_id": wsC.LiveId, }, } wsC.Fc.Mux.Lock() // 返回当前链接 wsErr := wsC.Fc.Ws.WriteJSON(PingStruct) wsC.Fc.Mux.Unlock() if wsErr != nil { if !websocket.IsCloseError(wsErr, websocket.CloseGoingAway, websocket.CloseNormalClosure) { zap.S().Errorf("[WsPing] unexpected read error %v", wsErr) } } } func (wsC *Clients) WsPushMsg(msg forms.Msg) { //校验当前goroutine的链接 是否有效 如果无效 无需广播 if wsC.Fc.IsValid == false { return } // 解析评论结构 b, err := json.Marshal(&msg.Data) if err != nil { zap.S().Errorf("cannot json Marshal:%v", err) } PingStruct := struct { Status int `json:"status"` Action string `json:"action"` Data string `json:"data"` }{ Action: "WsPushMsg", Status: 200, Data: "[我是服务器]: " + string(b), } // 开辟新内存 接收uid uid := wsC.Fc.Id // 消息广播 i := 0 ids := "" for _, v := range global.WsClients[wsC.LiveId] { if v.IsValid == true { ids += "------- " + v.Id if v.Id == uid { // id 相同 continue } if v.Ws == wsC.Fc.Ws { // conn 相同 continue } i++ wsC.Fc.Mux.Lock() wsErr := v.Ws.WriteJSON(PingStruct) wsC.Fc.Mux.Unlock() if wsErr != nil { zap.S().Errorf("[WsPushMsg] Write Marshal:%v", wsErr) break } } } zap.S().Infof("当前id为%v \n 执行的live_id:%v 循环了%d次: \n 内容是:%v", uid, wsC.LiveId, i, ids) }
15
收起
正在回答 回答被采纳积分+1
1回答
恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧
0 星