⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 Java知音 「啊杰」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

摘要:通过两篇原理篇的学习,我相信大家对于如何实现一个聊天系统已经有了一个大概的思路了。接下来我们就通过实战,来巩固一下这些知识点,并实现一个在线聊天系统吧。


一、前提回顾

基于 Netty 实现在线聊天系统(原理篇一)

基于 Netty 实现在线聊天系统(原理篇二)

二、目录介绍

  • 功能梳理
  • 具体实现

三、需求梳理

通过前面两章内容的学习,我们基本学会了如何使用 Netty 建立一个长连接,接下来我们就在这个基础上,实现一个单机版的 im 系统。

主要功能,我梳理了一下:

  • 登录
  • 维持连接、心跳检测
  • 聊天消息
  • 消息ack

使用到的相关组件:

  • SpringBoot-job
  • GuavaCache

四、具体实现

本期的内容是基于,原理篇一的 dome 代码基础上进行的,没有看过的原理一的小伙伴,建议先回顾一下原理篇一。

原理篇一的代码结构:

  • Server(主程序)

    • ServerHandler(业务处理程序)

实战篇一的代码结构:

代码的层级结构如上所示,接下来,我们将会一个个模块对逻辑进行讲解。

1、登录

1)实现逻辑

不管是长连接还是短连接,鉴权这个动作都是要有的,我相信这个功能模块,大家是很好理解的。我这里就不在过多的赘述了,具体实现步骤如下所示:

1、前后端建立 ws 连接

2、前端发送登录类型的报文,如下所示:

{
"token": "2",
"type": "10"
}

token:这里的 token,就是用户登录标识,大家可以根据自己所依赖的业务系统,进行修改。

type:这里表示消息报文的类型,本文所有类型定义如下所示:

  • USER_LOGIN(10, "用户上线")
  • USER_LOGIN_RESP(11, "用户上线响应")
  • HEARTBEAT_TIMEOUT(30, "心跳超时")
  • PING(40, "心跳")
  • PONG(41, "心跳响应")
  • CHAT(80, "聊天"),
  • CHAT_RESP(81, "聊天响应")
  • ACK(90, "确认")
  • ACK_RESP(91, "确认响应")
  • UNKNOWN(0, "未知类型")

示例代码如下图所示,WsMsgDispatcher.dispatch

消息类型

3、后端对 token 进行校验,校验成功就记录用户登录信息。

示例代码如下图所示,UserLoginProcessor.login

登录业务逻辑

2)具体效果

主要的业务代码我们已经讲解完毕了,接下来我们来看看效果:

用户登录

从上图,我们可以看到,我们登录的两个用户都成功了,并且返回了对应的用户信息。

2、维持连接、心跳检测

这个模块的功能,其实我们在原理篇二的时候已经讲过了具体的实现方案了,这里也不再过多的赘述了,我们直接来看具体实现方法吧。

1)维持连接

1、前端每10秒发送一次心跳消息,报文如下所示:

{
"type": "40",
"fromId": "2"
}

注:前端发送的每个消息,理论上都是需要带上用户表示的,后端都是需要进行鉴权操作的。我们这里为了方便讲解(偷懒,bushi)将这部分逻辑进行了简化,大家在具体实现的时候,记得一定要加上 鉴权逻辑

2、后端检测,用户是否还在线,如果在线,则刷新用户的最新在线时间,并回复 PONG 消息。

示例代码如下图所示,HeartBeatProcessor.process

维持连接1

维持连接2

2)心跳检测

这里主要是基于 IdleStateEvent 事件实现的。

TextWebSocketFrameHandler 继承 SimpleChannelInboundHandler 类,并实现 userEventTriggered 方法,具体代码如下所示:

心跳检测

