1
2
3
4
5
6
7
作者:李晓辉

联系方式:

1. 微信:Lxh_Chat

2. 邮箱:939958092@qq.com

今天咱们来聊聊OpenStack里那个超重要的“通信枢纽”——RabbitMQ。这玩意儿可厉害了,它就像是OpenStack各个组件之间的“传话筒”,让它们能顺畅地交流。别看它平时没啥存在感,但要是没了它,OpenStack可就乱成一锅粥了。

RabbitMQ

OpenStack的后端主要靠两样东西撑起来:一个是数据库,用来存那些得长期保存的数据;另一个就是消息代理,它就像是各个服务组件之间的“快递员”,负责把消息从一个地方送到另一个地方。消息代理这事儿,只要支持AMQP协议的都可以用,而咱们这次用的是RabbitMQ。

RabbitMQ的“工作原理”

想象一下,你有个消息要发给朋友,但你不想直接打电话,也不想发短信,而是想通过一个中间人来传话。RabbitMQ就是这个中间人。它里面有个“交换机”(Exchange),还有个“队列”(Queue),这两个东西是它工作的核心。

  • 交换机(Exchange):就像是一个消息的“分发中心”。当你有个消息要发出去的时候,你先把消息扔到交换机里。交换机收到消息后,就会根据消息里的“地址”(也就是路由密钥)来决定把消息发到哪个队列里。

  • 队列(Queue):这就是消息的“暂存地”。交换机把消息发过来后,消息就会在这个队列里等着。订阅了这个队列的使用者(比如其他服务组件)就会从队列里把消息取走,然后处理。

整个过程是这样的:

  1. 消息生成:比如,某个服务组件(我们叫它“创建者”)有个消息要发给其他组件(我们叫它们“使用者”)。创建者就把消息打包好,贴上“地址”(路由密钥),然后扔到交换机里。

  2. 交换机分发:交换机收到消息后,看看消息上的“地址”,然后把消息送到对应的队列里。这个过程就像是快递员根据包裹上的地址,把包裹送到不同的快递点。

  3. 队列暂存:消息到了队列里,就会乖乖地待着。队列就像是一个“消息池”,里面可以存好多消息。

  4. 使用者接收:使用者组件订阅了这个队列,它们会时不时地过来取消息。一旦取到消息,就开始处理,比如执行某个操作或者回复消息。

为啥要用RabbitMQ?

RabbitMQ的好处在于,它能保证消息不会丢失。即使使用者暂时没来取消息,消息也会安全地待在队列里,等着使用者来处理。而且,它还能同时处理好多好多消息,效率超高!这就像是一个超级快递公司,不管有多少包裹,都能保证送到收件人手里。
在OpenStack里,RabbitMQ就像是一个超级调度员,让各个服务组件能够高效、可靠地“聊天”。要是没有它,OpenStack的各个组件可能就得“失联”了。

术语描述
Binding(绑定)这就像是消息的“导航路线”。它告诉交换机,哪些消息应该发到哪个队列里。
Consumer(消费者)这就是接收消息的应用程序,也就是那些需要处理消息的组件。
Exchange(交换机)这是消息的“分发中心”,它根据消息的“地址”(路由密钥)把消息发到对应的队列。
Publisher/Producer(发布者/生产者)这就是发送消息的应用程序,也就是那些需要把消息发出去的组件。
Queue(队列)这是消息的“暂存地”,消息会在这里等着,直到被消费者取走。
Routing key(路由密钥)这是消息的“地址”,生产者在消息上写上这个地址,交换机就会根据它来分发消息。
Vhost(虚拟主机)这就像是RabbitMQ的“隔离区”,不同的应用程序可以在不同的虚拟主机里,互不干扰。

消息代理交换概念

在RabbitMQ里,交换(Exchange)和队列(Queue)之间的交互,其实就像是“按图索骥”。消息里会有一个“地址标签”,我们叫它“路由密钥”(Routing Key)。而队列会绑定一个“过滤条件”,我们叫它“绑定密钥”(Binding Key)。交换会把消息送到那些绑定密钥和消息的路由密钥匹配的队列里。

根据这个“匹配”机制的不同方式,RabbitMQ提供了好几种不同类型的交换。每种交换都有自己的“玩法”,具体用哪一种,就看你的需求了。

RabbitMQ提供了几种不同类型的交换(Exchange),每种类型都有其独特的功能和用途。以下是RabbitMQ中常见的交换类型及其特点

