技术文章
当前位置:首页 > Java技术文章 > spring boot集成rabbitmq的实例教程

spring boot集成rabbitmq的代码实例

  • 发布时间:
  • 作者:码农之家原创
  • 点击:108

这篇文章主要知识点是关于springboot、rabbitmq、rabbitmq与spring集成、rabbitmq和springboot、的内容,如果大家想对相关知识点有系统深入的学习,可以参阅以下java相关的电子书

Spring Boot 2精髓-从构建小系统到架构分布式大系统
  • 类型:Spring大小:174.8 MB格式:PDF出版:电子工业出版社作者:李家智
立即下载

更多Java相关的学习资源可以参阅 Java电子书程序设计电子书 等栏目。

spring boot集成rabbitmq的实例教程

一、RabbitMQ的介绍  

RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,现已经转让给apache).

消息中间件的工作过程可以用生产者消费者模型来表示.即,生产者不断的向消息队列发送信息,而消费者从消息队列中消费信息.具体过程如下:

spring boot集成rabbitmq的实例教程

从上图可看出,对于消息队列来说,生产者,消息队列,消费者是最重要的三个概念,生产者发消息到消息队列中去,消费者监听指定的消息队列,并且当消息队列收到消息之后,接收消息队列传来的消息,并且给予相应的处理.消息队列常用于分布式系统之间互相信息的传递.

对于RabbitMQ来说,除了这三个基本模块以外,还添加了一个模块,即交换机(Exchange).它使得生产者和消息队列之间产生了隔离,生产者将消息发送给交换机,而交换机则根据调度策略把相应的消息转发给对应的消息队列.那么RabitMQ的工作流程如下所示:

spring boot集成rabbitmq的实例教程

紧接着说一下交换机.交换机的主要作用是接收相应的消息并且绑定到指定的队列.交换机有四种类型,分别为Direct,topic,headers,Fanout.

Direct是RabbitMQ默认的交换机模式,也是最简单的模式.即创建消息队列的时候,指定一个BindingKey.当发送者发送消息的时候,指定对应的Key.当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中.

topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.

headers也是根据一个规则进行匹配,在消息队列和交换机绑定的时候会指定一组键值对规则,而发送消息的时候也会指定一组键值对规则,当两组键值对规则相匹配的时候,消息会被发送到匹配的消息队列中.

Fanout是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略. 

概念:

  • 生产者 消息的产生方,负责将消息推送到消息队列
  • 消费者 消息的最终接受方,负责监听队列中的对应消息,消费消息
  • 队列 消息的寄存器,负责存放生产者发送的消息
  • 交换机 负责根据一定规则分发生产者产生的消息
  • 绑定 完成交换机和队列之间的绑定

模式:

1、direct

直连模式,用于实例间的任务分发

2、topic

话题模式,通过可配置的规则分发给绑定在该exchange上的队列

3、headers

适用规则复杂的分发,用headers里的参数表达规则

4、fanout

分发给所有绑定到该exchange上的队列,忽略routing key

安装

单机版安装很简单,大概步骤如下:

# 安装erlang包
 yum install erlang
# 安装socat
 yum install socat
# 安装rabbit 
 rpm -ivh rabbitmq-server-3.6.6-1.el6.noarch.rpm 
# 启动服务
 rabbitmq-server start
# 增加管理控制功能
 rabbitmq-plugins enable rabbitmq_management
# 增加用户:
 sudo rabbitmqctl add_user root password
 rabbitmqctl set_user_tags root administrator 
 rabbitmqctl set_permissions -p / root '.*' '.*' '.*'

集群安装,可参考这篇文章:

     rabbitmq集群安装

以上就是rabbitmq的介绍,下面开始本文的正文:spring boot 集成rabbitmq ,本人在学习rabbitmq时发现网上很少有系统性介绍springboot和rabbitmq如何集成的,其他人总结的都片段化,所以结合个人调研过程,整理此篇文章。

二、springboot配置

废话少说直接上代码:

配置参数

application.yml:

spring:
 rabbitmq:
 addresses: 192.168.1.1:5672
 username: username
 password: password
 publisher-confirms: true
 virtual-host: /

java config读取参数

