websocket 链接问题
老师好! 我尝试着用websocket 实现了类似于直播间的一个小demo 目前可以正常的广播消息,我做了一个这样的校验: 当我自己的链接上发送消息的时候 不给自己发送,只会广播给其他的链接,但是此时出现了一个奇怪的bug, 当一条链接连上时,给自己发送消息 是校验成功的,但是当第二次的链接上的时候,此时校验就会失效,我怀疑是多个goroutine的问题,我尝试着做了几次修复,但是还是会有这样的问题。能麻烦老师抽时间帮忙给看一下吗? 感谢老师!
项目地址:https://github.com/fangsinan/muke_goods/blob/main/mukeGoodsApi/live_ws/api

部分代码如下:
global.go
type ClientT map[*websocket.Conn]*forms.Client
var (
Ping = make(chan struct{}, 1024)
PushMsg = make(chan struct{}, 1024)
WsClients = make(map[int]ClientT)
)forms.go
// Msg read msg
type Msg struct {
Action string `json:"action"`
Data interface{} `json:"data"`
}
// Client 用户client
type Client struct {
Id string
Ws *websocket.Conn
IsValid bool
Mux *sync.Mutex
}live_ws.go
package api
import (
"encoding/json"
"fmt"
"github.com/gorilla/websocket"
uuid "github.com/satori/go.uuid"
"strconv"
"sync"
"time"
"webApi/live_ws/api/handler"
"webApi/live_ws/forms"
"webApi/live_ws/global"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
)
var Msg = forms.Msg{}
// WsPushInter 定义ws接口
type WsPushInter interface {
WsPing()
WsPushMsg(forms.Msg)
}
// WsService 定义ws接口
type WsService struct {
WsPush WsPushInter
RC *forms.Client
//Mux sync.RWMutex
}
// WsHandler websocket应用
func WsHandler(c *gin.Context) {
// 处理参数
liveIdStr := c.DefaultQuery("live_id", "0")
liveId, _ := strconv.Atoi(liveIdStr)
if liveId <= 0 {
zap.S().Errorf("直播间不存在")
return
}
//初始化liveId
if _, ok := global.WsClients[liveId]; !ok {
global.WsClients[liveId] = make(global.ClientT)
}
// 升级net 为websocket
var (
conn *websocket.Conn
err error
)
conn, err = global.WsUpgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
zap.S().Errorf("ws upgrader error: %v", err)
return
}
// 用户链接基础信息
userC := &forms.Client{
Id: fmt.Sprintf("%s_%v", liveIdStr, uuid.NewV4()),
Ws: conn,
IsValid: true,
Mux: &sync.Mutex{},
}
// 配置推送服务的接口
Ws := WsService{
WsPush: &handler.Clients{
Fc: userC,
LiveId: liveId,
},
RC: userC,
}
defer func() {
userC.IsValid = false
userC.Ws.Close()
zap.S().Infof("退出 =====关闭连接")
}()
// 写入连接池进行广播
global.WsClients[liveId][conn] = userC
zap.S().Infof("[用户池] :%v", global.WsClients)
for k, v := range global.WsClients {
zap.S().Infof("直播间%d有 %d链接", k, len(v))
for _, client := range v {
zap.S().Infof("直播间id: %d ", client.Id)
}
}
// 监控发送「
go Ws.Send()
// 监控接收消息
Ws.Receive()
}
// Receive 循环接收消息
func (s *WsService) Receive() {
for {
//校验当前链接是否有效
if s.RC.IsValid == true {
s.RC.Mux.Lock()
_, b, err := s.RC.Ws.ReadMessage()
s.RC.Mux.Unlock()
if err != nil {
//zap.S().Infof("info read error %v", err)
if !websocket.IsCloseError(err,
websocket.CloseGoingAway,
websocket.CloseNormalClosure,
websocket.CloseNoStatusReceived,
) {
zap.S().Infof("unexpected read error %v", err)
}
break
}
err = json.Unmarshal(b, &Msg)
if err != nil {
zap.S().Infof("unexpected read unmarshal error %v", err)
return
}
// 转发调用不同动作
switch Msg.Action {
case "Ping":
global.Ping <- struct{}{}
case "Comment":
global.PushMsg <- struct{}{}
}
}
}
}
func (s *WsService) Send() {
// 监控client发送的 json 消息
// Ping消息体 { action: "Ping", data: {id: 123}}
// Ping消息体 controller: "Push", action: "Comment", data: {content: "你好",}
for {
select {
case <-global.Ping: //每5秒发一次ping
s.WsPush.WsPing()
case <-global.PushMsg: //每收到一次评论 广播一次
s.WsPush.WsPushMsg(Msg)
case <-time.After(3600 * time.Second): // 设置10秒超时机制 未收到任何消息 关闭当前链接
zap.S().Infof("==10===关闭连接")
s.RC.IsValid = false
s.RC.Ws.Close()
default:
}
}
}handler/ws.go
package handler
import (
"encoding/json"
"github.com/gorilla/websocket"
"webApi/live_ws/forms"
"webApi/live_ws/global"
"go.uber.org/zap"
)
type Clients struct {
Fc *forms.Client
LiveId int
}
func (wsC *Clients) WsPing() {
PingStruct := struct {
Controller string `json:"controller"`
Action string `json:"action"`
Data map[string]int `json:"data"`
}{
Controller: "Index",
Action: "Ping",
Data: map[string]int{
"live_id": wsC.LiveId,
},
}
wsC.Fc.Mux.Lock()
// 返回当前链接
wsErr := wsC.Fc.Ws.WriteJSON(PingStruct)
wsC.Fc.Mux.Unlock()
if wsErr != nil {
if !websocket.IsCloseError(wsErr,
websocket.CloseGoingAway,
websocket.CloseNormalClosure) {
zap.S().Errorf("[WsPing] unexpected read error %v", wsErr)
}
}
}
func (wsC *Clients) WsPushMsg(msg forms.Msg) {
//校验当前goroutine的链接 是否有效 如果无效 无需广播
if wsC.Fc.IsValid == false {
return
}
// 解析评论结构
b, err := json.Marshal(&msg.Data)
if err != nil {
zap.S().Errorf("cannot json Marshal:%v", err)
}
PingStruct := struct {
Status int `json:"status"`
Action string `json:"action"`
Data string `json:"data"`
}{
Action: "WsPushMsg",
Status: 200,
Data: "[我是服务器]: " + string(b),
}
// 开辟新内存 接收uid
uid := wsC.Fc.Id
// 消息广播
i := 0
ids := ""
for _, v := range global.WsClients[wsC.LiveId] {
if v.IsValid == true {
ids += "------- " + v.Id
if v.Id == uid { // id 相同
continue
}
if v.Ws == wsC.Fc.Ws { // conn 相同
continue
}
i++
wsC.Fc.Mux.Lock()
wsErr := v.Ws.WriteJSON(PingStruct)
wsC.Fc.Mux.Unlock()
if wsErr != nil {
zap.S().Errorf("[WsPushMsg] Write Marshal:%v", wsErr)
break
}
}
}
zap.S().Infof("当前id为%v \n 执行的live_id:%v 循环了%d次: \n 内容是:%v", uid, wsC.LiveId, i, ids)
}15
收起
正在回答 回答被采纳积分+1
1回答
恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧
0 星