package com.kidgrow.rabbitmq.config;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 石家庄喜高科技有限责任公司 版权所有 © Copyright 2020
*
* @Description: RabbitMQ连接基本配置
* @Project:
* @CreateDate: Created in 2020/3/23 17:14
* @Author: liuke
*/
@ConfigurationProperties(prefix = "spring.rabbitmq")
@Slf4j
@Data
@Configuration
public class RabbitConfig {
/**
* RabbitMq服务器IP
*/
private String host;
/**
* 端口
*/
private int port;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* Virtual Hosts权限管理
*/
private String virtualHost;
/**
* 交换机名称
*/
private String exchangeName;
/**
* 队列名称
*/
private String queueName;
/**
* routingKeyName名称
*/
private String routingKeyName;
/**
* AI返回结果队列
*/
private String AIEvaluationResults;
/**
* 骨龄评价统计
*/
private String BoneAgeEvaluationData;
/**
* 档案统计
*/
private String ChildRecordData;
/**
* 报告统计
*/
private String ReportData;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
//当Channel数量大于缓存数量时,多出来没法放进缓存的会被关闭。
connectionFactory.setChannelCacheSize(10);
//2、CHANNEL模式,程序运行期间ConnectionFactory会维护着一个Connection,
// 所有的操作都会使用这个Connection,但一个Connection中可以有多个Channel,
// 操作rabbitmq之前都必须先获取到一个Channel,
// 否则就会阻塞(可以通过setChannelCheckoutTimeout()设置等待时间),
// 这些Channel会被缓存(缓存的数量可以通过setChannelCacheSize()设置);
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION); //设置CONNECTION模式,可创建多个Connection连接
//单位:毫秒;配合channelCacheSize不仅是缓存数量,而且是最大的数量。
// 从缓存获取不到可用的Channel时,不会创建新的Channel,会等待这个值设置的毫秒数
//同时,在CONNECTION模式,这个值也会影响获取Connection的等待时间,
// 超时获取不到Connection也会抛出AmqpTimeoutException异常。
connectionFactory.setChannelCheckoutTimeout(600);
//仅在CONNECTION模式使用,设置Connection的缓存数量。
connectionFactory.setConnectionCacheSize(3);
//setConnectionLimit:仅在CONNECTION模式使用,设置Connection的数量上限。
connectionFactory.setConnectionLimit(10);
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(@Autowired ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(@Autowired ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setExchange(exchangeName);
//客户端开启confirm模式
template.setMandatory(true);
template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
}
});
template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
}
});
return template;
}
@Bean
public TopicExchange exchange() {
return new TopicExchange(exchangeName);
}
@Bean
public Queue KidgrowQueue() {
return new Queue(queueName);
}
@Bean
Binding bindingExchangeMessage(TopicExchange exchange) {
return BindingBuilder.bind(KidgrowQueue()).to(exchange).with(routingKeyName);
}
@Bean
public Queue AIEvaluation() {
return new Queue("AIEvaluation");
}
@Bean
public Queue AIEvaluationResults() {
return new Queue(AIEvaluationResults);
}
@Bean
public Queue BoneAgeEvaluationData() {
return new Queue(BoneAgeEvaluationData);
}
@Bean
public Queue ChildRecordData() {
return new Queue(ChildRecordData);
}
@Bean
public Queue ReportData() {
return new Queue(ReportData);
}
@Bean
Binding bindingExchangeAIEvaluation(TopicExchange exchange) {
return BindingBuilder.bind(AIEvaluation()).to(exchange).with(routingKeyName);
}
@Bean
public Queue AdvancedEvaluation() {
return new Queue("AdvancedEvaluation");
}
@Bean
Binding bindingExchangeAdvancedEvaluation(TopicExchange exchange) {
return BindingBuilder.bind(AdvancedEvaluation()).to(exchange).with(routingKeyName);
}
}