1. Direct Exchange(直连交换)

  • 特点:这种交换类型就像是“一对一”的快递服务。消息的路由密钥(Routing Key)必须和队列绑定的绑定密钥(Binding Key)完全匹配,消息才会被发送到对应的队列。

  • 应用场景:当你需要把消息精准地发送到特定的队列时,就用这个。比如,一个订单处理系统,不同的订单类型(如退货、换货)可以绑定到不同的队列。

例子

  • 生产者发送消息时,指定路由密钥为order.return

  • 队列绑定了order.return这个绑定密钥。

  • 消息只会被发送到这个队列。

taskC_create 消息路由到名为 taskC_queue 的队列

2. Fanout Exchange(扇形交换)

  • 特点:这种交换类型就像是“广播”。它会把消息发送到所有绑定的队列,不管消息的路由密钥是什么。简单来说,就是“有绑定,就发消息”。

  • 应用场景:当你需要把消息同时发送给多个消费者时,就用这个。比如,一个日志系统,所有绑定的队列都会收到日志消息。

例子

  • 生产者发送消息,不管路由密钥是什么。

  • 所有绑定到这个交换的队列都会收到消息。

3. Topic Exchange(主题交换)

  • 特点:这种交换类型就像是“模糊匹配”。消息的路由密钥和队列的绑定密钥可以包含通配符(*#),允许更灵活的匹配。*匹配一个单词,#匹配多个单词。

  • 应用场景:当你需要根据消息的主题来分发消息时,就用这个。比如,一个消息系统,可以根据消息的主题(如user.loginuser.logout)来分发消息。

例子

  • 生产者发送消息,路由密钥为user.login

  • 队列绑定了user.*,表示匹配所有以user.开头的消息。

  • 消息会被发送到这个队列。

下图例子:

  • 通过在路由模式中使⽤通配符,消息可以同时发送到⼀个或多个队列

  • 只有计算任务路由到名为compute_tasks 的队列,但包括计算在内的所有任务都路由到名为 all_tasks 的队列。

使⽤消息队列实施 RPC

想象一下,你有个请求需要处理,但你自己搞不定,得找服务器帮忙。这时候,你就可以用消息队列来实现RPC。具体来说,就是你(客户端)给服务器(服务端)发个消息,服务器处理完后,再把结果发回来。整个过程就像是“你问我答”。

客户端(请求方)的流程

  1. 发送请求

    • 客户端(请求方)有个问题要问服务器,于是它会创建一条消息,把问题写进去。

    • 客户端还会创建一个临时的队列(我们叫它“回调队列”),用来接收服务器的回复。

    • 客户端把消息发送到一个“请求队列”,并在消息里附上“回调队列”的名字,告诉服务器:“嘿,处理完后把结果发到这里哦!”

  2. 等待回复

    • 客户端发送完消息后,不会干等着,而是继续做自己的事情,但会时不时地去“回调队列”里看看有没有回复。

    • 客户端会一直轮询“回调队列”,直到收到服务器的回复。

服务端(服务器)的流程

  1. 接收请求

    • 服务器(服务端)会订阅“请求队列”,时刻准备接收客户端发来的消息。

    • 当服务器收到消息后,会解析消息内容,看看客户端问了啥问题。

  2. 处理请求

    • 服务器根据客户端的问题,进行处理,计算结果。
  3. 发送回复

    • 服务器处理完后,会创建一条回复消息,把结果写进去。

    • 服务器会查看客户端发来的消息,找到“回调队列”的名字,然后把回复消息发送到这个队列。

假设你有个客户端,需要计算两个数的和。它会给服务器发个消息:“嘿,帮我算一下3 + 5。” 同时,客户端会创建一个临时的“回调队列”。

服务器收到消息后,会计算3 + 5 = 8,然后把结果8写进一条新的消息里,再把这条消息发送到客户端指定的“回调队列”。

客户端一直在轮询“回调队列”,当它收到消息后,发现结果是8,于是就开心地说:“哦,原来是8!”

大概图示:

rabbitmq常见实操

查看状态摘要

使⽤ report 命令可显⽰ RabbitMQ 守护进程的当前状态摘要,包括交换和队列的数量及类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[root@controller0 ~]# podman exec -it rabbitmq-bundle-podman-0 /bin/bash
()[root@controller0 /]# rabbitmqctl report
Reporting server status of node rabbit@controller0 ...

Status of node rabbit@controller0 ...
[{pid,677},
{running_applications,
[{rabbitmq_management,"RabbitMQ Management Console","3.7.23"},
{rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.7.23"},
{amqp_client,"RabbitMQ AMQP Client","3.7.23"},
{rabbitmq_management_agent,"RabbitMQ Management Agent","3.7.23"},
{rabbit,"RabbitMQ","3.7.23"},
{mnesia,"MNESIA CXC 138 12","4.15.6"},
{rabbit_common,
"Modules shared by rabbitmq-server and rabbitmq-erlang-client",
"3.7.23"},

列出用户

使⽤ list_users 命令来列出 RabbitMQ ⽤⼾

1
2
3
4
()[root@controller0 /]# rabbitmqctl list_users
Listing users ...
user tags
guest [administrator]

创建用户

使⽤ add_user命令来创建RabbitMQ ⽤⼾

1
2
3
4
5
6
7
()[root@controller0 /]# rabbitmqctl add_user lixiaohui lxhpassword
Adding user "lixiaohui" ...
()[root@controller0 /]# rabbitmqctl list_users
Listing users ...
user tags
lixiaohui []
guest [administrator]

分配权限

使⽤ set_permissions命令来设置RabbitMQ ⽤⼾权限

权限为:

  • configure:定义用户可以配置的资源。

  • write:定义用户可以写入的资源。

  • read:定义用户可以读取的资源。

1
2
3
4
5
6
7
()[root@controller0 /]# rabbitmqctl set_permissions lixiaohui '.*' '.*' '.*'
Setting permissions for user "lixiaohui" in vhost "/" ...
()[root@controller0 /]# rabbitmqctl list_permissions
Listing permissions for vhost "/" ...
user configure write read
lixiaohui .* .* .*
guest .* .* .*

添加用户标签

使⽤ set_user_tags命令来设置⽤⼾标签,administrator将此用户标记为管理员

1
2
3
4
5
6
7
()[root@controller0 /]# rabbitmqctl set_user_tags lixiaohui administrator
Setting tags for user "lixiaohui" to [administrator] ...
()[root@controller0 /]# rabbitmqctl list_users
Listing users ...
user tags
lixiaohui [administrator]
guest [administrator]

列出Exchange

使⽤ list_exchanges 显⽰ RabbitMQ 守护进程上默认配置的交换

1
2
3
4
5
()[root@controller0 /]# rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name type
heat-engine-listener_fanout fanout
manila-share_fanout fanout

列出队列

使⽤ list_queues 命令可列出可⽤的队列及其属性

1
2
3
4
5
6
7
()[root@controller0 /]# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
compute.compute1.overcloud.example.com 0
compute.computehci0.overcloud.example.com 0
manila-scheduler.hostgroup 0

列出消费者

使⽤ list_consumers 命令可列出所有的消费者,以及它们所订阅的队列

1
2
3
4
()[root@controller0 /]# rabbitmqctl list_consumers
Listing consumers in vhost / ...
queue_name channel_pid consumer_tag ack_required prefetch_count arguments
compute.compute1.overcloud.example.com <rabbit@controller0.1.1237.0> 2 true 0 []

跟踪 RabbitMQ 消息

RabbitMQ有一个很酷的功能,叫做Firehose Tracer(消防水带追踪器)。这个名字听起来就很厉害,它的作用也确实很强大。这个功能可以用来跟踪系统中所有的消息,无论是进来的还是出去的。换句话说,它就像一个超级监控器,把所有消息的流动都记录下来。

当你启用这个功能后,所有进入RabbitMQ系统的消息都会被复制到一个特殊的交换(Exchange)中,这个交换的名字叫amq.rabbitmq.trace。这样,你就可以在这个交换里看到所有消息的流动情况,方便你进行调试和监控。

为什么叫“Firehose”?

“Firehose”(消防水带)这个名字来源于它的功能——就像消防水带一样,把所有的消息都“喷”出来,让你能够看到每一个消息的流动。这个功能非常适合在开发和调试阶段使用,但因为它会记录所有的消息,所以会对系统性能产生一定的影响。

如果你想启用这个功能,可以使用以下命令:

这个命令会启动Firehose Tracer功能,所有进入系统的消息都会被复制到amq.rabbitmq.trace交换中。你可以通过查看这个交换中的消息来跟踪系统的运行情况。

1
rabbitmqctl trace_on

当你完成调试后,记得禁用这个功能,以避免对系统性能产生不必要的影响。禁用跟踪的命令是:

执行这个命令后,Firehose Tracer功能就会停止,系统不会再复制消息到amq.rabbitmq.trace交换中。

1
rabbitmqctl trace_off

注意事项

  • 性能影响:启用Firehose Tracer会增加系统的负载,因为它需要处理和复制所有的消息。因此,建议只在需要调试的时候启用它,并在调试完成后尽快禁用。

  • 监控amq.rabbitmq.trace:启用跟踪后,你可以通过查看amq.rabbitmq.trace交换中的消息来了解系统的运行情况。这个交换会包含所有消息的副本,方便你进行分析。