websocket 链接问题

websocket 链接问题

老师好! 我尝试着用websocket 实现了类似于直播间的一个小demo  目前可以正常的广播消息,我做了一个这样的校验: 当我自己的链接上发送消息的时候 不给自己发送,只会广播给其他的链接,但是此时出现了一个奇怪的bug, 当一条链接连上时,给自己发送消息 是校验成功的,但是当第二次的链接上的时候,此时校验就会失效,我怀疑是多个goroutine的问题,我尝试着做了几次修复,但是还是会有这样的问题。能麻烦老师抽时间帮忙给看一下吗?  感谢老师!
项目地址:https://github.com/fangsinan/muke_goods/blob/main/mukeGoodsApi/live_ws/api


https://img1.sycdn.imooc.com//climg/62f9e53709307cee17580692.jpg

部分代码如下:


global.go

type ClientT map[*websocket.Conn]*forms.Client

var (
   Ping      = make(chan struct{}, 1024)
   PushMsg   = make(chan struct{}, 1024)
   WsClients = make(map[int]ClientT)
)


forms.go


// Msg read msg
type Msg struct {
   Action string      `json:"action"`
   Data   interface{} `json:"data"`
}

// Client 用户client
type Client struct {
   Id      string
   Ws      *websocket.Conn
   IsValid bool
   Mux     *sync.Mutex
}


live_ws.go

package api

import (
   "encoding/json"
   "fmt"
   "github.com/gorilla/websocket"
   uuid "github.com/satori/go.uuid"
   "strconv"
   "sync"
   "time"
   "webApi/live_ws/api/handler"
   "webApi/live_ws/forms"
   "webApi/live_ws/global"

   "github.com/gin-gonic/gin"
   "go.uber.org/zap"
)

var Msg = forms.Msg{}

// WsPushInter 定义ws接口
type WsPushInter interface {
   WsPing()
   WsPushMsg(forms.Msg)
}

// WsService 定义ws接口
type WsService struct {
   WsPush WsPushInter
   RC     *forms.Client
   //Mux    sync.RWMutex
}

// WsHandler websocket应用
func WsHandler(c *gin.Context) {

   // 处理参数
   liveIdStr := c.DefaultQuery("live_id", "0")
   liveId, _ := strconv.Atoi(liveIdStr)
   if liveId <= 0 {
      zap.S().Errorf("直播间不存在")
      return
   }
   //初始化liveId
   if _, ok := global.WsClients[liveId]; !ok {
      global.WsClients[liveId] = make(global.ClientT)
   }

   // 升级net 为websocket
   var (
      conn *websocket.Conn
      err  error
   )
   conn, err = global.WsUpgrader.Upgrade(c.Writer, c.Request, nil)
   if err != nil {
      zap.S().Errorf("ws upgrader error: %v", err)
      return
   }

   // 用户链接基础信息
   userC := &forms.Client{
      Id:      fmt.Sprintf("%s_%v", liveIdStr, uuid.NewV4()),
      Ws:      conn,
      IsValid: true,
      Mux:     &sync.Mutex{},
   }

   // 配置推送服务的接口
   Ws := WsService{
      WsPush: &handler.Clients{
         Fc:     userC,
         LiveId: liveId,
      },
      RC: userC,
   }
   defer func() {
      userC.IsValid = false
      userC.Ws.Close()
      zap.S().Infof("退出 =====关闭连接")
   }()

   // 写入连接池进行广播
   global.WsClients[liveId][conn] = userC

   zap.S().Infof("[用户池] :%v", global.WsClients)
   for k, v := range global.WsClients {
      zap.S().Infof("直播间%d有 %d链接", k, len(v))
      for _, client := range v {
         zap.S().Infof("直播间id:  %d ", client.Id)
      }
   }
   // 监控发送「
   go Ws.Send()

   // 监控接收消息
   Ws.Receive()
}

// Receive 循环接收消息
func (s *WsService) Receive() {
   for {
      //校验当前链接是否有效
      if s.RC.IsValid == true {

         s.RC.Mux.Lock()
         _, b, err := s.RC.Ws.ReadMessage()
         s.RC.Mux.Unlock()
         if err != nil {
            //zap.S().Infof("info read error %v", err)
            if !websocket.IsCloseError(err,
               websocket.CloseGoingAway,
               websocket.CloseNormalClosure,
               websocket.CloseNoStatusReceived,
            ) {
               zap.S().Infof("unexpected read error %v", err)
            }
            break
         }
         err = json.Unmarshal(b, &Msg)
         if err != nil {
            zap.S().Infof("unexpected read unmarshal error %v", err)
            return
         }

         // 转发调用不同动作
         switch Msg.Action {
         case "Ping":
            global.Ping <- struct{}{}
         case "Comment":
            global.PushMsg <- struct{}{}
         }
      }

   }
}

func (s *WsService) Send() {
   // 监控client发送的 json 消息
   // Ping消息体 { action: "Ping", data: {id: 123}}
   // Ping消息体 controller: "Push", action: "Comment", data: {content: "你好",}
   for {
      select {
      case <-global.Ping: //每5秒发一次ping
         s.WsPush.WsPing()
      case <-global.PushMsg: //每收到一次评论 广播一次
         s.WsPush.WsPushMsg(Msg)
      case <-time.After(3600 * time.Second): // 设置10秒超时机制 未收到任何消息 关闭当前链接
         zap.S().Infof("==10===关闭连接")
         s.RC.IsValid = false
         s.RC.Ws.Close()
      default:
      }
   }
}


