⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 blog.csdn.net/wzljiayou/article/details/110506164 「wzljiayou」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

WebSocket简介

WebSocket是协议重制,为用户提供完整的网络版本设计和服务器控制系统的桌面版本,可以提供各种不同的解决方案,因此,它们可以在任何时候提供不同的解决方案,因此,它们可以消息回执

WebSocket特点:

  • HTML5与客户端与消息通信,基于的文本或服务协议,实现
  • 适合于性要求的场合,如通信、直播、会议现场,特别适合在会议现场举行的会议,比较适合于客户端的实时会议、多人活动、协作等平台
  • 采用新的协议,必须单独实现
  • 客户端所有自己浏览器都支持

WebSocket 沟通交流

在从标准的HTTP协议HTTPS切换到WebSocket时,使用一种称为可交换的机制,因此,使用WebSocket的应用程序HTTP/S升级或者将始终作为启动,然后执行升级这个操作发生的具体时刻特定于应用程序;它可能会发生启动时,可能会发生在请求之后还有某个 URL

下面是WebSocket请求和响应的标识信息:

客户端的请求:

  • Connection属性中标识Upgrade,表示客户端希望连接升级
  • Upgrade属性中标识为Websocket,表示希望升级成Websocket协议
  • Sec-WebSocket-Key属性,表示随时字符串,服务器端会用这些数据来构造出一个 SHA-1 的信息摘要。把“ Sec-WebSocket-Key”然后添加一个字符串“ 258EAFA5-E914-47DA-95CA-C5AB0DC85B11”,计算 SHA-1 的摘要,之后进行 BASE-64 编码,将结果执行为“ Sec-WebSocket-Accept”头的值,返回给客户端。如此操作,可以避免误认为普通的 HTTP 请求被 Websocket 协议。
  • Sec-WebSocket-Version,表示的Websocket版本,RFC6455要求使用的版本支持是 13,应该放弃使用之前的版本的版本

服务器端响应:

  • Upgrade属性中标识为websocket
  • Connection告诉客户端升级是Websocket协议
  • Sec-WebSocket-Accept经过这个服务器确认,并且加密之后的Sec-WebSocket-Key

Netty为WebSocket数据帧提供了支持

由IETF的发布WebSocket RFC,定义了6种帧,Netty为调试器提供了一个POJO

实战

首先,定义WebSocket服务端,其中创建了一个NettyChannelGroup变量记录所有已经连接的客户端通道,而这个就是提供ChannelGroup完成使用群发和单聊功能的

//定义websocket服务端
public class WebSocketServer {

private static EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private static EventLoopGroup workerGroup = new NioEventLoopGroup();
private static ServerBootstrap bootstrap = new ServerBootstrap();

private static final int PORT =8761;

//创建 DefaultChannelGroup,用来保存所有已经连接的 WebSocket Channel,群发和一对一功能可以用上
private final static ChannelGroup channelGroup =
new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);

public static void startServer(){
try {
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebSocketServerInitializer(channelGroup));
Channel ch = bootstrap.bind(PORT).sync().channel();
System.out.println("打开浏览器访问: http://127.0.0.1:" + PORT + '/');
ch.closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
startServer();
}
}

初始化管道,用于Pipeline中注册所有HTTP请求的ChannelHandler,主要包括:处理请求向解码端的使用HttpServerCodec、自定义的处理HTTP请求的HttpRequestHandler处理WebSocket帧数据以及升级可笑的以及WebSocketServerProtocolHandler自定义的处理TextWebSocketFrame招招和完成事件的WebSocketServerHanlder