/**
 * RabbitMq配置文件读取类
 *
 * @author chenhf
 * @create 2017-10-23 上午9:31
 **/
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {

 @Value("${spring.rabbitmq.addresses}")
 private String addresses;
 @Value("${spring.rabbitmq.username}")
 private String username;
 @Value("${spring.rabbitmq.password}")
 private String password;
 @Value("${spring.rabbitmq.publisher-confirms}")
 private Boolean publisherConfirms;
 @Value("${spring.rabbitmq.virtual-host}")
 private String virtualHost;

 // 构建mq实例工厂
 @Bean
 public ConnectionFactory connectionFactory(){
 CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
 connectionFactory.setAddresses(addresses);
 connectionFactory.setUsername(username);
 connectionFactory.setPassword(password);
 connectionFactory.setPublisherConfirms(publisherConfirms);
 connectionFactory.setVirtualHost(virtualHost);
 return connectionFactory;
 }

 @Bean
 public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
 return new RabbitAdmin(connectionFactory);
 }

 @Bean
 @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
 public RabbitTemplate rabbitTemplate(){
 RabbitTemplate template = new RabbitTemplate(connectionFactory());
 return template;
 }
}

三、rabbitmq生产者配置

主要配置了直连和话题模式,其中话题模式设置两个队列(queueTopicTest1、queueTopicTest2),此两个队列在和交换机绑定时分别设置不同的routingkey(.TEST.以及lazy.#)来验证匹配模式。

/**
 * 用于配置交换机和队列对应关系
 * 新增消息队列应该按照如下步骤
 * 1、增加queue bean,参见queueXXXX方法
 * 2、增加queue和exchange的binding
 * @author chenhf
 * @create 2017-10-23 上午10:33
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class RabbitMqExchangeConfig {
 /** logger */
 private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig.class);

 /**
 * @Author:chenhf
 * @Description: 主题型交换机
 * @Date:下午5:49 2017/10/23
 * @param
 * @return
 */
 @Bean
 TopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin){
 TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode());
 rabbitAdmin.declareExchange(contractTopicExchange);
 logger.debug("完成主题型交换机bean实例化");
 return contractTopicExchange;
 }
 /**
 * 直连型交换机
 */
 @Bean
 DirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) {
 DirectExchange contractDirectExchange = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode());
 rabbitAdmin.declareExchange(contractDirectExchange);
 logger.debug("完成直连型交换机bean实例化");
 return contractDirectExchange;
 }

 //在此可以定义队列

 @Bean
 Queue queueTest(RabbitAdmin rabbitAdmin){
 Queue queue = new Queue(RabbitMqEnum.QueueName.TESTQUEUE.getCode());
 rabbitAdmin.declareQueue(queue);
 logger.debug("测试队列实例化完成");
 return queue;
 }

 //topic 1
 @Bean
 Queue queueTopicTest1(RabbitAdmin rabbitAdmin){
 Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST1.getCode());
 rabbitAdmin.declareQueue(queue);
 logger.debug("话题测试队列1实例化完成");
 return queue;
 }
 //topic 2
 @Bean
 Queue queueTopicTest2(RabbitAdmin rabbitAdmin){
 Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST2.getCode());
 rabbitAdmin.declareQueue(queue);
 logger.debug("话题测试队列2实例化完成");
 return queue;
 }


 //在此处完成队列和交换机绑定
 @Bean
 Binding bindingQueueTest(Queue queueTest,DirectExchange exchange,RabbitAdmin rabbitAdmin){
 Binding binding = BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode());
 rabbitAdmin.declareBinding(binding);
 logger.debug("测试队列与直连型交换机绑定完成");
 return binding;
 }
 //topic binding1
 @Bean
 Binding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){
 Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE1.getCode());
 rabbitAdmin.declareBinding(binding);
 logger.debug("测试队列与话题交换机1绑定完成");
 return binding;
 }

 //topic binding2
 @Bean
 Binding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){
 Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE2.getCode());
 rabbitAdmin.declareBinding(binding);
 logger.debug("测试队列与话题交换机2绑定完成");
 return binding;
 }

}

在这里用到枚举类:RabbitMqEnum

/**
 * 定义rabbitMq需要的常量
 *
 * @author chenhf
 * @create 2017-10-23 下午4:07
 **/
public class RabbitMqEnum {

 /**
 * @param
 * @Author:chenhf
 * @Description:定义数据交换方式
 * @Date:下午4:08 2017/10/23
 * @return
 */
 public enum Exchange {
 CONTRACT_FANOUT("CONTRACT_FANOUT", "消息分发"),
 CONTRACT_TOPIC("CONTRACT_TOPIC", "消息订阅"),
 CONTRACT_DIRECT("CONTRACT_DIRECT", "点对点");

 private String code;
 private String name;

 Exchange(String code, String name) {
 this.code = code;
 this.name = name;
 }

 public String getCode() {
 return code;
 }

 public String getName() {
 return name;
 }
 }

