websocket 链接问题
老师好! 我尝试着用websocket 实现了类似于直播间的一个小demo 目前可以正常的广播消息,我做了一个这样的校验: 当我自己的链接上发送消息的时候 不给自己发送,只会广播给其他的链接,但是此时出现了一个奇怪的bug, 当一条链接连上时,给自己发送消息 是校验成功的,但是当第二次的链接上的时候,此时校验就会失效,我怀疑是多个goroutine的问题,我尝试着做了几次修复,但是还是会有这样的问题。能麻烦老师抽时间帮忙给看一下吗? 感谢老师!
项目地址:https://github.com/fangsinan/muke_goods/blob/main/mukeGoodsApi/live_ws/api
部分代码如下:
global.go
1 2 3 4 5 6 7 | type ClientT map[*websocket.Conn]*forms.Client var ( Ping = make(chan struct{}, 1024) PushMsg = make(chan struct{}, 1024) WsClients = make(map[int]ClientT) ) |
forms.go
1 2 3 4 5 6 7 8 9 10 11 12 13 | // Msg read msg type Msg struct { Action string `json: "action" ` Data interface {} `json: "data" ` } // Client 用户client type Client struct { Id string Ws *websocket.Conn IsValid bool Mux *sync.Mutex } |
live_ws.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 | package api import ( "encoding/json" "fmt" "github.com/gorilla/websocket" uuid "github.com/satori/go.uuid" "strconv" "sync" "time" "webApi/live_ws/api/handler" "webApi/live_ws/forms" "webApi/live_ws/global" "github.com/gin-gonic/gin" "go.uber.org/zap" ) var Msg = forms.Msg{} // WsPushInter 定义ws接口 type WsPushInter interface { WsPing() WsPushMsg(forms.Msg) } // WsService 定义ws接口 type WsService struct { WsPush WsPushInter RC *forms.Client //Mux sync.RWMutex } // WsHandler websocket应用 func WsHandler(c *gin.Context) { // 处理参数 liveIdStr := c.DefaultQuery( "live_id" , "0" ) liveId, _ := strconv.Atoi(liveIdStr) if liveId <= 0 { zap.S().Errorf( "直播间不存在" ) return } //初始化liveId if _, ok := global .WsClients[liveId]; !ok { global .WsClients[liveId] = make( global .ClientT) } // 升级net 为websocket var ( conn *websocket.Conn err error ) conn, err = global .WsUpgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { zap.S().Errorf( "ws upgrader error: %v" , err) return } // 用户链接基础信息 userC := &forms.Client{ Id: fmt.Sprintf( "%s_%v" , liveIdStr, uuid.NewV4()), Ws: conn, IsValid: true, Mux: &sync.Mutex{}, } // 配置推送服务的接口 Ws := WsService{ WsPush: &handler.Clients{ Fc: userC, LiveId: liveId, }, RC: userC, } defer func() { userC.IsValid = false userC.Ws.Close() zap.S().Infof( "退出 =====关闭连接" ) }() // 写入连接池进行广播 global .WsClients[liveId][conn] = userC zap.S().Infof( "[用户池] :%v" , global .WsClients) for k, v := range global .WsClients { zap.S().Infof( "直播间%d有 %d链接" , k, len(v)) for _, client := range v { zap.S().Infof( "直播间id: %d " , client.Id) } } // 监控发送「 go Ws.Send() // 监控接收消息 Ws.Receive() } // Receive 循环接收消息 func (s *WsService) Receive() { for { //校验当前链接是否有效 if s.RC.IsValid == true { s.RC.Mux.Lock() _, b, err := s.RC.Ws.ReadMessage() s.RC.Mux.Unlock() if err != nil { //zap.S().Infof("info read error %v", err) if !websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived, ) { zap.S().Infof( "unexpected read error %v" , err) } break } err = json.Unmarshal(b, &Msg) if err != nil { zap.S().Infof( "unexpected read unmarshal error %v" , err) return } // 转发调用不同动作 switch Msg.Action { case "Ping" : global .Ping <- struct{}{} case "Comment" : global .PushMsg <- struct{}{} } } } } func (s *WsService) Send() { // 监控client发送的 json 消息 // Ping消息体 { action: "Ping", data: {id: 123}} // Ping消息体 controller: "Push", action: "Comment", data: {content: "你好",} for { select { case <- global .Ping: //每5秒发一次ping s.WsPush.WsPing() case <- global .PushMsg: //每收到一次评论 广播一次 s.WsPush.WsPushMsg(Msg) case <-time.After(3600 * time.Second): // 设置10秒超时机制 未收到任何消息 关闭当前链接 zap.S().Infof( "==10===关闭连接" ) s.RC.IsValid = false s.RC.Ws.Close() default : } } } |
handler/ws.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 | package handler import ( "encoding/json" "github.com/gorilla/websocket" "webApi/live_ws/forms" "webApi/live_ws/global" "go.uber.org/zap" ) type Clients struct { Fc *forms.Client LiveId int } func (wsC *Clients) WsPing() { PingStruct := struct { Controller string `json: "controller" ` Action string `json: "action" ` Data map[string]int `json: "data" ` }{ Controller: "Index" , Action: "Ping" , Data: map[string]int{ "live_id" : wsC.LiveId, }, } wsC.Fc.Mux.Lock() // 返回当前链接 wsErr := wsC.Fc.Ws.WriteJSON(PingStruct) wsC.Fc.Mux.Unlock() if wsErr != nil { if !websocket.IsCloseError(wsErr, websocket.CloseGoingAway, websocket.CloseNormalClosure) { zap.S().Errorf( "[WsPing] unexpected read error %v" , wsErr) } } } func (wsC *Clients) WsPushMsg(msg forms.Msg) { //校验当前goroutine的链接 是否有效 如果无效 无需广播 if wsC.Fc.IsValid == false { return } // 解析评论结构 b, err := json.Marshal(&msg.Data) if err != nil { zap.S().Errorf( "cannot json Marshal:%v" , err) } PingStruct := struct { Status int `json: "status" ` Action string `json: "action" ` Data string `json: "data" ` }{ Action: "WsPushMsg" , Status: 200, Data: "[我是服务器]: " + string(b), } // 开辟新内存 接收uid uid := wsC.Fc.Id // 消息广播 i := 0 ids := "" for _, v := range global .WsClients[wsC.LiveId] { if v.IsValid == true { ids += "------- " + v.Id if v.Id == uid { // id 相同 continue } if v.Ws == wsC.Fc.Ws { // conn 相同 continue } i++ wsC.Fc.Mux.Lock() wsErr := v.Ws.WriteJSON(PingStruct) wsC.Fc.Mux.Unlock() if wsErr != nil { zap.S().Errorf( "[WsPushMsg] Write Marshal:%v" , wsErr) break } } } zap.S().Infof( "当前id为%v \n 执行的live_id:%v 循环了%d次: \n 内容是:%v" , uid, wsC.LiveId, i, ids) } |
15
收起
正在回答 回答被采纳积分+1
1回答
恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