Spring整合rabbitmq实践(一):基础

作者&投稿:窦鬼 (若有异议请与网页底部的电邮联系)
~

Spring整合rabbitmq实践(二):扩展
Spring整合rabbitmq实践(三):源码

producer:消息生产者;

consumer:消息消费者;

queue:消息队列;

exchange:接收producer发送的消息按照binding规则转发给相应的queue;

binding:exchange与queue之间的关系;

virtualHost:每个virtualHost持有自己的exchange、queue、binding,用户只能在virtualHost粒度控制权限。

fanout:

群发到所有绑定的queue;

direct:

根据routing key routing到相应的queue,routing不到任何queue的消息扔掉;可以不同的key绑到同一个queue,也可以同一个key绑到不同的queue;

topic:

类似direct,区别是routing key是由一组以“.”分隔的单词组成,可以有通配符,“*”匹配一个单词,“#”匹配0个或多个单词;

headers:

根据arguments来routing。

arguments为一组key-value对,任意设置。

“x-match”是一个特殊的key,值为“all”时必须匹配所有argument,值为“any”时只需匹配任意一个argument,不设置默认为“all”。

通过以下配置,可以获得最基础的发送消息到queue,以及从queue接收消息的功能。

这个包同时包含了一些其它的包:spring-context、spring-tx、spring-web、spring-messaging、spring-retry、spring-amqp、amqp-client,如果想单纯一点,可以单独引入。

最主要的是以下几个包,

spring-amqp:

spring-rabbit:

amqp-client:

个人理解就是,spring-amqp是spring整合的amqp,spring-rabbit是spring整合的rabbitmq(rabbitmq是amqp的一个实现,所以可能spring-rabbit也是类似关系),amqp-client提供操作rabbitmq的java api。

目前最新的是2.0.5.RELEASE版本。如果编译报错,以下信息或许能有所帮助:

(1)

解决方案:spring-amqp版本改为2.0.5.RELEASE。

(2)

解决方案:spring-context版本改为5.0.7.RELEASE。

(3)

解决方案:spring-core版本改为5.0.7.RELEASE。

(4)

解决方案:spring-beans版本改为5.0.7.RELEASE。

(5)

解决方案:spring-aop版本改为5.0.7.RELEASE。

总之,需要5.0.7.RELEASE版本的spring,及相匹配版本的amqp-client。

后面所讲的这些bean配置,spring-amqp中都有默认配置,如果不需要修改默认配置,则不用人为配置这些bean。后面这些配置也没有涉及到所有的属性。

这里的ConnectionFactory指的是spring-rabbit包下面的ConnectionFactory接口,不是amqp-client包下面的ConnectionFactory类。

上面这个bean是spring-amqp的核心,不论是发送消息还是接收消息都需要这个bean,下面描述一下里面这些配置的含义。

setAddresses :设置了rabbitmq的地址、端口,集群部署的情况下可填写多个,“,”分隔。

setUsername :设置rabbitmq的用户名。

setPassword :设置rabbitmq的用户密码。

setVirtualHost :设置virtualHost。

setCacheMode :设置缓存模式,共有两种, CHANNEL 和 CONNECTION 模式。

CHANNEL 模式,程序运行期间ConnectionFactory会维护着一个Connection,所有的操作都会使用这个Connection,但一个Connection中可以有多个Channel,操作rabbitmq之前都必须先获取到一个Channel,否则就会阻塞(可以通过setChannelCheckoutTimeout()设置等待时间),这些Channel会被缓存(缓存的数量可以通过setChannelCacheSize()设置);

CONNECTION 模式,这个模式下允许创建多个Connection,会缓存一定数量的Connection,每个Connection中同样会缓存一些Channel,除了可以有多个Connection,其它都跟CHANNEL模式一样。

这里的Connection和Channel是spring-amqp中的概念,并非rabbitmq中的概念,官方文档对Connection和Channel有这样的描述:

关于 CONNECTION 模式中,可以存在多个Connection的使用场景,官方文档的描述:

setChannelCacheSize :设置每个Connection中(注意是每个Connection)可以缓存的Channel数量,注意只是缓存的Channel数量,不是Channel的数量上限,操作rabbitmq之前(send/receive message等)要先获取到一个Channel,获取Channel时会先从缓存中找闲置的Channel,如果没有则创建新的Channel,当Channel数量大于缓存数量时,多出来没法放进缓存的会被关闭。

注意,改变这个值不会影响已经存在的Connection,只影响之后创建的Connection。

setChannelCheckoutTimeout :当这个值大于0时, channelCacheSize 不仅是缓存数量,同时也会变成数量上限,从缓存获取不到可用的Channel时,不会创建新的Channel,会等待这个值设置的毫秒数,到时间仍然获取不到可用的Channel会抛出AmqpTimeoutException异常。

同时,在 CONNECTION 模式,这个值也会影响获取Connection的等待时间,超时获取不到Connection也会抛出AmqpTimeoutException异常。

