Springboot+netty实现Web聊天室
目录
- 一、项目的创建
- 二、代码编写
- 三、运行效果
一、项目的创建
新建Spring项目:
选择JDK版本:
选择Spring Web:
项目名称和位置的设置:
二、代码编写
导入.jar包:
gson: https://search.maven.org/artifact/com.google.code.gson/gson/2.8.9/jar
DemoApplication:
package com.example.demo; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.core.env.Environment; import java.net.InetAddress; import java.net.UnknownHostException; @SpringBootApplication public class DemoApplication { public static void main(String[] args) throws UnknownHostException { ConfigurableApplicationContext application = SpringApplication.run(DemoApplication.class, args); Environment env = application.getEnvironment(); String host = InetAddress.getLocalHost().getHostAddress(); String port = env.getProperty("server.port"); System.out.println("[----------------------------------------------------------]"); System.out.println("聊天室启动成功!点击进入:\t http://" + host + ":" + port); System.out.println("[----------------------------------------------------------"); WebSocketServer.inst().run(53134); } }
User:
package com.example.demo; import java.util.Objects; public class User { public String id; public String nickname; public User(String id, String nickname) { super(); this.id = id; this.nickname = nickname; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getNickname() { return nickname; } public void setNickname(String nickname) { this.nickname = nickname; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; User user = (User) o; return id.equals(user.getId()); } @Override public int hashCode() { return Objects.hash(id); } public String getUid() { return id; } }
SessionGroup:
package com.example.demo; import com.google.gson.Gson; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.ImmediateEventExecutor; import org.springframework.util.StringUtils; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public final class SessionGroup { private static SessionGroup singleInstance = new SessionGroup(); // 组的映射 private ConcurrentHashMap<String, ChannelGroup> groupMap = new ConcurrentHashMap<>(); public static SessionGroup inst() { return singleInstance; } public void shutdownGracefully() { Iterator<ChannelGroup> groupIterator = groupMap.values().iterator(); while (groupIterator.hasNext()) { ChannelGroup group = groupIterator.next(); group.close(); } } public void sendToOthers(Map<String, String> result, SocketSession s) { // 获取组 ChannelGroup group = groupMap.get(s.getGroup()); if (null == group) { return; } Gson gson=new Gson(); String json = gson.toJson(result); // 自己发送的消息不返回给自己 // Channel channel = s.getChannel(); // 从组中移除通道 // group.remove(channel); ChannelGroupFuture future = group.writeAndFlush(new TextWebSocketFrame(json)); future.addListener(f -> { System.out.println("完成发送:"+json); // group.add(channel);//发送消息完毕重新添加。 }); } public void addSession(SocketSession session) { String groupName = session.getGroup(); if (StringUtils.isEmpty(groupName)) { // 组为空,直接返回 return; } ChannelGroup group = groupMap.get(groupName); if (null == group) { group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); groupMap.put(groupName, group); } group.add(session.getChannel()); } /** * 关闭连接, 关闭前发送一条通知消息 */ public void closeSession(SocketSession session, String echo) { ChannelFuture sendFuture = session.getChannel().writeAndFlush(new TextWebSocketFrame(echo)); sendFuture.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { System.out.println("关闭连接:"+echo); future.channel().close(); } }); } /** * 关闭连接 */ public void closeSession(SocketSession session) { ChannelFuture sendFuture = session.getChannel().close(); sendFuture.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { System.out.println("发送所有完成:"+session.getUser().getNickname()); } }); } /** * 发送消息 * @param ctx 上下文 * @param msg 待发送的消息 */ public void sendMsg(ChannelHandlerContext ctx, String msg) { ChannelFuture sendFuture = ctx.writeAndFlush(new TextWebSocketFrame(msg)); sendFuture.addListener(f -> {//发送监听 System.out.println("对所有发送完成:"+msg); }); } }
SocketSession:
package com.example.demo; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.util.AttributeKey; import java.util.HashMap; import java.util.Map; import java.util.UUID; public class SocketSession { public static final AttributeKey<SocketSession> SESSION_KEY = AttributeKey.valueOf("SESSION_KEY"); /** * 用户实现服务端会话管理的核心 */ // 通道 private Channel channel; // 用户 private User user; // session唯一标示 private final String sessionId; private String group; /** * session中存储的session 变量属性值 */ private Map<String, Object> map = new HashMap<String, Object>(); public SocketSession(Channel channel) {//注意传入参数channel。不同客户端会有不同channel this.channel = channel; this.sessionId = buildNewSessionId(); channel.attr(SocketSession.SESSION_KEY).set(this); } // 反向导航 public static SocketSession getSession(ChannelHandlerContext ctx) {//注意ctx,不同的客户端会有不同ctx Channel channel = ctx.channel(); return channel.attr(SocketSession.SESSION_KEY).get(); } // 反向导航 public static SocketSession getSession(Channel channel) { return channel.attr(SocketSession.SESSION_KEY).get(); } public String getId() { return sessionId; } private static String buildNewSessionId() { String uuid = UUID.randomUUID().toString(); return uuid.replaceAll("-", ""); } public synchronized void set(String key, Object value) { map.put(key, value); } public synchronized <T> T get(String key) { return (T) map.get(key); } public boolean isValid() { return getUser() != null ? true : false; } public User getUser() { return user; } public void setUser(User user) { this.user = user; } public String getGroup() { return group; } public void setGroup(String group) { this.group = group; } public Channel getChannel() { return channel; } }
WebSocketServer:
package com.example.demo; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.TimeUnit; public class WebSocketServer { private static WebSocketServer wbss; private static final int READ_IDLE_TIME_OUT = 60; // 读超时 private static final int WRITE_IDLE_TIME_OUT = 0;// 写超时 private static final int ALL_IDLE_TIME_OUT = 0; // 所有超时 public static WebSocketServer inst() { return wbss = new WebSocketServer(); } public void run(int port) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // Netty自己的http解码器和编码器,报文级别 HTTP请求的解码和编码 pipeline.addLast(new HttpServerCodec()); // ChunkedWriteHandler 是用于大数据的分区传输 // 主要用于处理大数据流,比如一个1G大小的文件如果你直接传输肯定会撑暴jvm内存的; // 增加之后就不用考虑这个问题了 pipeline.addLast(new ChunkedWriteHandler()); // HttpObjectAggregator 是完全的解析Http消息体请求用的 // 把多个消息转换为一个单一的完全FullHttpRequest或是FullHttpResponse, // 原因是HTTP解码器会在每个HTTP消息中生成多个消息对象HttpRequest/HttpResponse,HttpContent,LastHttpContent pipeline.addLast(new HttpObjectAggregator(64 * 1024)); // WebSocket数据压缩 pipeline.addLast(new WebSocketServerCompressionHandler()); // WebSocketServerProtocolHandler是配置websocket的监听地址/协议包长度限制 pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 10 * 1024)); // 当连接在60秒内没有接收到消息时,就会触发一个 IdleStateEvent 事件, // 此事件被 HeartbeatHandler 的 userEventTriggered 方法处理到 pipeline.addLast( new IdleStateHandler(READ_IDLE_TIME_OUT, WRITE_IDLE_TIME_OUT, ALL_IDLE_TIME_OUT, TimeUnit.SECONDS)); // WebSocketServerHandler、TextWebSocketFrameHandler 是自定义逻辑处理器, pipeline.addLast(new WebSocketTextHandler()); } }); Channel ch = b.bind(port).syncUninterruptibly().channel(); ch.closeFuture().syncUninterruptibly(); // 返回与当前Java应用程序关联的运行时对象 Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { SessionGroup.inst().shutdownGracefully(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }); } }
WebSocketTextHandler:
package com.example.demo; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import java.util.HashMap; import java.util.Map; public class WebSocketTextHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { //@Override protected void channelRead(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { SocketSession session = SocketSession.getSession(ctx); TypeToken<HashMap<String, String>> typeToken = new TypeToken<HashMap<String, String>>() { }; Gson gson=new Gson(); java.util.Map<String,String> map = gson.fromJson(msg.text(), typeToken.getType()); User user = null; switch (map.get("type")) { case "msg": Map<String, String> result = new HashMap<>(); user = session.getUser(); result.put("type", "msg"); result.put("msg", map.get("msg")); result.put("sendUser", user.getNickname()); SessionGroup.inst().sendToOthers(result, session); break; case "init": String room = map.get("room"); session.setGroup(room); String nick = map.get("nick"); user = new User(session.getId(), nick); session.setUser(user); SessionGroup.inst().addSession(session); break; } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // 是否握手成功,升级为 Websocket 协议 if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) { // 握手成功,移除 HttpRequestHandler,因此将不会接收到任何消息 // 并把握手成功的 Channel 加入到 ChannelGroup 中 new SocketSession(ctx.channel()); } else if (evt instanceof IdleStateEvent) { IdleStateEvent stateEvent = (IdleStateEvent) evt; if (stateEvent.state() == IdleState.READER_IDLE) { System.out.println("bb22"); } } else { super.userEventTriggered(ctx, evt); } } @Override protected void messageReceived(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception { } }
之后项目外创建一个test.html:
<!DOCTYPE HTML> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/> <title>群聊天室</title> <style type="text/css"> body { margin-right:50px; margin-left:50px; } .ddois { position: fixed; left: 120px; bottom: 30px; } </style> </head> <body> 群名:<input type="text" id="room" name="group" placeholder="请输入群"> <br /><br /> 昵称:<input type="text" id="nick" name="name" placeholder="请输入昵称"> <br /><br /> <button type="button" onclick="enter()">进入聊天群</button> <br /><br /> <div id="message"></div> <br /><br /> <div class="ddois"> <textarea name="send" id="text" rows="10" cols="30" placeholder="输入发送消息"></textarea> <br /><br /> <button type="button" onclick="send()">发送</button> </div> <script type="text/javascript"> var webSocket; if (window.WebSocket) { webSocket = new WebSocket("ws://localhost:53134/ws"); } else { alert("抱歉,您的浏览器不支持WebSocket协议!"); } //连通之后的回调事件 webSocket.onopen = function() { console.log("已经连通了websocket"); // setMessageInnerHTML("已经连通了websocket"); }; //连接发生错误的回调方法 webSocket.onerror = function(event){ console.log("出错了"); // setMessageInnerHTML("连接失败"); }; //连接关闭的回调方法 webSocket.onclose = function(){ console.log("连接已关闭..."); } //接收到消息的回调方法 webSocket.onmessage = function(event){ console.log("bbdds"); var data = JSON.parse(event.data) var msg = data.msg; var nick = data.sendUser; switch(data.type){ case 'init': console.log("mmll"); break; case 'msg': console.log("bblld"); setMessageInnerHTML(nick+": "+msg); break; default: break; } } function enter(){ var map = new Map(); var nick=document.getElementById('nick').value; var room=document.getElementById('room').value; map.set("type","init"); map.set("nick",nick); console.log(room); map.set("room",room); var message = Map2Json(map); webSocket.send(message); } function send() { var msg = document.getElementById('text').value; var nick = document.getElementById('nick').value; console.log("1:"+msg); if (msg != null && msg != ""){ var map = new Map(); map.set("type","msg"); map.set("msg",msg); var map2json=Map2Json(map); if (map2json.length < 8000){ console.log("4:"+map2json); webSocket.send(map2json); }else { console.log("文本太长了,少写一点吧"); } } } //将消息显示在网页上 function setMessageInnerHTML(innerHTML) { document.getElementById("message").innerHTML += innerHTML + "<br/>"; } function Map2Json(map) { var str = "{"; map.forEach(function (value, key) { str += '"'+key+'"'+':'+ '"'+value+'",'; }) str = str.substring(0,str.length-1) str +="}"; return str; } </script> </body> </html>
先运行项目,然后运行html
三、运行效果
到此这篇关于Springboot+netty实现Web聊天室的文章就介绍到这了,更多相关Springboot netty 聊天室内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!
赞 (0)