张小驰出没 张小驰出没
首页
  • Spring合集
  • SpringMVC合集
  • Mybatis合集
  • Spring Boot合集
  • Mybatis-plus合集
  • Spring Security合集
  • Vue合集
  • Redis合集
  • RabbitMQ合集
  • 数据结构

    • 数据结构
  • 算法解析

    • 日常算法
    • 剑指Offer
    • LeetCode
  • 技术与Bug

    • 技术文档
    • Bug解决方法
  • 个人博客

    • Hexo博客搭建
  • 我的项目
  • 我的面试
  • 分类
  • 标签
  • 归档
友链
关于
Hexo
GitHub

MoYu

何德何能,何其荣幸
首页
  • Spring合集
  • SpringMVC合集
  • Mybatis合集
  • Spring Boot合集
  • Mybatis-plus合集
  • Spring Security合集
  • Vue合集
  • Redis合集
  • RabbitMQ合集
  • 数据结构

    • 数据结构
  • 算法解析

    • 日常算法
    • 剑指Offer
    • LeetCode
  • 技术与Bug

    • 技术文档
    • Bug解决方法
  • 个人博客

    • Hexo博客搭建
  • 我的项目
  • 我的面试
  • 分类
  • 标签
  • 归档
友链
关于
Hexo
GitHub
  • Spring合集

  • SpringMVC合集

  • Mybatis合集

  • Spring Boot合集

  • Mybatis-plus合集

  • Spring Security合集

  • Redis合集

  • Vue合集

  • RabbitMQ合集

    • RabbitMQ安装与界面管理
    • RabbitMQ入门案例
      • Rabbit 模式
      • 实现步骤
      • 初步实现
        • 前期准备
        • 简单模型
        • 生产者
        • 消费者
      • AMQP
        • 概念介绍
        • RabbitMQ运转流程
        • 生产者流转过程解析
        • 消费者流转过程解析
    • 消息中间件RabbitMQ
    • RabbitMQ工作模式
    • Rabbit 高级操作
    • Spring Boot 整合 RabbitMQ
  • 学习ing
  • RabbitMQ合集
MoYu
2021-08-13

RabbitMQ入门案例

# RabbitMQ入门案例

# Rabbit 模式

https://www.rabbitmq.com/getstarted.html

# 实现步骤

  • 构建一个 maven工程
  • 导入 rabbitmq的依赖
  • 启动 rabbitmq-server服务
  • 定义生产者
  • 定义消费者
  • 观察消息的在 rabbitmq-server服务中的进程

# 初步实现

# 前期准备

# 1.构建项目

1

# 2.导入依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>
1
2
3
4
5

# 简单模型

4

在上图的模型中,有以下概念:

  1. 生产者,也就是要发送消息的程序
  2. 消费者:消息的接受者,会一直等待消息到来。
  3. 消息队列:图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp

实现步骤:

  1. 创建连接工程
  2. 创建连接 connection
  3. 通过连接获取通道 Channel
  4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
  5. 准备消息内容
  6. 发送消息给队列 queue
  7. 关闭连接
  8. 关闭通道

# 生产者

