前言:

RabbitMQ是一种比较流行的消息中间件,全称为Message Queue,即消息队列。是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

离线安装RabbitMQ

# 安装离线包
yum install -y ./erlang-21.3.8.2-1.el7.x86_64.rpm
yum install -y ./socat-1.7.3.2-2.el7.x86_64.rpm
yum install -y ./rabbitmq-server-3.7.15-1.el7.noarch.rpm
# 前台启动MQ sudo rabbitmq-server start
# 后台启动MQ
rabbitmq-server -detached
rabbitmq-plugins enable rabbitmq_management
# 创建用户
rabbitmqctl add_user admin 密码
# 设置角色
rabbitmqctl set_user_tags admin administrator
# 设置默认vhost(“/”)访问权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
# 查看admin账户的权限
rabbitmqctl list_permissions -p /
# 设置开机自启
chkconfig rabbitmq-server on

RabbitMQ常用命令

# 应用管理
rabbitmqctl status //显示RabbitMQ中间件的所有信息
rabbitmqctl stop //停止RabbitMQ应用,关闭节点
rabbitmqctl stop_app //停止RabbitMQ应用
rabbitmqctl start_app //启动RabbitMQ应用
rabbitmqctl restart //重置RabbitMQ节点
rabbitmqctl force_restart //强制重置RabbitMQ节点
# 用户管理
rabbitmqctl add_user username password //添加用户
rabbitmqctl delete_user username //删除用户
rabbitmqctl change_password username newpassword //修改密码
rabbitmqctl list_users //列出所有用户
# 权限控制管理
rabbitmqctl add_vhost vhostpath //创建虚拟主机
rabbitmqctl delete_vhost vhostpath //删除虚拟主机
rabbitmqctl list_vhosts //列出所有虚拟主机
rabbitmqctl set_permissions [-p vhostpath] username <conf> <write> <read> //设置用户权限
rabbitmqctl clear_permissions [-p vhostpath] username //删除用户权限
rabbitmqctl list_permissions [-p vhostpath] //列出虚拟机上的所有权限
rabbitmqctl list_user_permissions username //列出用户权限
# 集群管理
rabbitmqctl cluster_status //获得集群配置信息
rabbitmqctl join_cluster rabbit@localhost --ram | --disc //加入到RabbitMQ节点中,使用内存模式或者磁盘模式
rabbitmqctl change_cluster_node_type disc | ram //修改存储模式
rabbitmqctl set_cluster_name newname //修改名字
# 查看管理
rabbitmqctl list_queues [-p <vhostpath>] //查看所有队列
rabbitmqctl list_exchanges [-p <vhostpath>] //查看所有交换机
rabbitmqctl list_bindings [-p <vhostpath>] //查看所有绑定
rabbitmqctl list_connections //查看所有连接
rabbitmqctl list_channels //查看所有信道
rabbitmqctl list_consumers //查看所有消费者信息

Python操作RabbitMQ

常见的总共有六种消息模型,目前自己做的项目用的是消息分发模型:多个收听者监听一个队列。
以后做到其他模型再补充,下面是此模型的例子:
图1
生产者:

import pika

credentials = pika.PlainCredentials('admin', '密码')  # mq用户名和密码。
# BlockingConnection:同步模式。
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='localhost',
        port=5672,
        virtual_host='/',
        credentials=credentials
    )
)
# 建立rabbit协议的通道。
channel = connection.channel()
# 申明消息队列。当不确定生产者和消费者哪个先启动时,可以两边重复声明消息队列。
# durable指定队列是否持久化。确保没有确认的消息不会丢失。
channel.queue_declare(queue='demands-sys', durable=True)
# message不能直接发送给queue,需经exchange到达queue,
# 此处使用以空字符串标识的默认的exchange。
# 向队列插入数值 routing_key是队列名。
# basic_publish的properties参数指定message的属性。
# 此处delivery_mode=2指明message为持久的。
for i in range(10):
    channel.basic_publish(
        exchange='',
        routing_key='demands-sys',
        body='Hello World%s' % i,
        properties=pika.BasicProperties(delivery_mode=2)
    )
# 关闭与rabbitmq server的连接。
connection.close()

消费者:

import pika
import time

credentials = pika.PlainCredentials('admin', '密码')  # mq用户名和密码。
# BlockingConnection:同步模式。
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='localhost',
        port=5672,
        virtual_host='/',
        credentials=credentials
    )
)
# 建立rabbit协议的通道。
channel = connection.channel()
# 申明消息队列。当不确定生产者和消费者哪个先启动时,可以两边重复声明消息队列。
# durable指定队列是否持久化。确保没有确认的消息不会丢失。
channel.queue_declare(queue='demands-sys', durable=True)
# 定义一个回调函数来处理消息队列中的消息,这里是打印出来。
def callback(ch, method, properties, body):
    # 手动发送确认消息。
    time.sleep(1)
    print(body.decode())
    # 告诉生产者,消费者已收到消息。
    ch.basic_ack(delivery_tag=method.delivery_tag)
# 告诉rabbitmq,用callback来接收消息。
# 默认情况下是要对消息进行确认的,以防止消息丢失。
channel.basic_consume('demands-sys', on_message_callback=callback)
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理。
channel.start_consuming()
Last modification:April 29th, 2021 at 09:37 am