public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel>{

/*websocket访问路径*/
private static final String WEBSOCKET_PATH = "/ws";

private ChannelGroup channelGroup;

public WebSocketServerInitializer(ChannelGroup channelGroup){
this.channelGroup=channelGroup;
}

@Override
protected void initChannel(SocketChannel ch) throws Exception {
//用于HTTP请求的编解码
ch.pipeline().addLast(new HttpServerCodec());
//用于写入一个文件的内容
ch.pipeline().addLast(new ChunkedWriteHandler());
//用于http请求的聚合
ch.pipeline().addLast(new HttpObjectAggregator(64*1024));
//用于WebSocket应答数据压缩传输
ch.pipeline().addLast(new WebSocketServerCompressionHandler());
//处理http请求,对非websocket请求的处理
ch.pipeline().addLast(new HttpRequestHandler(WEBSOCKET_PATH));
//根据websocket规范,处理升级握手以及各种websocket数据帧
ch.pipeline().addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, "", true));
//对websocket的数据进行处理,主要处理TextWebSocketFrame数据帧和握手完成事件
ch.pipeline().addLast(new WebSocketServerHanlder(channelGroup));
}
}
HttpRequestHandler`最初是用来先确认当前的HTTP请求是否指向了的请求,如果通过调用方法将它转向给下一个HTTP请求URI的请求,并通过调用方法将它转向给下一个HTTP请求`WebSocket`URI,是因为是因为调用方法完成之后,会进行资源释放)`HttpRequestHandler``FullHttpRequest``fireChannelRead(msg)``ChannelInboundHandler``channelRead0