 /**
 * describe: 定义队列名称
 * creat_user: chenhf
 * creat_date: 2017/10/31
 **/
 public enum QueueName {
 TESTQUEUE("TESTQUEUE", "测试队列"),
 TOPICTEST1("TOPICTEST1", "topic测试队列"),
 TOPICTEST2("TOPICTEST2", "topic测试队列");

 private String code;
 private String name;

 QueueName(String code, String name) {
 this.code = code;
 this.name = name;
 }

 public String getCode() {
 return code;
 }

 public String getName() {
 return name;
 }

 }

 /**
 * describe: 定义routing_key
 * creat_user: chenhf
 * creat_date: 2017/10/31
 **/
 public enum QueueEnum {
 TESTQUEUE("TESTQUEUE1", "测试队列key"),
 TESTTOPICQUEUE1("*.TEST.*", "topic测试队列key"),
 TESTTOPICQUEUE2("lazy.#", "topic测试队列key");


 private String code;
 private String name;

 QueueEnum(String code, String name) {
 this.code = code;
 this.name = name;
 }

 public String getCode() {
 return code;
 }

 public String getName() {
 return name;
 }
 }

}

以上完成消息生产者的定义,下面封装调用接口

测试时直接调用此工具类,testUser类需自己实现

rabbitMqSender.sendRabbitmqDirect("TESTQUEUE1",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.1.2",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.TEST.2",testUser);
/**
 * rabbitmq发送消息工具类
 *
 * @author chenhf
 * @create 2017-10-26 上午11:10
 **/

@Component
public class RabbitMqSender implements RabbitTemplate.ConfirmCallback{
 /** logger */
 private static final Logger logger = LoggerFactory.getLogger(RabbitMqSender.class);

 private RabbitTemplate rabbitTemplate;

 @Autowired
 public RabbitMqSender(RabbitTemplate rabbitTemplate) {
 this.rabbitTemplate = rabbitTemplate;
 this.rabbitTemplate.setConfirmCallback(this);
 }

 @Override
 public void confirm(CorrelationData correlationData, boolean b, String s) {
 logger.info("confirm: " + correlationData.getId());
 }

 /**
 * 发送到 指定routekey的指定queue
 * @param routeKey
 * @param obj
 */
 public void sendRabbitmqDirect(String routeKey,Object obj) {
 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 logger.info("send: " + correlationData.getId());
 this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode(), routeKey , obj, correlationData);
 }

 /**
 * 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上
 * @param routeKey
 * @param obj
 */
 public void sendRabbitmqTopic(String routeKey,Object obj) {
 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 logger.info("send: " + correlationData.getId());
 this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode(), routeKey , obj, correlationData);
 }
}

四、rabbitmq消费者配置

springboot注解方式监听队列,无法手动指定回调,所以采用了实现ChannelAwareMessageListener接口,重写onMessage来进行手动回调,详见以下代码,详细介绍可以在spring的官网上找amqp相关章节阅读

直连消费者

通过设置TestUser的name来测试回调,分别发两条消息,一条UserName为1,一条为2,查看控制台中队列中消息是否被消费

