forked from kidgrow-microservices-platform

houruijun
2020-10-29 f4d5b46cfffc27d626afc3c43e8e20a11e686a00
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package com.kidgrow.websocket.server;
 
import com.kidgrow.websocket.entity.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
 
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Collectors;
 
/**
 * 石家庄喜高科技有限责任公司 版权所有 © Copyright 2020<br>
 *
 * @Description: websocket服务端<br>
 * @Project: <br>
 * @CreateDate: Created in 2020/3/1 16:05 <br>
 * @Author: <a href="4345453@kidgrow.com">liuke</a>
 */
@ServerEndpoint(value = "/socketServer/{userName}")
@Component
public class SocketServer {
 
    private static final Logger logger = LoggerFactory.getLogger(SocketServer.class);
 
    /**
     *
     * 用线程安全的CopyOnWriteArraySet来存放客户端连接的信息
     */
    private static CopyOnWriteArraySet<Client> socketServers = new CopyOnWriteArraySet<>();
 
    /**
     *
     * websocket封装的session,信息推送,就是通过它来信息推送
     */
    private Session session;
 
    /**
     *
     * 服务端的userName,因为用的是set,每个客户端的username必须不一样,否则会被覆盖。
     * 要想完成ui界面聊天的功能,服务端也需要作为客户端来接收后台推送用户发送的信息
     */
    private final static String SYS_USERNAME = "kidgrow";
 
 
    /**
     *
     * 用户连接时触发,我们将其添加到
     * 保存客户端连接信息的socketServers中
     *
     * @param session
     * @param userName
     */
    @OnOpen
    public void open(Session session,@PathParam(value="userName")String userName){
 
            this.session = session;
            socketServers.add(new Client(userName,session));
 
            logger.info("客户端:【{}】连接成功",userName);
 
    }
 
    /**
     *
     * 收到客户端发送信息时触发
     * 我们将其推送给客户端(niezhiliang9595)
     * 其实也就是服务端本身,为了达到前端聊天效果才这么做的
     *
     * @param message
     */
    @OnMessage
    public void onMessage(String message){
 
        Client client = socketServers.stream().filter( cli -> cli.getSession() == session)
                .collect(Collectors.toList()).get(0);
        sendMessage(client.getUserName()+"<--"+message,SYS_USERNAME);
 
        logger.info("客户端:【{}】发送信息:{}",client.getUserName(),message);
    }
 
    /**
     *
     * 连接关闭触发,通过sessionId来移除
     * socketServers中客户端连接信息
     */
    @OnClose
    public void onClose(){
        socketServers.forEach(client ->{
            if (client.getSession().getId().equals(session.getId())) {
 
                logger.info("客户端:【{}】断开连接",client.getUserName());
                socketServers.remove(client);
 
            }
        });
    }
 
    /**
     *
     * 发生错误时触发
     * @param error
     */
    @OnError
    public void onError(Throwable error) {
        socketServers.forEach(client ->{
            if (client.getSession().getId().equals(session.getId())) {
                socketServers.remove(client);
                logger.error("客户端:【{}】发生异常",client.getUserName());
                error.printStackTrace();
            }
        });
    }
 
    /**
     *
     * 信息发送的方法,通过客户端的userName
     * 拿到其对应的session,调用信息推送的方法
     * @param message
     * @param userName
     */
    public synchronized static void sendMessage(String message,String userName) {
 
        socketServers.forEach(client ->{
            if (userName.equals(client.getUserName())) {
                try {
                    client.getSession().getBasicRemote().sendText(message);
 
                    logger.info("服务端推送给客户端 :【{}】",client.getUserName(),message);
 
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }
 
    /**
     *
     * 获取服务端当前客户端的连接数量,
     * 因为服务端本身也作为客户端接受信息,
     * 所以连接总数还要减去服务端
     * 本身的一个连接数
     *
     * 这里运用三元运算符是因为客户端第一次在加载的时候
     * 客户端本身也没有进行连接,-1 就会出现总数为-1的情况,
     * 这里主要就是为了避免出现连接数为-1的情况
     *
     * @return
     */
    public synchronized static int getOnlineNum(){
        return socketServers.stream().filter(client -> !client.getUserName().equals(SYS_USERNAME))
                .collect(Collectors.toList()).size();
    }
 
    /**
     *
     * 获取在线用户名,前端界面需要用到
     * @return
     */
    public synchronized static List<String> getOnlineUsers(){
 
        List<String> onlineUsers = socketServers.stream()
                .filter(client -> !client.getUserName().equals(SYS_USERNAME))
                .map(client -> client.getUserName())
                .collect(Collectors.toList());
 
        return onlineUsers;
    }
 
    /**
     *
     * 信息群发,我们要排除服务端自己不接收到推送信息
     * 所以我们在发送的时候将服务端排除掉
     * @param message
     */
    public synchronized static void sendAll(String message) {
        //群发,不能发送给服务端自己
        socketServers.stream().filter(cli -> cli.getUserName() != SYS_USERNAME)
                .forEach(client -> {
            try {
                client.getSession().getBasicRemote().sendText(message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
 
        logger.info("服务端推送给所有客户端 :【{}】",message);
    }
 
    /**
     *
     * 多个人发送给指定的几个用户
     * @param message
     * @param persons
     */
    public synchronized static void SendMany(String message,String [] persons) {
        for (String userName : persons) {
            sendMessage(message,userName);
        }
    }
}