1.配置rabbitmq集群(可以参考前一篇)
2.Nginx做负载均衡
注意:Nginx 1.9.0 版本后新增了 stream 模块,用于一般的 TCP 代理和负载均衡,之前的版本不支持
修改Nginx配置文件nginx.conf添加如下配置,监听12345端口
stream {
    # Pool of RabbitMQ servers, each addressed on port 5672.
    upstream rabbitmqserver {
        server 192.168.191.20:5672;
        server 192.168.191.131:5672;
        server 192.168.191.132:5672;
    }

    # Listen on port 12345 and proxy incoming connections to the pool above.
    server {
        listen 12345;
        proxy_pass rabbitmqserver;
    }
}
修改后重启Nginx,使之生效。
3.测试
获取连接的工具类
package com.sky.study;
import java.io.IOException;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Helper that opens RabbitMQ connections for the sample producer and consumer.
 *
 * <p>The host and port configured here point at the Nginx TCP load balancer rather than at a
 * broker node directly.
 */
public class ConnectionUtil {

    /**
     * Opens a new connection using the fixed host, port, credentials and virtual host set below.
     *
     * @return the newly opened connection
     * @throws IOException if {@code newConnection()} fails
     */
    public static Connection getConnection() throws IOException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Address of the server that runs the Nginx load balancer.
        connectionFactory.setHost("192.168.191.132");
        // Nginx does the TCP load balancing and listens on port 12345.
        connectionFactory.setPort(12345);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("hello");
        return connectionFactory.newConnection();
    }
}
生产者
package com.sky.study.helloWorld;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sky.study.ConnectionUtil;

/**
 * Sample producer: publishes one text message to the queue {@code q.test.01} over a connection
 * obtained from {@link ConnectionUtil#getConnection()}.
 */
public class Send {

    private static final String QUEUE_NAME = "q.test.01";

    /**
     * Declares the queue, publishes a single "Hello World!" message with an empty exchange name
     * and the queue name as routing key, then closes the channel and the connection.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            // Obtain a connection.
            connection = ConnectionUtil.getConnection();
            // Create a channel.
            channel = connection.createChannel();
            // Declare (create) the queue.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // The message to send.
            String message = "Hello World!";
            // Encode explicitly as UTF-8 so the bytes on the wire do not depend on the
            // platform default charset of the sending machine.
            channel.basicPublish("", QUEUE_NAME, null,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("发送成功");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Close the channel first, then the connection; a failure to close one must not
            // prevent the attempt to close the other.
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
消费者
package com.sky.study.helloWorld;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.sky.study.ConnectionUtil; import com.rabbitmq.client.ShutdownSignalException;public class Recv {
private static final String QUEUE_NAME = "q.test.01";public static void main(String[] args) {
Connection connection = null; Channel channel = null; try { connection = ConnectionUtil.getConnection(); channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列 channel.basicConsume(QUEUE_NAME, true, consumer); // 获取消息 while (true) { Delivery nextDelivery = consumer.nextDelivery(); String message = new String(nextDelivery.getBody()); System.out.println(message); } } catch (IOException e) { e.printStackTrace(); } catch (ShutdownSignalException e) { e.printStackTrace(); } catch (ConsumerCancelledException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }结束,谢谢支持