/**
 * 消费者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class ExampleAmqpConfiguration {
 @Bean("testQueueContainer")
 public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 container.setQueueNames("TESTQUEUE");
 container.setMessageListener(exampleListener());
 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 return container;
 }


 @Bean("testQueueListener")
 public ChannelAwareMessageListener exampleListener() {
 return new ChannelAwareMessageListener() {
 @Override
 public void onMessage(Message message, Channel channel) throws Exception {
 TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
 //通过设置TestUser的name来测试回调,分别发两条消息,一条UserName为1,一条为2,查看控制台中队列中消息是否被消费
 if ("2".equals(testUser.getUserName())){
  System.out.println(testUser.toString());
  channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
 }

 if ("1".equals(testUser.getUserName())){
  System.out.println(testUser.toString());
  channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
 }

 }
 };
 }

}

topic消费者1

/**
 * 消费者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration {
 @Bean("topicTest1Container")
 public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 container.setQueueNames("TOPICTEST1");
 container.setMessageListener(exampleListener1());
 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 return container;
 }


 @Bean("topicTest1Listener")
 public ChannelAwareMessageListener exampleListener1(){
 return new ChannelAwareMessageListener() {
 @Override
 public void onMessage(Message message, Channel channel) throws Exception {
 TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
 System.out.println("TOPICTEST1:"+testUser.toString());
 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

 }
 };
 }




}

topic消费者2

/**
 * 消费者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration2 {
 @Bean("topicTest2Container")
 public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 container.setQueueNames("TOPICTEST2");
 container.setMessageListener(exampleListener());
 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 return container;
 }


 @Bean("topicTest2Listener")
 public ChannelAwareMessageListener exampleListener() {
 return new ChannelAwareMessageListener() {
 @Override
 public void

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对码农之家的支持。

以上就是本次给大家分享的关于Java的全部知识点内容总结,大家还可以在下方相关文章里找到Java基于JDK 1.8的LinkedList源、 Java编程实现高斯模糊和图、 Java的校验银行卡功能实例、 等java文章进一步学习,感谢大家的阅读和支持。

上一篇:Spring Boot报错:No session repository could be auto-configured, check your configuration如何解决

下一篇:Java在不存在文件夹的目录下创建文件实例代码

展开 +

收起 -

相关电子书
学习笔记
网友NO.898431

SpringBoot 2 快速整合 Filter过程解析

概述 SpringBoot 中没有 web.xml, 我们无法按照原来的方式在 web.xml 中配置 Filter 。但是我们可以通过 JavaConfig(@Configuration +@Bean)方式进行配置。通过FilterRegistrationBean 将自定义 Filter 添加到 SpringBoot 的过滤链中。 实战操作 实战操作通过定义一个拦截所有访问项目的URL的 Filter来进行演示的。 首先定义一个统一访问 URL 拦截的 Filter。代码如下: public class UrlFilter implements Filter { private Logger log = LoggerFactory.getLogger(UrlFilter.class); @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { HttpServletRequest httpServletRequest = (HttpServletRequest) request; String requestURI = httpServletRequest.getRequestURI(); StringBuffer requestURL = httpServletRequest.getRequestURL(); log.info("requestURI:" +requestURI+" "+"requestURL:"+requestURL); chain.doFilter(httpServletRequest, response); }} 通……

网友NO.196405

SpringBoot集成Swagger2生成接口文档的方法示例

我们提供Restful接口的时候,API文档是尤为的重要,它承载着对接口的定义,描述等。它还是和API消费方沟通的重要工具。在实际情况中由于接口和文档存放的位置不同,我们很难及时的去维护文档。个人在实际的工作中就遇到过很多接口更新了很久,但是文档却还是老版本的情况,其实在这个时候这份文档就已经失去了它存在的意义。而 Swagger 是目前我见过的最好的API文档生成工具,使用起来也很方便,还可以直接调试我们的API。我们今天就来看下 Swagger2 与 SpringBoot 的结合。 准备工作 一个SpringBoot项目,可以直接去官网 生成一个demo 。 一个用户类。 package cn.itweknow.springbootswagger.model;import java.io.Serializable;/** * @author ganchaoyang * @date 2018/12/19 10:29 * @description */public class User implements Serializable { private Integer id; private String name; private String password; private String……

网友NO.261169

Spring boot创建自定义starter的完整步骤

前言: Springboot的出现极大的简化了开发人员的配置,而这之中的一大利器便是springboot的starter,starter是springboot的核心组成部分,springboot官方同时也为开发人员封装了各种各样方便好用的starter模块,例如: spring-boot-starter-web//spring MVC相关 spring-boot-starter-aop //切面编程相关 spring-boot-starter-cache //缓存相关 starter的出现极大的帮助开发者们从繁琐的框架配置中解放出来,从而更专注于业务代码,而springboot能做的不仅仅停留于此,当面对一些特殊的情况时,我们可以使用我们自定义的springboot starter。 在创建我们自定义的starter之前呢,我们先看看官方是怎么说的: 模块 在springboot官方文档中,特别提到,我们需要创建两个module ,其中一个是autoconfigure module 一个是 starter module ,其中 starter module 依赖 autoconfigure module 但是,网上仍然有很多blog在说这块的时候其……

网友NO.535360

详解Spring Boot实战之Restful API的构建

上一篇文章讲解了通过Spring boot与JdbcTemplate、JPA和MyBatis的集成,实现对数据库的访问。今天主要给大家分享一下如何通过Spring boot向前端返回数据。 在现在的开发流程中,为了最大程度实现前后端的分离,通常后端接口只提供数据接口,由前端通过Ajax请求从后端获取数据并进行渲染再展示给用户。我们用的最多的方式就是后端会返回给前端一个JSON字符串,前端解析JSON字符串生成JavaScript的对象,然后再做处理。本文就来演示一下Spring boot如何实现这种模式,本文重点会讲解如何设计一个Restful的API,并通过Spring boot来实现相关的API。不过,为了大家更好的了解Restful风格的API,我们先设计一个传统的数据返回接口,这样大家可以对比着来理解。 一、非Restful接口的支持 我们这里以文章列表为例,实现一个返回文章列表的接口,代码如下: @Controller@Reque……

<
1
>

Copyright 2018-2019 xz577.com 码农之家

版权责任说明