handler/ws.go

package handler

import (
   "encoding/json"
   "github.com/gorilla/websocket"
   "webApi/live_ws/forms"
   "webApi/live_ws/global"

   "go.uber.org/zap"
)

type Clients struct {
   Fc     *forms.Client
   LiveId int
}

func (wsC *Clients) WsPing() {
   PingStruct := struct {
      Controller string         `json:"controller"`
      Action     string         `json:"action"`
      Data       map[string]int `json:"data"`
   }{
      Controller: "Index",
      Action:     "Ping",
      Data: map[string]int{
         "live_id": wsC.LiveId,
      },
   }
   wsC.Fc.Mux.Lock()
   // 返回当前链接
   wsErr := wsC.Fc.Ws.WriteJSON(PingStruct)
   wsC.Fc.Mux.Unlock()
   if wsErr != nil {
      if !websocket.IsCloseError(wsErr,
         websocket.CloseGoingAway,
         websocket.CloseNormalClosure) {
         zap.S().Errorf("[WsPing] unexpected read error %v", wsErr)
      }
   }

}

func (wsC *Clients) WsPushMsg(msg forms.Msg) {

   //校验当前goroutine的链接 是否有效  如果无效  无需广播
   if wsC.Fc.IsValid == false {
      return
   }
   // 解析评论结构
   b, err := json.Marshal(&msg.Data)
   if err != nil {
      zap.S().Errorf("cannot json Marshal:%v", err)
   }

   PingStruct := struct {
      Status int    `json:"status"`
      Action string `json:"action"`
      Data   string `json:"data"`
   }{
      Action: "WsPushMsg",
      Status: 200,
      Data:   "[我是服务器]: " + string(b),
   }
   // 开辟新内存 接收uid
   uid := wsC.Fc.Id

   // 消息广播
   i := 0
   ids := ""
   for _, v := range global.WsClients[wsC.LiveId] {
      if v.IsValid == true {
         ids += "------- " + v.Id
         if v.Id == uid { // id 相同
            continue
         }

         if v.Ws == wsC.Fc.Ws { // conn 相同
            continue
         }
         i++

         wsC.Fc.Mux.Lock()
         wsErr := v.Ws.WriteJSON(PingStruct)
         wsC.Fc.Mux.Unlock()
         if wsErr != nil {
            zap.S().Errorf("[WsPushMsg] Write  Marshal:%v", wsErr)
            break
         }
      }
   }

   zap.S().Infof("当前id为%v   \n 执行的live_id:%v  循环了%d次: \n    内容是:%v", uid, wsC.LiveId, i, ids)

}

正在回答 回答被采纳积分+1

登陆购买课程后可参与讨论,去登陆

1回答
ccmouse 2022-08-15 19:55:31

这里的问题描述不是很清楚,想不给自己发送,但是校验的是什么?

似乎期待的是校验成功,那么校验失败的原因是什么?

我看到这里global.WsClients的读写没有进行保护,有可能会导致一些问题。

  • 提问者 慕沐8624122 #1
    没错,就是想的不给自己发送,在开始的时候为每一个新的链接生成了一个uuid拼接的随机串,存储到一个公共的集合中,但是在做广播操作的时候似乎这个判断并没有起到作用,当我用多个client链接的时候,发送了一条信息后,自己仍然可以收到服务端发来的信息!!
    2022-08-15 20:51:01
  • 提问者 慕沐8624122 #2

    老师,我大概能想到我的问题出现在哪里了。  这一步的校验似乎是有问题的,因为在这里开启了一个goroutine ,当多个用户发送到达消息时,此时的管道 有可能被其他用户的goroutine 抢占了,所以可能会导致,执行广播消息的那个goroutine里 就可能不是自己的uid,所以在这里判断就会失效。(我不太确实我的这个理解是不是正确的)
    我尝试着将客户端每次发送的消息体和客户端的uid进行绑定发送到管道中,在进行global.Wsclients的遍历中判断管道中的uid,对其他的连接推送消息,这样做确实是解决了我的这个自己发送的消息不需要推送给自己的问题。
    但是这样也会衍生出另外的问题,如果消息体的uid 被修改了,那就会有问题。
    老师  除了这种做法还有别的更好的一些方式吗?

    2022-08-16 17:58:22
  • ccmouse 回复 提问者 慕沐8624122 #3

    uuid绑定到管道中的确是正确的做法。

    不过消息体的uuid为何会改变?可不可以让uuid不变?或者我们可以尝试在这样的场合让它重新连接吗?

    2022-08-16 20:31:05
问题已解决,确定采纳
还有疑问,暂不采纳

恭喜解决一个难题,获得1积分~

来为老师/同学的回答评分吧

0 星
请稍等 ...
意见反馈 帮助中心 APP下载
官方微信

在线咨询

领取优惠

免费试听

领取大纲

扫描二维码,添加
你的专属老师