forked from kidgrow-microservices-platform

zhaoxiaohao
2021-01-26 f7c5db77d404397bf9c35ab1ddc7e03639d131a3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package com.kidgrow.rabbitmq.config;
 
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
/**
 * 石家庄喜高科技有限责任公司 版权所有 © Copyright 2020<br>
 *
 * @Description: RabbitMQ连接基本配置<br>
 * @Project: <br>
 * @CreateDate: Created in 2020/3/23 17:14 <br>
 * @Author: <a href="4345453@kidgrow.com">liuke</a>
 */
@ConfigurationProperties(prefix = "spring.rabbitmq")
@Slf4j
@Data
@Configuration
public class RabbitConfig {
 
    /**
     * RabbitMq服务器IP
     */
    private String host;
    /**
     * 端口
     */
    private int port;
    /**
     * 用户名
     */
    private String username;
    /**
     * 密码
     */
    private String password;
    /**
     * Virtual Hosts权限管理
     */
    private String virtualHost;
    /**
     * 交换机名称
     */
    private String exchangeName;
    /**
     * 队列名称
     */
    private String queueName;
    /**
     * routingKeyName名称
     */
    private String routingKeyName;
 
    /**
     * AI返回结果队列
     */
    private String AIEvaluationResults;
 
    /**
     * 骨龄评价统计
     */
    private String BoneAgeEvaluationData;
 
    /**
     * 档案统计
     */
    private String ChildRecordData;
 
    /**
     * 报告统计
     */
    private String ReportData;
 
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        //当Channel数量大于缓存数量时,多出来没法放进缓存的会被关闭。
        connectionFactory.setChannelCacheSize(10);
        //2、CHANNEL模式,程序运行期间ConnectionFactory会维护着一个Connection,
        // 所有的操作都会使用这个Connection,但一个Connection中可以有多个Channel,
        // 操作rabbitmq之前都必须先获取到一个Channel,
        // 否则就会阻塞(可以通过setChannelCheckoutTimeout()设置等待时间),
        // 这些Channel会被缓存(缓存的数量可以通过setChannelCacheSize()设置);
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);   //设置CONNECTION模式,可创建多个Connection连接
        //单位:毫秒;配合channelCacheSize不仅是缓存数量,而且是最大的数量。
        // 从缓存获取不到可用的Channel时,不会创建新的Channel,会等待这个值设置的毫秒数
        //同时,在CONNECTION模式,这个值也会影响获取Connection的等待时间,
        // 超时获取不到Connection也会抛出AmqpTimeoutException异常。
        connectionFactory.setChannelCheckoutTimeout(600);
        //仅在CONNECTION模式使用,设置Connection的缓存数量。
        connectionFactory.setConnectionCacheSize(3);
        //setConnectionLimit:仅在CONNECTION模式使用,设置Connection的数量上限。
        connectionFactory.setConnectionLimit(10);
        return connectionFactory;
    }
 
 
    @Bean
    public RabbitAdmin rabbitAdmin(@Autowired ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
 
 
    @Bean
    public RabbitTemplate rabbitTemplate(@Autowired ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setExchange(exchangeName);
        //客户端开启confirm模式
        template.setMandatory(true);
        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
            }
        });
        template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
            }
        });
        return template;
    }
 
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(exchangeName);
    }
 
    @Bean
    public Queue KidgrowQueue() {
        return new Queue(queueName);
    }
 
    @Bean
    Binding bindingExchangeMessage(TopicExchange exchange) {
        return BindingBuilder.bind(KidgrowQueue()).to(exchange).with(routingKeyName);
    }
 
    @Bean
    public Queue AIEvaluation() {
        return new Queue("AIEvaluation");
    }
 
    @Bean
    public Queue AIEvaluationResults() {
        return new Queue(AIEvaluationResults);
    }
 
    @Bean
    public Queue BoneAgeEvaluationData() {
        return new Queue(BoneAgeEvaluationData);
    }
 
    @Bean
    public Queue ChildRecordData() {
        return new Queue(ChildRecordData);
    }
 
 
    @Bean
    public Queue ReportData() {
        return new Queue(ReportData);
    }
 
 
    @Bean
    Binding bindingExchangeAIEvaluation(TopicExchange exchange) {
        return BindingBuilder.bind(AIEvaluation()).to(exchange).with(routingKeyName);
    }
 
    @Bean
    public Queue AdvancedEvaluation() {
        return new Queue("AdvancedEvaluation");
    }
 
    @Bean
    Binding bindingExchangeAdvancedEvaluation(TopicExchange exchange) {
        return BindingBuilder.bind(AdvancedEvaluation()).to(exchange).with(routingKeyName);
    }
}