package com.kidgrow.rabbitmq.config;
|
|
import lombok.Data;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.amqp.core.*;
|
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Configuration;
|
|
/**
|
* 石家庄喜高科技有限责任公司 版权所有 © Copyright 2020<br>
|
*
|
* @Description: RabbitMQ连接基本配置<br>
|
* @Project: <br>
|
* @CreateDate: Created in 2020/3/23 17:14 <br>
|
* @Author: <a href="4345453@kidgrow.com">liuke</a>
|
*/
|
@ConfigurationProperties(prefix = "spring.rabbitmq")
|
@Slf4j
|
@Data
|
@Configuration
|
public class RabbitConfig {
|
|
/**
|
* RabbitMq服务器IP
|
*/
|
private String host;
|
/**
|
* 端口
|
*/
|
private int port;
|
/**
|
* 用户名
|
*/
|
private String username;
|
/**
|
* 密码
|
*/
|
private String password;
|
/**
|
* Virtual Hosts权限管理
|
*/
|
private String virtualHost;
|
/**
|
* 交换机名称
|
*/
|
private String exchangeName;
|
/**
|
* 队列名称
|
*/
|
private String queueName;
|
/**
|
* routingKeyName名称
|
*/
|
private String routingKeyName;
|
|
/**
|
* AI返回结果队列
|
*/
|
private String AIEvaluationResults;
|
|
@Bean
|
public ConnectionFactory connectionFactory() {
|
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
|
connectionFactory.setUsername(username);
|
connectionFactory.setPassword(password);
|
connectionFactory.setVirtualHost(virtualHost);
|
connectionFactory.setPublisherConfirms(true);
|
connectionFactory.setPublisherReturns(true);
|
//当Channel数量大于缓存数量时,多出来没法放进缓存的会被关闭。
|
connectionFactory.setChannelCacheSize(10);
|
//2、CHANNEL模式,程序运行期间ConnectionFactory会维护着一个Connection,
|
// 所有的操作都会使用这个Connection,但一个Connection中可以有多个Channel,
|
// 操作rabbitmq之前都必须先获取到一个Channel,
|
// 否则就会阻塞(可以通过setChannelCheckoutTimeout()设置等待时间),
|
// 这些Channel会被缓存(缓存的数量可以通过setChannelCacheSize()设置);
|
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION); //设置CONNECTION模式,可创建多个Connection连接
|
//单位:毫秒;配合channelCacheSize不仅是缓存数量,而且是最大的数量。
|
// 从缓存获取不到可用的Channel时,不会创建新的Channel,会等待这个值设置的毫秒数
|
//同时,在CONNECTION模式,这个值也会影响获取Connection的等待时间,
|
// 超时获取不到Connection也会抛出AmqpTimeoutException异常。
|
connectionFactory.setChannelCheckoutTimeout(600);
|
//仅在CONNECTION模式使用,设置Connection的缓存数量。
|
connectionFactory.setConnectionCacheSize(3);
|
//setConnectionLimit:仅在CONNECTION模式使用,设置Connection的数量上限。
|
connectionFactory.setConnectionLimit(10);
|
return connectionFactory;
|
}
|
|
|
@Bean
|
public RabbitAdmin rabbitAdmin(@Autowired ConnectionFactory connectionFactory) {
|
return new RabbitAdmin(connectionFactory);
|
}
|
|
|
@Bean
|
public RabbitTemplate rabbitTemplate(@Autowired ConnectionFactory connectionFactory) {
|
RabbitTemplate template = new RabbitTemplate(connectionFactory);
|
template.setExchange(exchangeName);
|
//客户端开启confirm模式
|
template.setMandatory(true);
|
template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
|
@Override
|
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
|
log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
|
}
|
});
|
template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
|
@Override
|
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
|
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
|
}
|
});
|
return template;
|
}
|
|
@Bean
|
public TopicExchange exchange() {
|
return new TopicExchange(exchangeName);
|
}
|
|
@Bean
|
public Queue KidgrowQueue() {
|
return new Queue(queueName);
|
}
|
|
@Bean
|
Binding bindingExchangeMessage(TopicExchange exchange) {
|
return BindingBuilder.bind(KidgrowQueue()).to(exchange).with(routingKeyName);
|
}
|
|
@Bean
|
public Queue AIEvaluation() {
|
return new Queue("AIEvaluation");
|
}
|
|
@Bean
|
public Queue AIEvaluationResults() {
|
return new Queue(AIEvaluationResults);
|
}
|
|
@Bean
|
Binding bindingExchangeAIEvaluation(TopicExchange exchange) {
|
return BindingBuilder.bind(AIEvaluation()).to(exchange).with(routingKeyName);
|
}
|
|
@Bean
|
public Queue AdvancedEvaluation() {
|
return new Queue("AdvancedEvaluation");
|
}
|
|
@Bean
|
Binding bindingExchangeAdvancedEvaluation(TopicExchange exchange) {
|
return BindingBuilder.bind(AdvancedEvaluation()).to(exchange).with(routingKeyName);
|
}
|
}
|