public class Producer {
    public static void main(String[] args) {
        //1. 创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //这里要使用自己的IP地址
        connectionFactory.setHost("192.168.57.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            //2. 创建连接 connection
            connection = connectionFactory.newConnection("生产者");
            //3. 通过连接获取通道 Channel
            channel = connection.createChannel();
            //4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息
            String quequeName = "queuel";
            /**
             * @params1 队列的名称
             * @params2 是否要持久化 durable-false
             * @params3 排他性,是否是独占独立
             * @params4 是否自动删除,随着最后一个消费者消息完毕以后是否把队列自动删除
             * @params5 携带的附属参数
             */
            channel.queueDeclare(quequeName,false,false,false,null);
            //5. 准备消息内容
            String message = "Hello,Consumer";
            //6. 发送消息给队列 queue
            channel.basicPublish("",quequeName,null,message.getBytes());
            System.out.println("消息发送成功");
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //7. 关闭连接
            if (channel != null && channel.isOpen()){
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            //8. 关闭通道
            if (connection != null && connection.isOpen()){
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

# 消费者

public class Consumer {

    public static void main(String[] args) {
        //1. 创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //这里要使用自己的IP地址
        connectionFactory.setHost("192.168.57.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //2. 创建连接 connection
            connection = connectionFactory.newConnection("消费者");
            //3. 通过连接获取通道 Channel
            channel = connection.createChannel();
            //4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
            String quequeName = "queue1";
            channel.queueDeclare(quequeName,false,false,false,null);
            //5.监听消息
            DefaultConsumer consumer = new DefaultConsumer(channel){
                /*
                consumerTag:消息者标签,channel.basicConsume可以指定
                envelope:消息包内容,可从中获取消息id,消息routing key,交换机,消息和重装标记(收到消息失败后是否需要重新发送)
                properties:消息属性
                body;消息
                */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //路由key
                    System.out.println("路由key为:"+ envelope.getRoutingKey());
                    //交换机
                    System.out.println("交换机为:"+ envelope.getExchange());
                    //消息id
                    System.out.println("消息id为:"+ envelope.getDeliveryTag());
                    //收到的消息
                    System.out.println("接收到的消息:"+ new String(body,"UTF-8"));
                    System.out.println("");
                    System.out.println("======================================================");
                    System.out.println("");
                }
            };
            channel.basicConsume("queue1", true, consumer);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //6. 不关闭资源,一直监听
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

5

5

# AMQP

# 概念介绍

AMQP 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

AMQP是一个二进制协议,拥有一些现代化特点:多信道、协商式,异步,安全,扩平台,中立,高效。

RabbitMQ 是 AMQP协议 的 Erlang的实现。

概念 说明
连接 Connection 一个网络连接,例如:TCP/IP套接字连接。
会话 Session 端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。
信道 Channel 多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。
客户端 Client AMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。
服务节点Broker 消息中间件的服务节点。一般情况下可以将一个RabbitMQ Broker看作一台RabbitMQ 服务器。
端点 AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)。
消费者 Consumer 一个从消息队列里请求消息的客户端程序。
生产者 Producer 一个向交换机发布消息的客户端应用程序。

# RabbitMQ运转流程

以 入门案例 为例

# 生产者发送消息

  1. 生产者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker;
  2. 声明队列、设置属性;如是否排它,是否持久化,是否自动删除;
  3. 将路由键(空字符串)与队列绑定起来;
  4. 发送消息至RabbitMQ Broker;
  5. 关闭信道;
  6. 关闭连接;

# 消费者接收消息

  1. 消费者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker
  2. 向Broker 请求消费相应队列中的消息,设置相应的回调函数;
  3. 等待Broker回应闭关投递响应队列中的消息,消费者接收消息;
  4. 确认(ack,自动确认)接收到的消息;
  5. RabbitMQ从队列中删除相应已经被确认的消息;
  6. 关闭信道;
  7. 关闭连接;

# 生产者流转过程解析

  1. 客户端与代理服务器Broker建立连接。调用 newConnection() 方法 , 会进一步封装 Protocol Header 0-9-1 的报文头发送给 Broker ,以此通知Broker 本次交互采用的是 AMQP 0-9-1 协议,紧接着 Broker 返回 Connection.Start 来建立连接,在连接的过程中涉及 Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 这6 个命令的交互。
  2. 客户端调用 connection.createChannel 方法。此方法开启信道,其包装的 channel.open 命令发送给 Broker , 等待 channel.basicPublish 方法,对应的AMQP命令为 Basic.Publish , 这个命令包含了content Header 和content Body() 。content Header 包含了消息体的属性,例如:投递模式,优先级等,content Body 包含了消息体本身。
  3. 客户端发送完消息需要关闭资源时,涉及到Channel.Close和Channl.Close-Ok 与Connetion.Close和Connection.Close-Ok的命令交互。

6

# 消费者流转过程解析

  1. 消费者客户端与代理服务器Broker建立连接。会调用 newConnection() 方法,这个方法会进一步封装 Protocol Header 0-9-1 的报文头发送给Broker ,以此通知Broker 本次交互采用的是 AMQP 0-9-1 协议,紧接着Broker 返回Connection.Start 来建立连接,在连接的过程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 这6 个命令的交互。
  2. 消费者客户端调用connection.createChannel方法。和生产者客户端一样,协议涉及Channel . Open/Open-Ok命令。
  3. 在真正消费之前,消费者客户端需要向Broker 发送Basic.Consume 命令(即调用channel.basicConsume 方法〉将Channel 置为接收模式,之后Broker 回执 Basic . Consume - Ok 以告诉消费者客户端准备好消费消息。
  4. Broker 向消费者客户端推送(Push) 消息,即 Basic.Deliver 命令,这个命令和 Basic.Publish 命令一样会携带 Content Header 和Content Body。
  5. 消费者接收到消息并正确消费之后,向Broker 发送确认,即 Basic.Ack 命令。
  6. 客户端发送完消息需要关闭资源时,涉及到 Channel.Close和Channl.Close-Ok 与Connetion.Close和Connection.Close-Ok 的命令交互。

7

#RabbitMQ
RabbitMQ安装与界面管理
消息中间件RabbitMQ

← RabbitMQ安装与界面管理 消息中间件RabbitMQ→

最近更新
01
链表
01-25
02
约瑟夫问题
01-25
03
快慢指针
01-25
更多文章>
Theme by Vdoing | Copyright © 2021-2022 MoYu | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式