老师,为什么我明明代码没错误,但是接收一条后就不接收了?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 | package workqueues; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 描述: 任务有所耗时,多个任务 */ public class NewTask { //队列名字 private final static String TASK_QUEUE_NAME= "task_queue" ; public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ地址 factory.setHost( "192.168.70.137" ); factory.setUsername( "admin" ); factory.setPassword( "password" ); //建立连接 Connection connection = factory.newConnection(); //获得信道 Channel channel = connection.createChannel(); //声明队列 需要四个参数 //第一个参数-队列名称、第二个参数-队列是否持久,若服务器重启,队列是否要存在、 //第三个参数-是不是独有,是不是队列只供此连接使用,第四个参数-需不需要自动删除,队列没使用的时候,第五个参数-参数 channel.queueDeclare(TASK_QUEUE_NAME, true , false , false , null ); //发布消息 for ( int i = 0 ; i < 10 ; i++) { String message; if (i% 2 == 0 ){ message = i + "..." ; } else { message = String.valueOf(i); } channel.basicPublish( "" , TASK_QUEUE_NAME, null , message.getBytes( "UTF-8" )); System.out.println( "发送了消息:" +message); } //关闭连接 channel.close(); connection.close(); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 | package workqueues; import com.rabbitmq.client.*; import java.io.IOException; import java.util.Random; import java.util.concurrent.TimeoutException; /** * 描述: 消费者,接受前面的批量消息 */ public class Worker { private final static String TASK_QUEUE_NAME= "task_queue" ; public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ地址 factory.setHost( "192.168.70.137" ); factory.setUsername( "admin" ); factory.setPassword( "password" ); //建立连接 Connection connection = factory.newConnection(); //获得信道 Channel channel = connection.createChannel(); //声明队列 需要四个参数 //第一个参数-队列名称、第二个参数-队列是否持久,若服务器重启,队列是否要存在、 //第三个参数-是不是独有,是不是队列只供此连接使用,第四个参数-需不需要自动删除,队列没使用的时候,第五个参数-参数 channel.queueDeclare(TASK_QUEUE_NAME, true , false , false , null ); System.out.println( "开始接收消息" ); //希望处理的数量,在处理完毕前,不接收下一个任务 channel.basicQos( 1 ); //!!! //但必须告诉我你是否处理完毕了,把第二个autoAck参数改成false!!!! channel.basicConsume(TASK_QUEUE_NAME, false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String message = new String(body, "UTF-8" ); System.out.println( "收到了消息:" +message); try { doWork(message); } finally { System.out.println( "消息处理完成" ); //消息处理完成,要确认消息 channel.basicAck(envelope.getDeliveryTag(), false ); //!!! } } }); } private static void doWork(String task){ char [] chars = task.toCharArray(); for ( char ch:chars){ if (ch == '.' ){ try { Thread.sleep( 1000 ); } catch (InterruptedException e) { e.printStackTrace(); } } } } } |
12
收起
正在回答 回答被采纳积分+1
1回答
恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