这里详细说一下,三种事件的区别:

  • **readerIdleTimeSeconds:**读超时。即当在指定的时间间隔内没有从 Channel 读取到数据时,会触发一个 READER_IDLEIdleStateEvent 事件。
  • writerIdleTimeSeconds: 写超时。即当在指定的时间间隔内没有数据写入到 Channel 时,会触发一个 WRITER_IDLEIdleStateEvent 事件。
  • allIdleTimeSeconds: 读/写超时。即当在指定的时间间隔内没有读且没有写操作时,会触发一个 ALL_IDLEIdleStateEvent 事件。

所以,我们这里检测 ALL_IDLE 事件即可。

3)具体效果

维持连接效果如下所示:

维持连接效果

心跳检测效果如下所示:

心跳超时效果1

心跳超时效果2

3、聊天消息

聊天消息模块主要分为两部分:

  • 消息接收:客户端推送消息到服务端
  • 消息推送:服务端将消息推送到指定的客户端

这边主要的难点在于,服务端将消息推送到指定的客户端,具体场景有2种情况:

  • 消息的发送者和消息的接受者,在同一台服务器上建立的 ws 连接,这种情况,就很好处理,直接在服务器上找到建立的 ws 连接,然后将消息推送给对应的客户端。
  • 消息的发送者和消息的接受者,在不同的服务器上建立的 ws 连接,这种情况就比较复杂,实现方案也很多,比较简单的实现方式就是,发送一条广播消息,让对应的服务器,将消息推送到指定的客户端。

本文由于是 单机版 的 im,所以只会有第一种情况发生,第二种情况就留给大家自由发挥了。

1)消息接收

具体步骤如下所示:

1、客户端发送类型为80的报文,如下所示:

{
"type": "80",
"fromId": "1",
"toId": "2",
"content": {
"contentType": 1,
"body": "测试消息"
}
}

2、服务端(ChatProcessor)对消息进行处理,具体代码如下所示:

消息接收

2)消息推送

具体步骤如下所示:

1、获取消息接受者所连接的服务器 ip 地址 2、判断当前服务器 ip 地址是否和上面的 ip 地址相同,如果相同则推送消息,否则转发给目标服务器

具体代码如下所示:

消息推送

3)具体效果

1、我们先登录两个用户,分别是张三、李四,如下图所示:

聊天登录

2、张三发送消息给李四,如下图所示:

张三发送消息给李四

3、李四发送消息给张三,如下图所示:

李四发送消息给张三

4、消息 ack

因为网络环境异常或者其他异常状况的发送,可能会出现消息推送失败的情况,这时候就需要 消息 ack 机制和重试,来保证我们的消息可以推送成功。

1)消息 ack 机制

具体步骤如下:

1、客户端收到 80 类型的消息,解析并发送 ack 报文,如下所示:

{
"type": "90",
"msgId": "2bfea133-72a8-4315-82aa-80049fe4fb7b"
}

2、服务端收到 ack 消息,变更消息状态(AckProcessor),具体代码如下图所示:

消息ack

2)消息重试

这里因为是单机版 im,所以直接采用 SpringBoot-Job 实现,Job 代码如下所示:

消息重试

总结

好了我们实战篇一,到这里结束了,希望大家都能跟着文中的思路,具体去实现一遍,如果遇到什么问题或者文中有什么错误的地方,欢迎大家留言。谢谢观看,点个赞再走吧。

文章目录
  1. 1. 一、前提回顾
  2. 2. 二、目录介绍
  3. 3. 三、需求梳理
  4. 4. 四、具体实现
    1. 4.1. 1、登录
      1. 4.1.1. 1)实现逻辑
      2. 4.1.2. 2)具体效果
    2. 4.2. 2、维持连接、心跳检测
      1. 4.2.1. 1)维持连接
      2. 4.2.2. 2)心跳检测
      3. 4.2.3. 3)具体效果
    3. 4.3. 3、聊天消息
      1. 4.3.1. 1)消息接收
      2. 4.3.2. 2)消息推送
      3. 4.3.3. 3)具体效果
    4. 4.4. 4、消息 ack
      1. 4.4.1. 1)消息 ack 机制
      2. 4.4.2. 2)消息重试
  5. 5. 总结