读取内容,读取内容上路的index.html文件,将输出内容打包成ByteBuf对象,然后,构造一个FullHttpResponse响应对象,将ByteBuf添加进去,设置请求头信息。最后写入,调用writeAndFlush方法冲刷所有的消息。

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest>{

private static final File INDEX = new File("D:/学习/index.html");

private String websocketUrl;

public HttpRequestHandler(String websocketUrl)
{
this.websocketUrl = websocketUrl;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
if(websocketUrl.equalsIgnoreCase(msg.getUri())){
//如果该HTTP请求指向了websocketUrl的URL,那么直接交给下一个ChannelInboundHandler进行处理
ctx.fireChannelRead(msg.retain());
}else{
//生成index页面的具体内容,并送往浏览器
ByteBuf content = loadIndexHtml();
FullHttpResponse res = new DefaultFullHttpResponse(
HTTP_1_1, OK, content);

res.headers().set(HttpHeaderNames.CONTENT_TYPE,
"text/html; charset=UTF-8");
HttpUtil.setContentLength(res, content.readableBytes());
sendHttpResponse(ctx, msg, res);
}
}

public static ByteBuf loadIndexHtml(){
FileInputStream fis = null;
InputStreamReader isr = null;
BufferedReader raf = null;
StringBuffer content = new StringBuffer();
try {
fis = new FileInputStream(INDEX);
isr = new InputStreamReader(fis);
raf = new BufferedReader(isr);
String s = null;
// 读取文件内容,并将其打印
while((s = raf.readLine()) != null) {
content.append(s);
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
try {
fis.close();
isr.close();
raf.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return Unpooled.copiedBuffer(content.toString().getBytes());
}
/*发送应答*/
private static void sendHttpResponse(ChannelHandlerContext ctx,
FullHttpRequest req,
FullHttpResponse res) {
// 错误的请求进行处理 (code<>200).
if (res.status().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(),
CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
HttpUtil.setContentLength(res, res.content().readableBytes());
}

// 发送应答.
ChannelFuture f = ctx.channel().writeAndFlush(res);
//对于不是长连接或者错误的请求直接关闭连接
if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
}

HttpRequestHandler只是进程管理使用HTTP请求和优先响应进行的,而实际对传输的数据WebSocket的处理是交由进行的WebSocketServerHanlder(其中只对TextWebSocketFrame类型的数据帧处理)。

WebSocketServerHanlder处理时通过各种userEventTriggered方法,并监听成功WebSocket消息的事件,当新客户端的成功写入之后,通过渠道ChannelGroup中的所有渠道来通知所有已经连接的客户端,然后是这个新的加入该ChannelGroup,并且还为每个频道中的每个频道生成了一个用户

之后,如果进行接收到了TextWebSocketFrame消息的时候,先根据当前频道获取的用户群,并解析发送的文本帧信息,确认是通过聊单聊,最后,构造TextWebSocketFrame响应writeAndFlush

/**
* 对websocket的文本数据帧进行处理
*
*/
public class WebSocketServerHanlder extends SimpleChannelInboundHandler<TextWebSocketFrame>{


private ChannelGroup channelGroup;

public WebSocketServerHanlder(ChannelGroup channelGroup){
this.channelGroup=channelGroup;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//获取当前channel用户名
String userName=UserMap.getUser(ctx.channel().id().asLongText());
//文本帧
String content= msg.text();
System.out.println("Client: "+ userName+" received [ "+content+" ]");
String toName = null;
//判断是单聊还是群发(单聊会通过 user@ msg 这种格式进行传输文本帧)
if(content.contains("@")){
String[] str= content.split("@");
content=str[1];
//获取单聊的用户
toName = str[0];
}
if(null!=toName){
Iterator<Channel> it=channelGroup.iterator();
while(it.hasNext()){
Channel channel=it.next();
//找到指定的用户
if(UserMap.getUser(channel.id().asLongText()).equals(toName)){
//单聊
channel.writeAndFlush(new TextWebSocketFrame(userName+"@"+content));
}
}
}else{
channelGroup.remove(ctx.channel());
//群发实现
channelGroup.writeAndFlush(new TextWebSocketFrame(userName+"@"+content));
channelGroup.add(ctx.channel());
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
//检测事件,如果是握手成功事件,做点业务处理
if(evt==WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE){
String channelId = ctx.channel().id().asLongText();
//随机为当前channel指定一个用户名
UserMap.setUser(channelId);
System.out.println("新的客户端连接:"+UserMap.getUser(channelId));
//通知所有已经连接的 WebSocket 客户端新的客户端已经连接上了
channelGroup.writeAndFlush(new TextWebSocketFrame(UserMap.getUser(channelId)+"加入群聊"));
//将新的 WebSocket Channel 添加到 ChannelGroup 中
channelGroup.add(ctx.channel());
}else{
super.userEventTriggered(ctx, evt);
}
}
}

index.html内容

<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>基于WebSocket实现网页版群聊</title>
</head>
<body>
<script type="text/javascript">
var userName= null;
var socket;
var myDate = new Date();
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
socket = new WebSocket("ws://127.0.0.1:8761/ws");
socket.onmessage = function(event) {
var info = document.getElementById("jp-container");
var dataObj=event.data;
if(dataObj.indexOf("@")!=-1){
var arr = dataObj.split('@');
var sendUser;
var acceptMsg;
for(var i=0;i<arr.length;i++){
if(i==0){
sendUser = arr[i];
}else{
acceptMsg =arr[i];
}
}
if(userName==sendUser){
return;
}
var talk= document.createElement("div");
talk.setAttribute("class", "talk_recordboxme");
talk.innerHTML = sendUser+':';
var recordtext= document.createElement("div");
recordtext.setAttribute("class", "talk_recordtextbg");
talk.appendChild(recordtext);
var talk_recordtext=document.createElement("div");
talk_recordtext.setAttribute("class", " talk_recordtext");
var h3=document.createElement("h3");
h3.innerHTML =acceptMsg;
talk_recordtext.appendChild(h3);
var span=document.createElement("span");
span.innerHTML =myDate.toLocaleTimeString();
span.setAttribute("class", "talk_time");
talk_recordtext.appendChild(span);
talk.appendChild(talk_recordtext);
}else{
var talk= document.createElement("div");
talk.style.textAlign="center";
var font = document.createElement("font");
font.color='#212121';
font.innerHTML = dataObj+': '+myDate.toLocaleString( );
talk.appendChild(font);
}
info.appendChild(talk);
};
socket.onopen = function(event) {
console.log("Socket 已打开");
};
socket.onclose = function(event) {
console.log("Socket已关闭");
};
} else {
alert("Your browser does not support Web Socket.");
}
function send(message) {
if (!window.WebSocket) { return; }
if (socket.readyState == WebSocket.OPEN) {
var info = document.getElementById("jp-container");

var talk= document.createElement("div");
talk.setAttribute("class", "talk_recordbox");

var user = document.createElement("div");
user.setAttribute("class", "user");
talk.appendChild(user);
var recordtext= document.createElement("div");

recordtext.setAttribute("class", "talk_recordtextbg");
talk.appendChild(recordtext);

var talk_recordtext=document.createElement("div");
talk_recordtext.setAttribute("class", " talk_recordtext");

var h3=document.createElement("h3");
h3.innerHTML =message;
talk_recordtext.appendChild(h3);
var span=document.createElement("span");
span.innerHTML =myDate.toLocaleTimeString();
span.setAttribute("class", "talk_time");
talk_recordtext.appendChild(span);
talk.appendChild(talk_recordtext);
info.appendChild(talk );
socket.send(message);
} else {
alert("The socket is not open.");
}
}
</script>

<br>
<br>
<div class="talk">
<div class="talk_title"><span>群聊</span></div>
<div class="talk_record" style="background: #EEEEF4;">
<div id="jp-container" class="jp-container">
</div>

</div>
<form onsubmit="return false;">
<div class="talk_word">
&nbsp;
<input class="add_face" id="facial" type="button" title="添加表情" value="" />
<input class="messages emotion" autocomplete="off" name="message" value="在这里输入文字" onFocus="if(this.value=='在这里输入文字'){this.value='';}" onblur="if(this.value==''){this.value='在这里输入文字';}" />
<input class="talk_send" type="button" title="发送" value="发送" onclick="send(this.form.message.value)" />
</div>
</form>
</div>

样式

body{
font-family:verdana, Arial, Helvetica, "宋体", sans-serif;
font-size: 12px;
}

body ,div ,dl ,dt ,dd ,ol ,li ,h1 ,h2 ,h3 ,h4 ,h5 ,h6 ,pre ,form ,fieldset ,input ,P ,blockquote ,th ,td ,img,
INS {
margin: 0px;
padding: 0px;
border:0;
}
ol{
list-style-type: none;
}
img,input{
border:none;
}

a{
color:#198DD0;
text-decoration:none;
}
a:hover{
color:#ba2636;
text-decoration:underline;
}
a{blr:expression(this.onFocus=this.blur())}/*去掉a标签的虚线框,避免出现奇怪的选中区域*/
:focus{outline:0;}


.talk{
height: 480px;
width: 335px;
margin:0 auto;
border-left-width: 1px;
border-left-style: solid;
border-left-color: #444;
}
.talk_title{
width: 100%;
height:40px;
line-height:40px;
text-indent: 12px;
font-size: 16px;
font-weight: bold;
color: #afafaf;
background:#212121;
border-bottom-width: 1px;
border-bottom-style: solid;
border-bottom-color: #434343;
font-family: "微软雅黑";
}
.talk_title span{float:left}
.talk_title_c {
width: 100%;
height:30px;
line-height:30px;
}
.talk_record{
width: 100%;
height:398px;
overflow: hidden;
border-bottom-width: 1px;
border-bottom-style: solid;
border-bottom-color: #434343;
margin: 0px;
}
.talk_word {
line-height: 40px;
height: 40px;
width: 100%;
background:#212121;
}
.messages {
height: 24px;
width: 240px;
text-indent:5px;
overflow: hidden;
font-size: 12px;
line-height: 24px;
color: #666;
background-color: #ccc;
border-radius: 3px;
-moz-border-radius: 3px;
-webkit-border-radius: 3px;
}
.messages:hover{background-color: #fff;}
.talk_send{
width:50px;
height:24px;
line-height: 24px;
font-size:12px;
border:0px;
margin-left: 2px;
color: #fff;
background-repeat: no-repeat;
background-position: 0px 0px;
background-color: transparent;
font-family: "微软雅黑";
}
.talk_send:hover {
background-position: 0px -24px;
}
.talk_record ul{ padding-left:5px;}
.talk_record li {
line-height: 25px;
}
.talk_word .controlbtn a{
margin: 12px;
}
.talk .talk_word .order {
float:left;
display: block;
height: 14px;
width: 16px;
background-repeat: no-repeat;
background-position: 0px 0px;
}

.talk .talk_word .loop {
float:left;
display: block;
height: 14px;
width: 16px;
background-repeat: no-repeat;
background-position: -30px 0px;
}
.talk .talk_word .single {
float:left;
display: block;
height: 14px;
width: 16px;
background-repeat: no-repeat;
background-position: -60px 0px;
}
.talk .talk_word .order:hover,.talk .talk_word .active{
background-position: 0px -20px;
text-decoration: none;
}
.talk .talk_word .loop:hover{
background-position: -30px -20px;
text-decoration: none;
}
.talk .talk_word .single:hover{
background-position: -60px -20px;
text-decoration: none;
}


/*讨论区*/
.jp-container .talk_recordbox{
min-height:80px;
color: #afafaf;
padding-top: 5px;
padding-right: 10px;
padding-left: 10px;
padding-bottom: 0px;
}

.jp-container .talk_recordbox:first-child{border-top:none;}
.jp-container .talk_recordbox:last-child{border-bottom:none;}
.jp-container .talk_recordbox .talk_recordtextbg{
float:left;
width:10px;
height:30px;
display:block;
background-repeat: no-repeat;
background-position: left top;}
.jp-container .talk_recordbox .talk_recordtext{
-moz-border-radius:5px;
-webkit-border-radius:5px;
border-radius:5px;
background-color:#b8d45c;
width:240px;
height:auto;
display:block;
padding: 5px;
float:left;
color:#333333;
}
.jp-container .talk_recordbox h3{
font-size:14px;
padding:2px 0 5px 0;
text-transform:uppercase;
font-weight: 100;

}
.jp-container .talk_recordbox .user {
float:left;
display:inline;
height: 45px;
width: 45px;
margin-top: 0px;
margin-right: 5px;
margin-bottom: 0px;
margin-left: 0px;
font-size: 12px;
line-height: 20px;
text-align: center;
}
/*自己发言样式*/
.jp-container .talk_recordboxme{
display:block;
min-height:80px;
color: #afafaf;
padding-top: 5px;
padding-right: 10px;
padding-left: 10px;
padding-bottom: 0px;
}
.jp-container .talk_recordboxme .talk_recordtextbg{
float:right;
width:10px;
height:30px;
display:block;
background-repeat: no-repeat;
background-position: left top;}

.jp-container .talk_recordboxme .talk_recordtext{
-moz-border-radius:5px;
-webkit-border-radius:5px;
border-radius:5px;
background-color:#fcfcfc;
width:240px;
height:auto;
padding: 5px;
color:#666;
font-size:12px;
float:right;

}
.jp-container .talk_recordboxme h3{
font-size:14px;
padding:2px 0 5px 0;
text-transform:uppercase;
font-weight: 100;
color:#333333;

}
.jp-container .talk_recordboxme .user{
float:right;
height: 45px;
width: 45px;
margin-top: 0px;
margin-right: 10px;
margin-bottom: 0px;
margin-left: 5px;
font-size: 12px;
line-height: 20px;
text-align: center;
display:inline;
}
.talk_time{
color: #666;
text-align: right;
width: 240px;
display: block;
}

测试

首先,开始三个窗口

群聊

单聊

总结

本文,基于NettyWebSocket的一个协议实现的对话室服务器,从代码网页实战上还是可以的,基于Netty的WebSocket实现非常简单、容易实现。

但是WebSocketWebSocket总之,可以在广泛的范围内,扩大我们的视野,在一些具体的工作场景中,解决一些问题

文章目录
  1. 1. WebSocket简介
    1. 1.0.1. WebSocket特点:
  • 2. WebSocket 沟通交流
    1. 2.0.1. 客户端的请求:
    2. 2.0.2. 服务器端响应:
  • 3. Netty为WebSocket数据帧提供了支持
  • 4. 实战
  • 5. 测试
  • 6. 总结