老师,为什么我明明代码没错误,但是接收一条后就不接收了?
package workqueues; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 描述: 任务有所耗时,多个任务 */ public class NewTask { //队列名字 private final static String TASK_QUEUE_NAME="task_queue"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ地址 factory.setHost("192.168.70.137"); factory.setUsername("admin"); factory.setPassword("password"); //建立连接 Connection connection = factory.newConnection(); //获得信道 Channel channel = connection.createChannel(); //声明队列 需要四个参数 //第一个参数-队列名称、第二个参数-队列是否持久,若服务器重启,队列是否要存在、 //第三个参数-是不是独有,是不是队列只供此连接使用,第四个参数-需不需要自动删除,队列没使用的时候,第五个参数-参数 channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null); //发布消息 for (int i = 0; i < 10; i++) { String message; if(i%2==0){ message = i + "..."; }else{ message = String.valueOf(i); } channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("发送了消息:"+message); } //关闭连接 channel.close(); connection.close(); } }
package workqueues; import com.rabbitmq.client.*; import java.io.IOException; import java.util.Random; import java.util.concurrent.TimeoutException; /** * 描述: 消费者,接受前面的批量消息 */ public class Worker { private final static String TASK_QUEUE_NAME="task_queue"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ地址 factory.setHost("192.168.70.137"); factory.setUsername("admin"); factory.setPassword("password"); //建立连接 Connection connection = factory.newConnection(); //获得信道 Channel channel = connection.createChannel(); //声明队列 需要四个参数 //第一个参数-队列名称、第二个参数-队列是否持久,若服务器重启,队列是否要存在、 //第三个参数-是不是独有,是不是队列只供此连接使用,第四个参数-需不需要自动删除,队列没使用的时候,第五个参数-参数 channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null); System.out.println("开始接收消息"); //希望处理的数量,在处理完毕前,不接收下一个任务 channel.basicQos(1);//!!! //但必须告诉我你是否处理完毕了,把第二个autoAck参数改成false!!!! channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("收到了消息:"+message); try{ doWork(message); }finally { System.out.println("消息处理完成"); //消息处理完成,要确认消息 channel.basicAck(envelope.getDeliveryTag(),false);//!!! } } }); } private static void doWork(String task){ char[] chars = task.toCharArray(); for(char ch:chars){ if(ch == '.'){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
12
收起
正在回答 回答被采纳积分+1
1回答
恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧
0 星