setPublisherReturns、setPublisherConfirms :producer端的消息确认机制(confirm和return),设为true后开启相应的机制,后文详述。

官方文档描述publisherReturns设为true打开return机制,publisherComfirms设为true打开confirm机制,但测试结果(2.0.5.RELEASE版本)是,任意一个设为true,两个都会打开。

addConnectionListener、addChannelListener、setRecoveryListener :添加或设置相应的Listener,后文详述。

setConnectionCacheSize :仅在 CONNECTION 模式使用,设置Connection的缓存数量。

setConnectionLimit :仅在 CONNECTION 模式使用,设置Connection的数量上限。

上面的bean配置,除了需要注入的几个listener bean以外,其它设置的都是其默认值(2.0.5.RELEASE版本),后面的bean示例配置也是一样,部分属性不同版本的默认值可能有所不同。

一般不用配置这个bean,这里简单提一下。

这个ConnectionFactory是rabbit api中的ConnectionFactory类,这里面是连接rabbitmq节点的Connection配置。

如果想修改这些配置,可以按如下方式配置:

consumer端如果通过@RabbitListener注解的方式接收消息,不需要这个bean。

不建议直接通过ConnectionFactory获取Channel操作rabbitmq,建议通过amqpTemplate操作。

setConnectionFactory :设置spring-amqp的ConnectionFactory。

setRetryTemplate :设置重试机制,详情见后文。

setMessageConverter :设置MessageConverter,用于java对象与Message对象(实际发送和接收的消息对象)之间的相互转换,详情见后文。

setChannelTransacted :打开或关闭Channel的事务,关于amqp的事务后文描述。

setReturnCallback、setConfirmCallback :return和confirm机制的回调接口,后文详述。

setMandatory :设为true使ReturnCallback生效。

这个bean仅在consumer端通过@RabbitListener注解的方式接收消息时使用,每一个@RabbitListener注解的方法都会由这个RabbitListenerContainerFactory创建一个MessageListenerContainer,负责接收消息。

setConnectionFactory :设置spring-amqp的ConnectionFactory。

setMessageConverter :对于consumer端,MessageConverter也可以在这里配置。

setAcknowledgeMode :设置consumer端的应答模式,共有三种:NONE、AUTO、MANUAL。

NONE,无应答,这种模式下rabbitmq默认consumer能正确处理所有发出的消息,所以不管消息有没有被consumer收到,有没有正确处理都不会恢复;

AUTO,由Container自动应答,正确处理发出ack信息,处理失败发出nack信息,rabbitmq发出消息后将会等待consumer端的应答,只有收到ack确认信息才会把消息清除掉,收到nack信息的处理办法由setDefaultRequeueRejected()方法设置,所以在这种模式下,发生错误的消息是可以恢复的。

MANUAL,基本同AUTO模式,区别是需要人为调用方法给应答。

setConcurrentConsumers :设置每个MessageListenerContainer将会创建的Consumer的最小数量,默认是1个。

setMaxConcurrentConsumers :设置每个MessageListenerContainer将会创建的Consumer的最大数量,默认等于最小数量。

setPrefetchCount :设置每次请求发送给每个Consumer的消息数量。

setChannelTransacted :设置Channel的事务。

setTxSize :设置事务当中可以处理的消息数量。

setDefaultRequeueRejected :设置当rabbitmq收到nack/reject确认信息时的处理方式,设为true,扔回queue头部,设为false,丢弃。

setErrorHandler :实现ErrorHandler接口设置进去,所有未catch的异常都会由ErrorHandler处理。

AmqpTamplate里面有下面几个方法可以向queue发送消息:

这里,exchange必须存在,否则消息发不出去,会看到错误日志,但不影响程序运行:

Message是org.springframework.amqp.core.Message类,spring-amqp发送和接收的都是这个Message。

从Message类源码可以看到消息内容放在byte[]里面,MessageProperties对象包含了非常多的一些其它信息,如Header、exchange、routing key等。

这种方式,需要将消息内容(String,或其它Object)转换为byte[],示例:

也可以直接调用下面几个方法,Object将会自动转为Message对象发送:

有两种方法接收消息:

1.polling consumer,轮询调用方法一次获取一条;

2.asynchronous consumer,listener异步接收消息。

polling consumer

直接通过AmqpTemplate的方法从queue获取消息,有如下方法:

如果queue里面没有消息,会立刻返回null;传入timeoutMillis参数后可阻塞等待一段时间。

如果想直接从queue获取想要的java对象,可调用下面这一组方法:

后面4个方法是带泛型的,示例如下:

使用这四个方法需要配置org.springframework.amqp.support.converter.SmartMessageConverter,这是一个接口,Jackson2JsonMessageConverter已经实现了这个接口,所以只要将Jackson2JsonMessageConverter设置到RabbitTemplate中即可。

asynchronous consumer

