/**
 * Starts a worker that consumes tasks from the queue named by {@code TAST_QUEUE_NAME}.
 *
 * <p>Deliveries are acknowledged manually, and only after {@code doWork} has finished,
 * so a message that was handed to this worker but not yet processed is redelivered by
 * the broker instead of being lost when the worker stops. The prefetch count is limited
 * to one so the broker does not push the whole backlog to a single worker up front.
 *
 * <p>The connection and channel are intentionally left open: the consumer keeps
 * receiving deliveries after this method returns.
 *
 * @param args command-line arguments (unused)
 * @throws IOException      propagated from the RabbitMQ client calls
 * @throws TimeoutException propagated from opening the connection
 */
public static void main(String[] args) throws IOException, TimeoutException {
    // Create the connection factory and point it at the RabbitMQ broker.
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.172.130");
    factory.setUsername("admin");
    factory.setPassword("password");

    // Open the connection and obtain a channel.
    Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    // Declare the queue (durable=true, exclusive=false, autoDelete=false).
    channel.queueDeclare(TAST_QUEUE_NAME, true, false, false, null);

    // At most one unacknowledged message per worker at a time.
    channel.basicQos(1);
    System.out.println("开始接受消息");

    // autoAck=false: with automatic acks the broker forgets a message as soon as it is
    // delivered, so anything still waiting in a stopped worker would be lost.
    channel.basicConsume(TAST_QUEUE_NAME, false, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            long deliveryTag = envelope.getDeliveryTag();
            try {
                String message = new String(body, "UTF-8");
                System.out.println("收到了消息:" + message);
                doWork(message);
                // Acknowledge only once the work has actually completed.
                channel.basicAck(deliveryTag, false);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the calling thread and hand the
                // unfinished task back to the queue.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                channel.basicNack(deliveryTag, false, true);
            } finally {
                System.out.println("消息处理完成");
            }
        }
    });
}
/**
 * Simulates a time-consuming task: sleeps for one second per {@code '.'} character
 * found in {@code task}.
 *
 * @param task the task payload; each dot in it costs one second of "work"
 * @throws InterruptedException if the current thread is interrupted while sleeping;
 *                              the remaining work is abandoned so the caller can react
 */
private static void doWork(String task) throws InterruptedException {
    for (char ch : task.toCharArray()) {
        if (ch == '.') {
            // Let an interrupt propagate rather than swallowing it here.
            Thread.sleep(1000);
        }
    }
}
/**
 * Producer side of the work-queue example: publishes ten tasks ("0..." through "9...")
 * to the durable queue {@code task_queue} via the default exchange and then exits.
 */
public class NewTask {
    private static final String TASK_QUEUE_NAME = "task_queue";

    /**
     * Connects to the broker, declares the queue and publishes the ten tasks.
     *
     * @param args command-line arguments (unused)
     * @throws IOException      propagated from the RabbitMQ client calls
     * @throws TimeoutException propagated from opening or closing the connection/channel
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory and point it at the RabbitMQ broker.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.172.130");
        factory.setUsername("admin");
        factory.setPassword("password");

        // try-with-resources closes the channel and then the connection even when
        // declaring the queue or publishing fails, so the connection is not leaked.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Declare the queue (durable=true, exclusive=false, autoDelete=false).
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

            // NOTE(review): messages are published with null properties, i.e. without
            // an explicit persistent delivery mode — confirm whether they must survive
            // a broker restart.
            for (int i = 0; i < 10; i++) {
                String message = i + "...";
                channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println("发送消息:"+ message);
            }
        }
    }
}
我试了只运行单个 worker 的情况,也会漏消息。
恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧
0 星