老师,为什么我明明代码没错误,但是接收一条后就不接收了?
package workqueues;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 描述: 任务有所耗时,多个任务
*/
public class NewTask {
//队列名字
private final static String TASK_QUEUE_NAME="task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ地址
factory.setHost("192.168.70.137");
factory.setUsername("admin");
factory.setPassword("password");
//建立连接
Connection connection = factory.newConnection();
//获得信道
Channel channel = connection.createChannel();
//声明队列 需要四个参数
//第一个参数-队列名称、第二个参数-队列是否持久,若服务器重启,队列是否要存在、
//第三个参数-是不是独有,是不是队列只供此连接使用,第四个参数-需不需要自动删除,队列没使用的时候,第五个参数-参数
channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
//发布消息
for (int i = 0; i < 10; i++) {
String message;
if(i%2==0){
message = i + "...";
}else{
message = String.valueOf(i);
}
channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("发送了消息:"+message);
}
//关闭连接
channel.close();
connection.close();
}
}package workqueues;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;
/**
* 描述: 消费者,接受前面的批量消息
*/
public class Worker {
private final static String TASK_QUEUE_NAME="task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ地址
factory.setHost("192.168.70.137");
factory.setUsername("admin");
factory.setPassword("password");
//建立连接
Connection connection = factory.newConnection();
//获得信道
Channel channel = connection.createChannel();
//声明队列 需要四个参数
//第一个参数-队列名称、第二个参数-队列是否持久,若服务器重启,队列是否要存在、
//第三个参数-是不是独有,是不是队列只供此连接使用,第四个参数-需不需要自动删除,队列没使用的时候,第五个参数-参数
channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
System.out.println("开始接收消息");
//希望处理的数量,在处理完毕前,不接收下一个任务
channel.basicQos(1);//!!!
//但必须告诉我你是否处理完毕了,把第二个autoAck参数改成false!!!!
channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body,"UTF-8");
System.out.println("收到了消息:"+message);
try{
doWork(message);
}finally {
System.out.println("消息处理完成");
//消息处理完成,要确认消息
channel.basicAck(envelope.getDeliveryTag(),false);//!!!
}
}
});
}
private static void doWork(String task){
char[] chars = task.toCharArray();
for(char ch:chars){
if(ch == '.'){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

12
收起
正在回答 回答被采纳积分+1
1回答
2023版Java工程师
- 参与学习 人
- 提交作业 8788 份
- 解答问题 9886 个
综合就业常年第一,编程排行常年霸榜,北上广深月薪过万! 不需要基础,无需脱产即可学习,只要你有梦想,想高薪! 全新升级:技术栈升级(包含VUE3.0,ES6,Git)+项目升级(前后端联调与功能升级)
了解课程



恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧
0 星