有多种方式可以实现,详情参考官方文档。

最简单的实现方式是@RabbitListener注解,示例:

这里接收消息的对象用的是Message,也可以是自定义的java对象,但调用Converter转换失败会报错。

注解上指定的queue必须是已经存在并且绑定到某个exchange的,否则会报错:

如果在@RabbitListener注解中指明binding信息,就能自动创建queue、exchange并建立binding关系。

direct和topic类型的exchange需要routingKey,示例:

fanout类型的exchange,示例:

2.0版本之后,可以指定多个routingKey,示例:

并且支持arguments属性,可用于headers类型的exchange,示例:

@Queue有两个参数exclusive和autoDelete顺便解释一下:

exclusive,排他队列,只对创建这个queue的Connection可见,Connection关闭queue删除;

autoDelete,没有consumer对这个queue消费时删除。

对于这两种队列,durable=true是不起作用的。

另外,如果注解申明的queue和exchange及binding关系都已经存在,但与已存在的设置不同,比如,已存在的exchange的是direct类型,这里尝试改为fanout类型,结果是不会有任何影响,不论是修改或者新增参数都不会生效。

如果queue存在,exchange存在,但没有binding,那么程序启动后会自动建立起binding关系。




卢氏县14770108695: rabbitmq与spring整合以后怎么使用topic -
锁牲马来: 下面是RabbitMQ的消息确认机制:“为了确保消息不会丢失,RabbitMQ支持消息确认机制.客户端在接受到消息并处理完后,可以发送一个ack消息给RabbitMQ,告诉它该消息可以安全的删除了.假如客户端在发送ack之前意外死掉了

卢氏县14770108695: Spring整合RabbitMQ 一直收到重复消息怎么处理 -
锁牲马来: 收到重复消息的可能情况有如下几点 发送方一条消息发送了多次.接收到消息后,没有正确通知rabbitmq消息已被消费,导致消息仍然处于队列中,所以被再次发送.

卢氏县14770108695: java 怎么通过rabbitmq 实现多channel -
锁牲马来: 展开全部1; version> org.springframework.amqp:pom文件需要加入spring集成rabbitMq的依赖,都测试通过本文代码样例都是在spring集成环境下写的.6;groupId> spring-rabbit

卢氏县14770108695: springcloud 怎么利用rabbitmq -
锁牲马来:RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然.

卢氏县14770108695: 使用spring - rabbitmq时怎样确定使用了rpc -
锁牲马来: 这里要说明事情有以下几点: 1.RabbitMQ作为消息队列中间件,就设计成进行保证消息被可靠传递,所以才会有上述“RabbitMQ会将消息投递到下一个consumer客户端”的行为.这个默认行为要想更改,要么去改 RabbitMQ 服务端代码

卢氏县14770108695: spring boot怎样向rabbitmq 中插入对象 -
锁牲马来: SpringBoot将在类路径中或从ServletContext的根目录中提供名为/static(或/public或/resources或/META-INF/resources)的目录中的静态内容.也就是说默认情况下,可以将静态文件放到static,public,resources,/META-INF/resources四个目录下....

卢氏县14770108695: spring的配置里有没有rabbitMQ的channel.basicQos属性设置 -
锁牲马来: 无线资源控制协议. RRC处理UE和UTRAN之间控制平面的第三层信息.

卢氏县14770108695: spring +springmvc +mybatis+dubbo+rabbitmq+redis什么好处 -
锁牲马来: Jeesz本身集成Dubbo服务管控、Zookeeper注册中心、Redis分布式缓存技术、FastDFS分布式文件系统、ActiveMQ异步消息中间件、Nginx负载均衡等分布式技术 使用Maven做项目管理,项目模块化,提高项目的易开发性、扩展性以Spring Framework为核心容器,Spring MVC为模型视图控制器,MyBatis为数据访问层, Apache Shiro为权限授权层,Ehcahe对常用数据进行缓存,Activit为工作流引擎等.

卢氏县14770108695: spring boot中使用rabbitmq怎么在代码里面获取消费端的重试次数? -
锁牲马来: 在使用SpringBoot中,笔者使用到了RabbitMQ,其中踩了不少地雷,经过些许的刻版终于把它调通了,笔者主要说的是从生产者生产数据并发送给消费者到后者接收并处理数据这么一个全过程,我这里的数据指的是实体对象.生产者和消费者是...

卢氏县14770108695: AMQP Spring集成的错误处理问题,怎么解决 -
锁牲马来: 异常处理 RabbitMQ Java客户端的许多操作都会抛出checked Exception.例如,在大7a686964616fe59b9ee7ad9431333363393637多数情况下,IOException可能抛出.RabbitTemplate,SimpleMessageListenerContainer和其它的一些Spring ...

本站内容来自于网友发表,不代表本站立场,仅表示其个人看法,不对其真实性、正确性、有效性作任何的担保
相关事宜请发邮件给我们
© 星空见康网