01-正文
本章节下载 (1.50 MB)
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息中间件,使用Erlang语言开发。
AMQP,全称为Advanced Message Queuing Protocol,是一个提供统一消息服务的应用层标准高级消息队列协议,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息。
RabbitMQ 作为目前应用相当广泛的消息中间件,在企业级应用、微服务应用中充当着重要的角色,特别是在一些典型的应用场景以及业务模块中具有重要的作用,比如业务服务模块解耦、异步通信、高并发限流、超时延迟处理等均有广泛的应用。
图1-1 RabbitMQ整体架构
涉及到的概念如下:
· 生产者(Producer、Publisher):消息的生产者。
· 消费者(Consumer):消息的消费者。
· 队列(Queue):用于存储还未被消费者消费的消息。
· 服务器(Broker):接受客户端连接,实现AMQP消息队列和路由功能的进程。
· 虚拟主机(Virtual Host):一个Virtual Host里面有若干个Exchange和Queue,类似于资源组的概念。
· 交换机(Exchange):接受发布者发送的消息,并根据Binding规则将消息路由到服务器中的队列。
· 连接(Connection):连接指的是TCP连接,发布者与消费者都是通过TCP连接与RabbitMQ建立连接。
· 通道(Channel):客户端与服务器之间通过通道建立连接。由于TCP建立连接和释放连接的资源开销较大,每一个线程都建立一个TCP连接非常浪费,通过通道共享Connection。
在RabbitMQ中,生产者(Producer)发布消息到交换机(Exchange),交换机根据路由规则将收到的消息分发到与该交换机绑定的队列(Queue),然后消费者根据不同的策略对队列中的信息进行处理。
在新建集群时,选择不同资源模式需要准备不同的资源类型,在RabbitMQ云服务中,RabbitMQ支持部署的资源模式有虚拟机和裸金属部署。资源类型准备说明如下:
· 虚拟机集群:新建虚拟机集群前,要求在云操作系统中有可用资源,可通过[资源/计算可用域]完成虚拟机云资源的准备。
· 裸金属集群:裸金属的节点规格根据硬件配置自动获取,规格数据来自云平台的[资源/裸金属资源池]中处于“可分配”状态的节点。
消息中间件RabbitMQ适用于以下场景:
· 应用解耦:消息中间件RabbitMQ可以插入应用中间,为两边应用提供接口,实现应用通过消息中间件进行通信。
· 异步处理:消息中间件RabbitMQ提供异步处理机制,允许应用把一些消息放入消息中间件中,并不立即处理它,在之后需要的时候再处理。
· 流量削峰:使用消息中间件RabbitMQ能够使关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩溃。
新建集群前,要求在云平台中已完成云资源的准备。
根据集群部署规划,在云资源准备完成以后,可新建RabbitMQ集群。新建RabbitMQ集群的步骤详情可参见产品安装部署手册或在线联机帮助。
本文档内容将以集群已创建成功为前提,介绍如何进行状态检查、以及对集群的基本操作等。
· 关于RabbitMQ的部署流程以及部署过程中相关的参数说明等,详情请参见产品安装部署手册。
· RabbitMQ搭建模式包括单节点和集群模式,其中集群模式中至少需要3个节点。
完成部署集群之后,可在集群管理页面查看集群列表中的运行状态,若运行状态为“运行中”、RabbitMQ控制台可以正常访问、且无实例和主机异常告警,则表示集群运行正常;否则,集群存在异常,如图2-1所示。
(1) 通过RabbitMQ云服务平台进入,在集群列表中,单击集群名,进入集群详情页面。
(2) 选择[组件]页签,单击组件名,进入组件详情页。
(3) 单击右上角<快速链接>,选择主机名,即可进入RabbitMQ对应的控制台页面。
图2-2 访问控制台
图2-3 RabbitMQ控制台页面
单击[Queues]菜单,进入队列列表界面,在[Add a new queue]菜单下输入队列相关参数,点击<Add queue>。
图2-4 Add queue页面
单击[Exchanges]菜单,进入交换机列表界面,在[Add a new exchange]菜单下输入交换机相关参数;此处,以direct类型为例,Type选择direct。单击<Add exchange>。
图2-5 Add exchange页面
在Exchanges列表界面,单击已创建的exchange名称,进入它的详情页面。点击下方的[Bindings]菜单,将exchange和对应的queue进行绑定。单击<Bind>。
图2-6 绑定Exchange和Queue
在Exchanges列表界面,单击已创建的exchange名称,进入它的详情页面,点击下方的[Publish message]菜单,将要发送的消息输入到Payload,单击<Publish message>按钮。
图2-7 发布消息
单击[Queue]菜单项,单击已创建的Queue名称,进入它的Queue详情页面,点击下方的[Get messages]菜单下的<Get Message(s)>消费一条消息。
图2-8 接收消息
进入集群详情页面,如图3-1所示。集群详情页主要展示该集群的基本信息、概览、组件、主机、告警和计费等相关信息,同时可对集群进行各项操作。
· 基本信息:展示该集群的基本信息,包含集群模式、节点数、资源区域、资源类型、密钥对、运行状态、监控状态等。
· 概览:展示了该集群中组件的统计、CPU\磁盘\内存使用率的性能趋势、CPU\磁盘\内存使用率等。
· 组件:展示集群中安装的组件以及对组件的停止、重启等操作;单击组件名,可查看组件中进程的部署拓扑、配置、配置修改历史信息。
· 主机:展示集群中的主机信息,以及查看监控详情。
· 告警:展示告警信息统计数,以及产生的告警信息,可进行告警相关操作。
· 计费:展示当前集群计费信息,包括计费开始/结束时间、产生的费用。
RabbitMQ组件详情页面如图3-2所示。组件详情页面主要展示部署拓扑、配置详情和配置修改历史等相关信息,同时可对组件或组件进程执行相关管理操作,可查看或修改组件的各配置项信息,查看组件的配置修改历史及当前使用配置版本。
主要功能如下:
· 部署拓扑:在[组件详情/部署拓扑]页签,可查看组件进程的安装详情以及运行状态详情。
进程名:同一个进程可分别安装在多个主机节点上,所以进程列表中某一进程名可能重复出现,但同一进程名对应的主机名和主机IP不同。
· 配置:在[组件详情/配置]页签,可查看或修改组件各配置项的信息。
· 配置修改历史:在[组件详情/配置修改历史]页签,可查询组件的配置历史版本以及当前使用版本。
· 组件操作:在组件详情右上角,可对组件执行重启、停止等操作。
图3-2 RabbitMQ组件详情
· RabbitMQ集群扩容是指集群新添加节点并在该节点上新增RabbitMQ服务。
· 主机扩容只能选择与原集群节点相同的规格。
· 主机扩容操作不可中止或暂停;集群中的主机不可删除。
RabbitMQ扩容操作步骤如下:
(1) 在云服务RabbitMQ管理页面的左侧导航树中选择[集群管理],进入集群管理页面。
(2) 选择[集群列表]页签,单击集群名称可跳转至集群详情页面。
(3) 选择[主机]页签,单击<主机扩容>按钮,弹出主机扩容页面;或单击右上角<集群操作/主机扩容>进入主机扩容页面。
(4) 选择扩容节点数,单击<确定>按钮,即完成主机扩容。
图3-3 RabbitMQ集群扩容
通过顶部导航栏[云服务/消息中间件RabbitMQ]进入RabbitMQ后,在左侧导航树[主机管理]及[磁盘管理]进入对应页面,可以查看创建的所有RabbitMQ集群主机,以及主机的资源使用情况和磁盘使用情况。
图3-4 主机管理配置
图3-5 磁盘管理
可以在新建集群时,选择开启监控;或者新建集群成功后,再开启监控。
开启监控后可以查看主机和集群相关指标,RabbitMQ开启操作步骤如下:
(1) 在RabbitMQ云服务管理页面的左侧导航树中选择[集群管理],进入集群管理页面。
(2) 在集群对应的操作列单击<开启监控>按钮,完成开启监控;对于已开启监控的集群,点击<停止监控>按钮,关闭集群监控。
图3-6 RabbitMQ集群开启和关闭监控
可执行查看主机监控详情的操作,监控内容以性能图表的形式展示主机资源监控、RabbitMQ基本运行信息。步骤如下:
(1) 在云服务管理页面的左侧导航树中选择[集群管理],进入集群管理页面。
(2) 选择[集群列表]页签,单击集群名称,进入集群详情页面。
(3) 在集群详情页面,单击[主机]页签,单击操作列的<监控详情>,可查看监控信息。
图3-7 RabbitMQ监控信息
RabbitMQ告警规则修改步骤如下:
(1) 在云服务管理页面的左侧导航树中选择[集群管理],进入集群管理页面。
(2) 选择[集群列表]页签,单击集群名称,进入集群详情页面。
(3) 在集群详情页面,选择[告警]页签,单击<告警配置>按钮,在弹出的窗口中修改告警规则。
¡ RabbitMQ云服务的告警指标包括两大类:
- RabbitMQ服务相关指标:消息总数、通道数、队列数、实例可用性、连接数、消费者数、未确认消息数
- 主机相关指标:主机可用性、内存使用率、CPU使用率、磁盘使用率。
用户可以根据自己业务需要修改告警阈值。
图3-8 告警配置
RabbitMQ告警信息查看步骤如下:
(1) 在RabbitMQ云服务管理页面的左侧导航树中选择[集群管理],进入集群管理页面。
(2) 选择[集群列表]页签,单击集群名称,进入集群详情页面。
(3) 在集群详情页面,单击[告警]页签,即可查看产生的告警信息;同时,对告警信息可进行“历史查询”、“手动恢复”、“删除”等操作。
RabbitMQ告警管理相关操作。
(1) 在云服务管理页面的左侧导航树中选择[告警管理]菜单项,进入告警管理页面。
(2) 可以在[告警列表]、[告警联系组]、[告警联系人]页面查看相应内容;同时可以新建告警组、新建告警联系人。
图3-9 告警管理
图3-10 告警联系人管理
日志管理主要对RabbitMQ服务运行日志及集群运行日志的收集并下载供用户使用的,提高快速定位异常问题的效率。在[日志管理]页面可收集运行日志、集群日志。本节仅支持系统管理员操作。
运行日志以天为单位进行收集和下载,当某天没有产生日志信息时,下载成功后解压没有该天的日志文件信息。
在服务运行过程中,若遇到异常或其他情况,可查看运行日志信息以便定位异常。
(1) 在日志管理页面,选择<运行日志收集>,弹出运行日志收集窗口。
(2) 输入SSH端口信息和root密码后,单击<测试远程连接>按钮,返回远程连接校验结果。
(3) 显示运行日志收集条件。部分参数说明如下:
¡ 远程连接信息
- SSH端口:云平台远程SSH连接端口。
- root密码:root用户的远程登录密码。
¡ 日志收集类型
- 基础日志:base服务中的相关日志,默认选中不可修改。
- 服务日志:RabbitMQ服务运行的日志,默认选中不可修改。
¡ 日志收集时间:日志产生的时间范围,默认选择前一天和当天的时间,最大范围30天。
(4) 相关信息配置完成后,单击<确定>按钮,即可开始运行日志收集并下载日志压缩文件。
仅支持下载集群节点为“在线”状态的集群日志信息。
在集群使用过程中,若出现集群运行异常或其他情况,可查看集群节点的日志信息以便定位异常。
(1) 在日志管理页面,单击<集群日志收集>,弹出集群日志收集窗口。
(2) 选择需要下载日志信息的集群名称,会查询当前集群下的节点信息并展示在下方列表中,部分参数说明如下:
¡ 节点名:集群节点的名称。
¡ 状态:集群节点的状态,分为在线、离线两种状态。
¡ 节点IP:集群节点的IP信息。
当集群节点状态为“在线”时,单击列表操作列的<下载>按钮,即可开始集群日志收集并下载日志压缩文件。
开发所使用jar包
compile("com.rabbitmq:amqp-client:5.7.3")
· OpenJDK版本为1.8。
· 保证开发环境与连接集群时间同步,避免出现连接不上、连接超时等问题。
RabbitMQ调用结束时注意关闭RabbitMQ连接和通道。下面示例代码中的主机地址、端口、用户名密码等信息需要视情况而定,RabbitMQ相关的术语概念请查看官网定义。
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeoutException;
public class RabbitMQDemo {
private final String[] hosts;
private final int port;
private final String username;
private final String password;
private final String vhost;
private ConnectionFactory connectionFactory;
private Connection connection;
private Channel channel;
public RabbitMQDemo(String[] hosts, int port, String username, String password, String vhost) {
this.hosts = hosts;
this.port = port;
this.username = username;
this.password = password;
this.vhost = vhost;
}
public void connect() throws IOException, TimeoutException {
this.connectionFactory = createConnectionFactory(); // 创建连接工厂
this.connection = connectionFactory.newConnection(); // 创建连接
this.channel = connection.createChannel(); // 创建通道
// 增加消息返回监听器
this.channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
System.out.println(String.format("Message returned: [%s - %s] - %s: %s(%s)", exchange, routingKey, replyCode, replyText, new String(body)));
});
this.channel.confirmSelect(); // 开启消息确认机制
this.channel.addConfirmListener(new ConfirmListener() { // 添加消息确认监听器,即使用异步确认机制
@Override
public void handleAck(long deliveryTag, boolean multiple) {
System.out.println(String.format("Message[%d-%s] is acked", deliveryTag, multiple));
}
@Override
public void handleNack(long deliveryTag, boolean multiple) {
System.out.println(String.format("Message[%d-%s] is not acked", deliveryTag, multiple));
}
});
}
public void disconnect() {
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException | TimeoutException ignored) {
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException ignored) {
}
}
}
public void init(String exchange, String routingKey, String queue) throws IOException {
// 声明交换机、路由键、队列
channel.exchangeDeclare(exchange, "direct");
channel.queueDeclare(queue, true, false, true, Collections.emptyMap());
channel.queueBind(queue, exchange, routingKey);
}
public void registerConsumer(String queue, Consumer consumer) throws IOException {
// 注册消费者
channel.basicConsume(queue, consumer);
}
public void publish(String exchange, String routingKey, String message) throws IOException {
// 发布消息
channel.basicPublish(exchange, routingKey, true, new AMQP.BasicProperties(), message.getBytes());
}
private ConnectionFactory createConnectionFactory() {
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost(vhost);
factory.setUsername(username);
factory.setPassword(password);
ArrayList<String> hosts = new ArrayList<>(Arrays.asList(this.hosts));
Collections.shuffle(hosts);
factory.setHost(String.join(",", hosts));
factory.setPort(port);
return factory;
}
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
RabbitMQDemo rabbit = new RabbitMQDemo(new String[]{"10.121.92.158"}, 5672, "admin", "passwd@123", "/");
rabbit.connect();
rabbit.init("testex", "testrk", "testqueue");
rabbit.registerConsumer("testqueue", new DefaultConsumer(rabbit.channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(String.format("Received a message: %s", new String(body)));
getChannel().basicAck(envelope.getDeliveryTag(), false);
}
}); // 模拟服务启动
rabbit.publish("testex", "testrk", "hello, this is a message");
Thread.sleep(10000); // 模拟服务运行
rabbit.disconnect(); // 模拟服务关闭
}
}
RabbitMQ其它常用接口函数,详情请参见官网https://www.rabbitmq.com。
RabbitMQ可以通过http 接口、web admin组件进行管理。除此之外,还可以通过rabbitmqctl进行管理,rabbitmqctl基本上包含了RabbitMQ的所有管理功能,更为全面,常用方法总结如此。
· 创建用户
rabbitmqctl add_user username password
· 删除用户
rabbitmqctl delete_user username
· 修改用户密码
rabbitmqctl change_password username password
· 查看当前用户
rabbitmqctl list_users
· 设置用户角色
rabbitmqctl set_user_tags username tag # tag分为:administrator, monitoring, management, policymaker
· 查看指定用户权限
· 查看集群状态
rabbitmqctl cluster_status
· 查看当前所有用户
rabbitmqctl list_users
· 查看插件状态
rabbitmq-plugins list
· 开启插件
rabbitmq-plugins enable plugin_name
· 关闭插件
rabbitmq-plugins disable plugin_name
· 查看队列
rabbitmqctl list_queues
当前版本RabbitMQ支持通过页面配置来修改RabbitMQ的组件配置,修改完成后需要重新启动RabbitMQ集群。
通常情况下,页面配置足以满足需求。在页面配置无法满足需求的时候,您可以登录到服务主机,逐一修改节点配置。但是请注意,通过后面这种方式修改的配置,会在页面重启RabbitMQ集群时被页面的配置覆盖掉,请谨慎操作。
不同款型规格的资料略有差异, 详细信息请向具体销售和400咨询。H3C保留在没有任何通知或提示的情况下对资料内容进行修改的权利!