package com.kidgrow.rabbitmq.config; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 石家庄喜高科技有限责任公司 版权所有 © Copyright 2020
* * @Description: RabbitMQ连接基本配置
* @Project:
* @CreateDate: Created in 2020/3/23 17:14
* @Author: liuke */
// Spring configuration that binds the "spring.rabbitmq.*" properties onto its fields and uses them
// to build the RabbitMQ connection factory, admin, template, exchange, queues and bindings below.
@ConfigurationProperties(prefix = "spring.rabbitmq")
@Slf4j
@Data
@Configuration
public class RabbitConfig {

    /**
     * RabbitMQ server host / IP.
     */
    private String host;

    /**
     * RabbitMQ server port.
     */
    private int port;

    /**
     * User name used to open the connection.
     */
    private String username;

    /**
     * Password used to open the connection.
     */
    private String password;

    /**
     * Virtual host to connect to.
     */
    private String virtualHost;

    /**
     * Name of the topic exchange.
     */
    private String exchangeName;

    /**
     * Name of the main queue.
     */
    private String queueName;

    /**
     * Routing key used for every binding declared in this class.
     */
    private String routingKeyName;

    /**
     * Name of the queue carrying AI evaluation results.
     */
    private String AIEvaluationResults;

    /**
     * Name of the queue carrying bone-age evaluation statistics.
     */
    private String BoneAgeEvaluationData;

    /**
     * Name of the queue carrying child record statistics.
     */
    private String ChildRecordData;

    /**
     * Name of the queue carrying report statistics.
     */
    private String ReportData;

    /**
     * Builds the caching connection factory from the bound properties, with publisher confirms
     * and publisher returns enabled.
     *
     * @return the configured connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);

        // When more channels are open than the cache can hold, the surplus channels that cannot
        // be put back into the cache are closed.
        connectionFactory.setChannelCacheSize(10);

        // Original author's note on CHANNEL mode: the factory keeps a single Connection for the
        // lifetime of the application and every operation uses it; one Connection can carry many
        // Channels, a Channel must be obtained before talking to RabbitMQ (otherwise the caller
        // blocks, see setChannelCheckoutTimeout()), and Channels are cached
        // (see setChannelCacheSize()).
        // The code below selects CONNECTION mode, in which multiple Connections can be created.
        // NOTE(review): the note above describes CHANNEL mode but CONNECTION mode is what is set.
        // Per the Spring AMQP reference, RabbitAdmin auto-declaration is not supported with
        // CacheMode.CONNECTION - confirm that the exchange/queues/bindings declared as beans in
        // this class really exist on the broker.
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);

        // Unit: milliseconds. With a checkout timeout set, channelCacheSize is not only the cache
        // size but also the maximum number of channels: when no channel is available no new one
        // is created and the caller waits up to this long. In CONNECTION mode it also bounds the
        // wait for a Connection; on timeout an AmqpTimeoutException is thrown.
        connectionFactory.setChannelCheckoutTimeout(600);

        // CONNECTION mode only: number of Connections kept in the cache.
        connectionFactory.setConnectionCacheSize(3);

        // CONNECTION mode only: upper limit on the number of Connections.
        connectionFactory.setConnectionLimit(10);
        return connectionFactory;
    }

    /**
     * Creates the {@link RabbitAdmin} on top of the shared connection factory.
     *
     * @param connectionFactory the connection factory bean
     * @return a new admin instance
     */
    @Bean
    public RabbitAdmin rabbitAdmin(@Autowired ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Creates the {@link RabbitTemplate} that publishes to {@code exchangeName} by default and
     * logs the outcome of publisher confirms and returns.
     *
     * @param connectionFactory the connection factory bean
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(@Autowired ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setExchange(exchangeName);

        // mandatory=true asks the broker to hand unroutable messages back to the return callback
        // instead of silently dropping them.
        template.setMandatory(true);

        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                // Only an ack means the broker accepted the message; a nack must not be reported
                // as a successful send.
                if (ack) {
                    log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
                } else {
                    log.error("消息发送失败:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
                }
            }
        });

        template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText,
                                        String exchange, String routingKey) {
                // A returned message could not be routed to any queue; surface it as a warning.
                log.warn("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",
                        exchange, routingKey, replyCode, replyText, message);
            }
        });
        return template;
    }

    /**
     * Declares the topic exchange named by {@code exchangeName}.
     *
     * @return the topic exchange
     */
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(exchangeName);
    }

    /**
     * Declares the main queue named by {@code queueName}.
     *
     * @return the queue
     */
    @Bean
    public Queue KidgrowQueue() {
        return new Queue(queueName);
    }

    /**
     * Binds the main queue to the exchange with {@code routingKeyName}.
     *
     * @param exchange the topic exchange bean
     * @return the binding
     */
    @Bean
    Binding bindingExchangeMessage(TopicExchange exchange) {
        return BindingBuilder.bind(KidgrowQueue()).to(exchange).with(routingKeyName);
    }

    /**
     * Declares the queue with the fixed name "AIEvaluation".
     *
     * @return the queue
     */
    @Bean
    public Queue AIEvaluation() {
        return new Queue("AIEvaluation");
    }

    /**
     * Declares the queue named by the {@code AIEvaluationResults} property.
     *
     * @return the queue
     */
    @Bean
    public Queue AIEvaluationResults() {
        return new Queue(AIEvaluationResults);
    }

    /**
     * Declares the queue named by the {@code BoneAgeEvaluationData} property.
     *
     * @return the queue
     */
    @Bean
    public Queue BoneAgeEvaluationData() {
        return new Queue(BoneAgeEvaluationData);
    }

    /**
     * Declares the queue named by the {@code ChildRecordData} property.
     *
     * @return the queue
     */
    @Bean
    public Queue ChildRecordData() {
        return new Queue(ChildRecordData);
    }

    /**
     * Declares the queue named by the {@code ReportData} property.
     *
     * @return the queue
     */
    @Bean
    public Queue ReportData() {
        return new Queue(ReportData);
    }

    /**
     * Binds the "AIEvaluation" queue to the exchange with {@code routingKeyName}.
     *
     * @param exchange the topic exchange bean
     * @return the binding
     */
    @Bean
    Binding bindingExchangeAIEvaluation(TopicExchange exchange) {
        return BindingBuilder.bind(AIEvaluation()).to(exchange).with(routingKeyName);
    }

    /**
     * Declares the queue with the fixed name "AdvancedEvaluation".
     *
     * @return the queue
     */
    @Bean
    public Queue AdvancedEvaluation() {
        return new Queue("AdvancedEvaluation");
    }

    /**
     * Binds the "AdvancedEvaluation" queue to the exchange with {@code routingKeyName}.
     *
     * @param exchange the topic exchange bean
     * @return the binding
     */
    @Bean
    Binding bindingExchangeAdvancedEvaluation(TopicExchange exchange) {
        return BindingBuilder.bind(AdvancedEvaluation()).to(exchange).with(routingKeyName);
    }
}