关于MQ发布/订阅输出结果的疑问

关于MQ发布/订阅输出结果的疑问

老师,发布/订阅的代码,为啥输出的结果是从 message 2开始的?
c: message 2
c: message 2
c: message 3
c: message 3
c: message 4
c: message 4
c: message 5
c: message 5

不是应该从 message 1开始的,message 1是丢失了吗?

正在回答

登陆购买课程后可参与讨论,去登陆

1回答

这是个好问题。的确是丢了。因为我在Publish的时候是这么设置的。

err := ch.Publish(

exchange,

"",    // key

false, // mandatory

false, // immediate

amqp.Publishing{

Body: []byte(fmt.Sprintf("message %d", i)),

},

)

当immediate和mandatory都为false时,收不到的消息就会被丢掉。当其中一个为true,消息会通过Channel.NotifyReturn返还给消费者。immediate和mandatory的区别很细微,而且最新的更新中immediate已经被去掉了。所以可以不深究它们的区别。


最后这里为什么会丢掉,我们在go subscribe(conn, exchange)之后立刻就开始了发送,此时订阅(subscribe)方由于是在并行执行的goroutine中,还没来得及准备好接收。可以通过额外的go channel来让接收方通知发送方表示已经准备好,这样就不会丢。

  • lieh1203 提问者 #1

    老师,"您说可以通过额外的go channel来让接收方通知发送方表示已经准备好",这块只能通过自己写代码实现是吧? 

    amqp库内部没有封装通知发送方已经准备好的方法是吧?


    例如我下面的写法,consume创建成功后,发个消息给confirmCh,然后在publish之前等待confirmCh收到消息。

    consume(consumer string, ch *amqp.Channel, q string, confirmCh *confirmConsume) {
       msgs, err := ch.Consume(
          q,
          consumer, ,     ,    ,    ,    nil,      )
       err != nil {
          panic(err)
       }
    
       confirmCh <- &confirmConsume{}
    
       msg := msgs {
          fmt.Printf(, consumer, msg.Body)
       }
    }


    2022-11-13 15:23:40
问题已解决,确定采纳
还有疑问,暂不采纳

恭喜解决一个难题,获得1积分~

来为老师/同学的回答评分吧

0 星
请稍等 ...
意见反馈 帮助中心 APP下载
官方微信

在线咨询

领取优惠

免费试听

领取大纲

扫描二维码,添加
你的专属老师