I want to learn to learn WebSocket recently to do a real-time communication training program
The main technology stack used is WebSocket Netty Vue Pinia MySQL SpringBoot, which implements a persistent data, a single group chat, and supports multi-user chat interface.
The following is the implementation process
rear end
SpringBoot will occupy a port when it starts, and Netty will also occupy a port. These two ports cannot be repeated. Because Netty will block the current thread after starting, it needs to open another thread to prevent blocking SpringBoot.
1. Write a Netty server
Personally, I think the most important thing about Netty is channel, which can represent a client.
I am using the @PostConstruct annotation here. After the bean is initialized, I call the method inside and open a new thread to run Netty. Because I hope Netty is managed by Spring, I add the spring annotation. I can also directly inject Netty into the startup class and start manually
@Service public class NettyService { private EventLoopGroup bossGroup = new NioEventLoopGroup(1); private EventLoopGroup workGroup = new NioEventLoopGroup(); @Autowired private WebSocketHandler webSocketHandler; @Autowired private HeartBeatHandler heartBeatHandler; @PostConstruct public void initNetty() throws BaseException { new Thread(()->{ try { start(); } catch (Exception e) { throw new RuntimeException(e); } }).start(); } @PreDestroy public void destroy() throws BaseException { (); (); } @Async public void start() throws BaseException { try { ChannelFuture channelFuture = new ServerBootstrap() .group(bossGroup, workGroup) .channel() .handler(new LoggingHandler()) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { () // http decoder encoder .addLast(new HttpServerCodec()) // Handle complete HTTP messages .addLast(new HttpObjectAggregator(64 * 1024)) // Heartbeat detection duration .addLast(new IdleStateHandler(300, 0, 0, )) // Heartbeat detection processor .addLast(heartBeatHandler) // Support ws protocol (customized) .addLast(new WebSocketServerProtocolHandler("/ws",null,true,64*1024,true,true,10000)) // ws request processor (custom) .addLast(webSocketHandler) ; } }).bind(8081).sync(); ("Netty starts successfully"); ChannelFuture future = ().closeFuture().sync(); } catch (InterruptedException e){ throw new InterruptedException (); } finally { //Close elegantly (); (); } } }
The server class only specifies some basic information, including processor class, supported protocols, etc. The specific processing logic needs to be implemented by custom classes.
2. Heartbeat detection processor
Heartbeat detection means that the server cannot actively determine the status of the client (the user may have closed the web page, but the server cannot know). In order to determine whether the client is online, the client needs to send a message regularly. The content of the message is not important. What is important is that sending the message means that the client is still online. When the client has not sent data for a long time, it means that the client has already been offline.
package .payroll_management.; @Component @ public class HeartBeatHandler extends ChannelDuplexHandler { @Autowired private ChannelContext channelContext; private static final Logger logger = (); @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent){ // Heartbeat detection timeout IdleStateEvent e = (IdleStateEvent) evt; ("Heartbeat detection timeout"); if (() == IdleState.READER_IDLE){ Attribute<Integer> attr = ().attr((().id().toString())); Integer userId = (); // Read timeout, it has been offline, and it is actively disconnected (userId); (); } else if (() == IdleState.WRITER_IDLE){ ("Heartbeat Detection"); } } (ctx, evt); } }
3. webSocket processor
When the client sends a message, the content of the message will be sent. When the webSocket processor, the corresponding method can be processed. I was lazy here and made a group. All users can only chat in the same group. However, creating multiple groups, or single-to-single chat is not complicated. You only need to save the group ID.
The first problem arises here, that is, the interceptor of SpringMVC will not intercept requests from other ports. The solution is to place the token in the request parameter and perform a token check again in the userEventTriggered method.
The second problem is that I save the user ID through ThreadLocal in the interceptor. I can't get the user ID elsewhere without leaving the interceptor. The solution is to resave it in the userEventTriggered method, or the attachment can be saved (the data it carries on itself) in the channel and save the id directly to the attachment.
The third problem is the persistence of messages. When the user reopens the interface, he definitely hopes that the message will still exist. Given the real-time nature of webSocket, data persistence will definitely not be completed in the same thread. I use BlockingQueue + thread pool here to complete the asynchronous saving of messages, or it can also be implemented using mq.
However, using (); may cause OOM problems. You can customize a thread pool later. When the task is full, specify the rejection strategy to throw an exception, and then obtain the corresponding data through global exception capture to save it to the database. However, small projects like me should not have such problems.
The fourth question is the message content. This requires the front and back end to unify it. It will be OK after determining the transmission format, and then extract the data from JSON for processing.
Finally, there is online user statistics. There is nothing to say about this. There is a corresponding method. When exiting, just kick the channel out.
package .payroll_management.; @Component @ public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { @Autowired private ChannelContext channelContext; @Autowired private MessageMapper messageMapper; @Autowired private UserService userService; private static final Logger logger = (); private static final BlockingQueue<WebSocketMessageDto> blockingQueue = new ArrayBlockingQueue(1024 * 1024); private static final ExecutorService EXECUTOR_SERVICE = (); // Submit thread @PostConstruct private void init(){ EXECUTOR_SERVICE.submit(new MessageHandler()); } private class MessageHandler implements Runnable{ // Asynchronously save @Override public void run() { while(true){ WebSocketMessageDto message = null; try { message = (); ("Message persistence"); } catch (InterruptedException e) { throw new RuntimeException(e); } Integer success = (message); if (success < 1){ try { throw new BaseException("Save information failed"); } catch (BaseException e) { throw new RuntimeException(e); } } } } } // When the read event occurs (there is a client sending a message) @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception { Channel channel = (); // Message received String text = (); Attribute<Integer> attr = ().attr((().id().toString())); Integer userId = (); ("Received userIDfor {} News: {}",userId,text); // TODO converts text to JSON and extracts the data inside WebSocketMessageDto webSocketMessage = (text, ); if (().equals("Heartbeat Detection")){ ("{}Send heartbeat detection",userId); } else if (().equals("Mass")){ ChannelGroup channelGroup = (null); WebSocketMessageDto messageDto = (text, ); WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto(); ("Mass"); (()); ("all"); ((userId)); (("yyyy-MM-dd")); (webSocketMessageDto); (new TextWebSocketFrame((webSocketMessageDto))); } else{ ("Please send the correct format"); } } // Trigger after establishing a connection (there is a client request to establish a connection) @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ("Create a Connection"); (ctx); } // Trigger after the connection is disconnected (there is a request for the client to close the connection) @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Attribute<Integer> attr = ().attr((().id().toString())); Integer userId = (); ("userID:{} Disconnect",userId); ChannelGroup channelGroup = (null); (()); (userId); WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto(); ("User Change"); List<OnLineUserVo> onlineUser = (); ((onlineUser)); ("all"); ("0"); (("yyyy-MM-dd")); (new TextWebSocketFrame((webSocketMessageDto))); (ctx); } // Trigger after establishing a connection (the client completes the connection) @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof ){ handshakeComplete = () evt; String uri = (); ("uri: {}",uri); String token = getToken(uri); if (token == null){ ("Token verification failed"); (); throw new BaseException("Token verification failed"); } ("token: {}",token); Integer userId = null; try{ Claims claims = (token); userId = ((String) ("userId")); }catch (Exception e){ ("Token verification failed"); (); throw new BaseException("Token verification failed"); } // Add user ID to attachments in channel (userId,()); (userId,()); (null,()); ChannelGroup channelGroup = (null); WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto(); ("User Change"); List<OnLineUserVo> onlineUser = (); ((onlineUser)); ("all"); ("0"); (("yyyy-MM-dd")); (new TextWebSocketFrame((webSocketMessageDto))); } (ctx, evt); } private String getToken(String uri){ if (()){ return null; } if(!("token")){ return null; } String[] split = ("\\?"); if (!=2){ return null; } String[] split1 = split[1].split("="); if (!=2){ return null; } return split1[1]; } }
4. Tools
Mainly used to save user information
Don't ask me why there is static and ordinary methods. I just don't bother to change it. Here I save the same group directly. If multiple groups are needed, you need to create SQL data.
package .payroll_management.websocket; @Component public class ChannelContext { private static final Map<Integer, Channel> USER_CHANNEL_MAP = new ConcurrentHashMap<>(); private static final Map<Integer, ChannelGroup> USER_CHANNELGROUP_MAP = new ConcurrentHashMap<>(); private static final Integer GROUP_ID = 10086; private static final Logger logger = (); public void addContext(Integer userId,Channel channel){ String channelId = ().toString(); AttributeKey attributeKey = null; if ((channelId)){ attributeKey = (channelId); } else{ attributeKey = (channelId); } (attributeKey).set(userId); } public static List<Integer> getAllUserId(){ return new ArrayList<>(USER_CHANNEL_MAP.keySet()); } public static void setChannel(Integer userId,Channel channel){ USER_CHANNEL_MAP.put(userId,channel); } public static Channel getChannel(Integer userId){ return USER_CHANNEL_MAP.get(userId); } public static void removeChannel(Integer userId){ USER_CHANNEL_MAP.remove(userId); } public static void setChannelGroup(Integer groupId,Channel channel){ if(groupId == null){ groupId = GROUP_ID; } ChannelGroup channelGroup = USER_CHANNELGROUP_MAP.get(groupId); if (channelGroup == null){ channelGroup =new DefaultChannelGroup(); USER_CHANNELGROUP_MAP.put(GROUP_ID, channelGroup); } if (channel == null){ return ; } (channel); ("TowardsgroupAdded inchannel,ChannelGroupAlreadyChannelquantity:{}",()); } public static ChannelGroup getChannelGroup(Integer groupId){ if (groupId == null){ groupId = GROUP_ID; } return USER_CHANNELGROUP_MAP.get(groupId); } public static void removeChannelGroup(Integer groupId){ if (groupId == null){ groupId = GROUP_ID; } USER_CHANNELGROUP_MAP.remove(groupId); } }
Having written this, the Netty service is built, and you can wait for the front-end request to be established later.
front end
The front-end I use vue, because I want to automatically establish a WS connection when the user logs in, so I added a WS establishment request after logging in successfully. Then I found that if the user closes the web page and reopens it, because the login interface is skipped, the WS request will not be automatically established, so a set of global WS requests are needed.
But my front-end is not very good (in fact, the back-end is also average), so there must be better writing methods in many places.
1. pinia
Use pinia to save ws requests, making it easy to call in other components
Define WebSocket instance (ws) and a request to establish judgment (wsConnect)
You can receive service messages through WS later
import { defineStore } from 'pinia' export const useWebSocketStore = defineStore('webSocket', { state() { return { ws: null, wsConnect: false, } }, actions: { wsInit() { if ( === null) { const token = ("token") if (token === null) return; = new WebSocket(`ws://localhost:8081/ws?token=${token}`) = () => { = true; ("WS protocol was established successfully") // Send heartbeat const intervalId = setInterval(() => { if (!) { clearInterval(intervalId) } const webSocketMessageDto = { type: "Heartbeat Detection" } ((webSocketMessageDto)); }, 1000 * 3 * 60); } = () => { = null; = false; } } }, sendMessage(message) { if (message == null || message == '') { return; } if (!) { ("WS protocol not established") (); } (message); }, wsClose() { if () { (); = false; } } } })
Then, loop to establish the connection (retry the establishment request)
const wsConnect = function () { const token = ("token") if (token === null) { return; } try { if (!) { ("Try to create a ws request") (); } else { return; } } catch { wsConnect(); } }
2. Chat component
I believe everyone will draw the interface, mainly talk about the problems I encountered
The first one is to pull-up refresh, which is the function of loading history. I use the element-plus UI, and I don’t know if it is my problem. The infinite scrolling in the UI is either to send requests repeatedly or send requests infinitely, and it seems that there is no function of pull-up loading. So I used IntersectionObserver to solve the problem, add a div at the bottom of the page, and when I observe this div, the request is triggered.
Second, when the scroll bar reaches the top, the data is requested and the data is placed. The scroll bar will automatically scroll to the top, and since the observed elements are always at the top, it causes unlimited requests. This is actually not a big problem, because the chat messages are limited. After there is no data, I set to stop observing, mainly because the user experience is not very good. This is what I added to display: flex; flex-direction: column-reverse; to solve this problem (flex is amazing). The general principle seems to be vertically flipped (for example, above, I put the observation element at the first child element of the div, and after adding flex, the observation element will reach the last child element position), that is, when the scroll bar is at the bottom, after adding data, the scroll bar will automatically scroll to the bottom, but this experience is very good
Don't ask me why I have to add data || Ask it just means I'm too lazy to unify the data
<style lang="scss" scoped> .chatBox { border-radius: 20px; box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px; width: 1200px; height: 600px; background-color: white; display: flex; .chat { width: 1000px; height: inherit; .chatBackground { height: 500px; overflow: auto; display: flex; flex-direction: column-reverse; .loading { text-align: center; font-size: 12px; margin-top: 20px; color: gray; } .chatItem { width: 100%; padding-bottom: 20px; .avatar { margin-left: 20px; display: flex; align-items: center; .username { margin-left: 10px; color: rgb(153, 153, 153); font-size: 13px; } } .chatItemMessage { margin-left: 60px; padding: 10px; font-size: 14px; width: 200px; word-break: break-all; max-width: 400px; line-height: 25px; width: fit-content; border-radius: 10px; height: auto; /* background-color: skyblue; */ box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px; } .sendDate { font-size: 12px; margin-top: 10px; margin-left: 60px; color: rgb(187, 187, 187); } } } .chatBottom { height: 100px; background-color: #F3F3F3; border-radius: 20px; display: flex; box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px; .messageInput { border-radius: 20px; width: 400px; height: 40px; } } } .userList { width: 200px; height: inherit; border-radius: 20px; box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px; .user { width: inherit; height: 50px; line-height: 50px; text-indent: 2em; border-radius: 20px; transition: all 0.5s ease; } } } .user:hover { box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px; transform: translateX(-5px) translateY(-5px); } </style> <template> {{hasMessage}} <div class="chatBox"> <div class="chat"> <div class="chatBackground" ref="chatBackgroundRef"> <div class="chatItem" v-for="i in messageList"> <div class="avatar"> <el-avatar :size="40" :src="imageUrl" /> <div class="username">{{ || }}</div> </div> <div class="chatItemMessage"> {{ || }} </div> <div class="sendDate"> {{ || }} </div> </div> <div class="loading" ref="loading"> Show more content </div> </div> <div class="chatBottom"> <el-input class="messageInput" v-model="message" placeholder="Message Content"></el-input> <el-button @click="sendMessage">Send a message</el-button> </div> </div> <!-- Make infinite scrolling --> <div class="userList"> <div v-for="user in userList"> <div class="user"> {{}} </div> </div> </div> </div> </template> <script setup> import { ref, onMounted, nextTick } from 'vue' import request from '@/utils/' import { useWebSocketStore } from '@/stores/useWebSocketStore' import imageUrl from '@/assets/default avatar.jpg' const webSocketStore = useWebSocketStore(); const chatBackgroundRef = ref(null) const userList = ref([]) const message = ref('') const messageList = ref([ ]) const loading = ref(null) const page = ref(1); const size = 10; const hasMessage = ref(true); const observer = new IntersectionObserver((entries, observer) => { (async entry => { if () { () await pageQueryMessage(); } }) }) onMounted(() => { () getOnlineUserList(); if (!) { (); } const ws = ; = async (e) => { // (e); const webSocketMessage = (); const messageObj = { username: , text: , date: , type: } ("###") // (()) if ( === "Mass") { (messageObj) } else if ( === "User Change") { = () } await nextTick(); // When sending a new message, it will automatically scroll to the bottom of the page and can be replaced with the style of the message prompt. // = ; (webSocketMessage) } }) const pageQueryMessage = function () { request({ url: '/api/message/pageQueryMessage', method: 'post', data: { page: , size: size } }).then((res) => { (res) if ( === 0) { = false; } else { () = + 1; (...) } }) } function getOnlineUserList() { request({ url: '/api/user/getOnlineUser', method: 'get' }).then((res) => { (res) = ; }) } const sendMessage = function () { if (!) { (); } const webSocketMessageDto = { type: "Mass", text: } ((webSocketMessageDto)); } </script>
This realizes a simple persistence of chat data and supports online chat interface. In general, WebSocket is still very convenient to use.
This is the end of this article about SpringBoot+Netty+Vue+WebSocket online chat. For more related SpringBoot Netty Vue WebSocket online chat content, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!