forked from kidgrow-microservices-platform

zhaoxiaohao
2020-09-18 daa697719eb0ddfd170f1ab94c5422a4f5b93951
kidgrow-commons/kidgrow-rabbitmq-spring-boot-starter/src/main/java/com/kidgrow/rabbitmq/config/RabbitConfig.java
New file
@@ -0,0 +1,195 @@
package com.kidgrow.rabbitmq.config;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * 石家庄喜高科技有限责任公司 版权所有 © Copyright 2020<br>
 *
 * @Description: RabbitMQ连接基本配置<br>
 * @Project: <br>
 * @CreateDate: Created in 2020/3/23 17:14 <br>
 * @Author: <a href="4345453@kidgrow.com">liuke</a>
 */
@ConfigurationProperties(prefix = "spring.rabbitmq")
@Slf4j
@Data
@Configuration
public class RabbitConfig {
    /**
     * RabbitMq服务器IP
     */
    private String host;
    /**
     * 端口
     */
    private int port;
    /**
     * 用户名
     */
    private String username;
    /**
     * 密码
     */
    private String password;
    /**
     * Virtual Hosts权限管理
     */
    private String virtualHost;
    /**
     * 交换机名称
     */
    private String exchangeName;
    /**
     * 队列名称
     */
    private String queueName;
    /**
     * routingKeyName名称
     */
    private String routingKeyName;
    /**
     * AI返回结果队列
     */
    private String AIEvaluationResults;
    /**
     * 骨龄评价统计
     */
    private String BoneAgeEvaluationData;
    /**
     * 档案统计
     */
    private String ChildRecordData;
    /**
     * 报告统计
     */
    private String ReportData;
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        //当Channel数量大于缓存数量时,多出来没法放进缓存的会被关闭。
        connectionFactory.setChannelCacheSize(10);
        //2、CHANNEL模式,程序运行期间ConnectionFactory会维护着一个Connection,
        // 所有的操作都会使用这个Connection,但一个Connection中可以有多个Channel,
        // 操作rabbitmq之前都必须先获取到一个Channel,
        // 否则就会阻塞(可以通过setChannelCheckoutTimeout()设置等待时间),
        // 这些Channel会被缓存(缓存的数量可以通过setChannelCacheSize()设置);
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);   //设置CONNECTION模式,可创建多个Connection连接
        //单位:毫秒;配合channelCacheSize不仅是缓存数量,而且是最大的数量。
        // 从缓存获取不到可用的Channel时,不会创建新的Channel,会等待这个值设置的毫秒数
        //同时,在CONNECTION模式,这个值也会影响获取Connection的等待时间,
        // 超时获取不到Connection也会抛出AmqpTimeoutException异常。
        connectionFactory.setChannelCheckoutTimeout(600);
        //仅在CONNECTION模式使用,设置Connection的缓存数量。
        connectionFactory.setConnectionCacheSize(3);
        //setConnectionLimit:仅在CONNECTION模式使用,设置Connection的数量上限。
        connectionFactory.setConnectionLimit(10);
        return connectionFactory;
    }
    @Bean
    public RabbitAdmin rabbitAdmin(@Autowired ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
    @Bean
    public RabbitTemplate rabbitTemplate(@Autowired ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setExchange(exchangeName);
        //客户端开启confirm模式
        template.setMandatory(true);
        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
            }
        });
        template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
            }
        });
        return template;
    }
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(exchangeName);
    }
    @Bean
    public Queue KidgrowQueue() {
        return new Queue(queueName);
    }
    @Bean
    Binding bindingExchangeMessage(TopicExchange exchange) {
        return BindingBuilder.bind(KidgrowQueue()).to(exchange).with(routingKeyName);
    }
    @Bean
    public Queue AIEvaluation() {
        return new Queue("AIEvaluation");
    }
    @Bean
    public Queue AIEvaluationResults() {
        return new Queue(AIEvaluationResults);
    }
    @Bean
    public Queue BoneAgeEvaluationData() {
        return new Queue(BoneAgeEvaluationData);
    }
    @Bean
    public Queue ChildRecordData() {
        return new Queue(ChildRecordData);
    }
    @Bean
    public Queue ReportData() {
        return new Queue(ReportData);
    }
    @Bean
    Binding bindingExchangeAIEvaluation(TopicExchange exchange) {
        return BindingBuilder.bind(AIEvaluation()).to(exchange).with(routingKeyName);
    }
    @Bean
    public Queue AdvancedEvaluation() {
        return new Queue("AdvancedEvaluation");
    }
    @Bean
    Binding bindingExchangeAdvancedEvaluation(TopicExchange exchange) {
        return BindingBuilder.bind(AdvancedEvaluation()).to(exchange).with(routingKeyName);
    }
}