关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。
原本打算做个多人斗地主练习程序,但那需要织入过多的业务逻辑,因此一方面会带来不必要的理解难度,让案例更为复杂化,另一方面代码量也会偏多,所以最终依旧选择实现基本的IM聊天程序,既简单,又能加深对Netty的理解。
本文配套源码的开源托管地址是:
关于Netty是什么,这里简单介绍下:
Netty是一个Java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。也就是说,Netty是一个基于NIO的客户、服务器端编程框架,使用Netty可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的Socket服务开发。
Netty源码和API在线查阅地址:
协议,这玩意儿相信大家肯定不陌生了,简单回顾一下协议的概念:网络协议是指一种通信双方都必须遵守的约定,两个不同的端,按照一定的格式对数据进行“编码”,同时按照相同的规则进行“解码”,从而实现两者之间的数据传输与通信。
当自己想要打造一款IM通信程序时,对于消息的封装、拆分也同样需要设计一个协议,通信的两端都必须遵守该协议工作,这也是实现通信程序的前提。
但为什么需要通信协议呢?
因为TCP/IP中是基于流的方式传输消息,消息与消息之间没有边界,而协议的目的则在于约定消息的样式、边界等。
不知大家是否还记得之前我聊到的RESP客户端协议,这是Redis提供的一种客户端通信协议。如果想要操作Redis,就必须遵守该协议的格式发送数据。
这个协议特别简单,如下:
这样描述有些令人难懂,那就直接看个案例,例如一条简单set命令。
如下:
客户端命令:setnameZhuZi转变为RESP指令:*3$3set$4name$5ZhuZi
按照Redis的规定,但凡满足RESP协议的客户端,都可以直接连接并操作Redis服务端,这也就意味着咱们可以直接通过Netty来手写一个Redis客户端。
代码如下:
在上述这个案例中,也仅仅只是通过respCommand()这个方法,对用户输入的指令进行了转换。同时在上面通过Netty,与Redis的地址、端口建立了连接。在连接建立成功后,就会向Redis发送一条转换成RESP指令的set命令。接着等待Redis的响应结果并输出,如下:
+OK
因为这是一条写指令,所以当Redis收到执行完成后,最终就会返回一个OK,大家也可直接去Redis中查询,也依旧能够查询到刚刚写入的name这个键值。
前面咱们自己针对于Redis的RESP协议,对用户指令进行了封装,然后发往Redis执行。
但对于这些常用的协议,Netty早已提供好了现成的处理器,想要使用时无需从头开发,可以直接使用现成的处理器来实现。
比如现在咱们可以基于Netty提供的处理器,实现一个简单的HTTP服务器。
其类继承关系如下:
publicfinalclassHttpServerCodecextendsCombinedChannelDuplexHandler
这也就意味着HttpServerCodec即可以对客户端的数据做解码,也可以对服务端响应的数据做编码。
同时除开添加了这个处理器外,在第二个处理器中打印了一下客户端的消息类型,最后一个处理器中,对客户端的请求做出了响应,其实也就是返回了一句话而已。
客户端的请求路径:/index.html
此时来看结果,客户端的请求会被解析成两个部分:
但按理来说浏览器发出的请求,属于GET类型的请求,GET请求是没有请求体信息的,但Netty依旧会解析成两部分~,只不过GET请求的第二部分是空的。
在第三个处理器中,咱们直接向客户端返回了一个h1标签,同时也要记得在响应头里面,加上响应内容的长度信息,否则浏览器的加载圈,会一直不同的转动,毕竟浏览器也不知道内容有多长,就会一直反复加载,尝试等待更多的数据。
很多基于Netty开发的中间件/组件,其内部基本上都开发了专属的通信协议,以此来作为不同节点间通信的基础,所以解下来咱们基于Netty也来自己设计一款通信协议,这也会作为后续实现聊天程序时的基础。
所谓的协议设计,其实仅仅只需要按照一定约束,实现编码器与解码器即可,发送方在发出数据之前,会经过编码器对数据进行处理,而接收方在收到数据之前,则会由解码器对数据进行处理。
在自定义传输协议时,咱们必然需要考虑几个因素,如下:
在设计协议时,一个完整的协议应该涵盖上述所说的几方面,这样才能提供双方通信时的基础。
从而给后续处理器使用(自定义的协议规则本身就是一个编解码处理器而已)。
前面简单聊到过,所谓的自定义协议就是自己规定消息格式,以及自己实现编/解码器对消息实现封装/拆解,所以这里想要自定义一个消息协议,就只需要满足前面两个条件即可。
因此实现如下:
其中主要实现了两个方法:
上述自定义的协议,也就是一定规则的字节数据,每条消息数据的组成如下:
最后来观察一下,大家会发现,在咱们定义的这个协议编解码处理器上,存在着一个@ChannelHandler.Sharable注解,这个注解的作用是干吗的呢?其实很简单,用来标识当前处理器是否可在多线程环境下使用,如果带有该注解的处理器,则表示可以在多个通道间共用,因此只需要创建一个即可,反之同理,如果不带有该注解的处理器,则每个通道需要单独创建使用。
因此在咱们设计IM系统之处,那也需要对应的用户功能实现。但这里为了简单,同样不再结合数据库实现完整的用户模块了,而是基于内存实现用户的管理。
publicinterfaceUserService{booleanlogin(Stringusername,Stringpassword);}
publicclassUserServiceMemoryImplimplementsUserService{privateMap
//在代码块中对用户列表进行初始化,向其中添加了两个用户信息
allUserMap.put("ZhuZi","123");allUserMap.put("XiongMao","123");}@Overridepublicbooleanlogin(Stringusername,Stringpassword){Stringpass=allUserMap.get(username);if(pass==null){returnfalse;}returnpass.equals(password);}}
这个实现类并未结合数据库来实现,而是仅仅在程序启动时,通过代码块的方式,加载了ZhuZi、XiongMao两个用户信息并放入内存的Map容器中,这里有兴趣的小伙伴,可自行将Map容器换成数据库的表即可。
服务端的基础搭建如下:
publicclassChatServer{publicstaticvoidmain(String[]args){NioEventLoopGroupboss=newNioEventLoopGroup();NioEventLoopGroupworker=newNioEventLoopGroup();ChatMessageCodecMESSAGE_CODEC=newChatMessageCodec();try{ServerBootstrapserverBootstrap=newServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss,worker);serverBootstrap.childHandler(newChannelInitializer
服务端的代码目前很简单,仅仅只是装载了一个自己的协议编/解码处理器,然后就是一些老步骤,不再过多的重复赘述,接着再来搭建一个简单的客户端。
代码实现如下:
publicclassChatClient{publicstaticvoidmain(String[]args){NioEventLoopGroupgroup=newNioEventLoopGroup();ChatMessageCodecMESSAGE_CODEC=newChatMessageCodec();try{Bootstrapbootstrap=newBootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(newChannelInitializer
在这个消息父类中,定义了多种消息类型的状态码,不同的消息类型对应不同数字,同时其中还设计了一个抽象方法,即getMessageType(),该方法交给具体的子类实现,每个子类返回各自的消息类型,为了方便后续拓展,这里又创建了一个抽象类作为中间类。
publicclassLoginResponseMessageextendsAbstractResponseMessage{publicLoginResponseMessage(booleansuccess,Stringreason){super(success,reason);}@OverridepublicintgetMessageType(){returnLoginResponseMessage;}}
首先在客户端中,再通过pipeline添加一个处理器,如下:
当然,为了该处理器能够成功生效,这里需要将其装载到服务端的pipeline上。
LoginRequestMessageHandlerLOGIN_HANDLER=newLoginRequestMessageHandler();ch.pipeline().addLast(LOGIN_HANDLER);
首先我定义了一个会话接口,如下:
publicinterfaceSession{voidbind(Channelchannel,Stringusername);voidunbind(Channelchannel);ChannelgetChannel(Stringusername);}
这个接口中依旧只有三个方法,释义如下:
该接口的实现类如下:
publicclassSessionMemoryImplimplementsSession{privatefinalMap
该实现类最关键的是其中的两个Map容器,usernameChannelMap用来存储所有用户名与Socket通道的绑定关系,而channelUsernameMap则是反过来的顺序,这主要是为了方便,即可以通过用户名获得对应通道,也可以通过通道判断出用户名,实际上一个Map也能搞定,但还是那句话,主要为了简单嘛~
publicclassChatRequestMessageextendsMessage{privateStringcontent;privateStringto;privateStringfrom;publicChatRequestMessage(){}publicChatRequestMessage(Stringfrom,Stringto,Stringcontent){this.from=from;this.to=to;this.content=content;}//省略Get/Setting、toString()方法.....}
上述这个类,是提供给客户端用来发送消息数据的,其中主要包含了三个值,聊天的消息内容、发送人与接收人。因为这里是需要实现一个IM聊天程序,所以并不是客户端与服务端进行数据交互,而是客户端与客户端之间进行数据交互,服务端仅仅只提供消息转发的功能,接着再构建一个消息类。
这个类是提供给服务端用来转发的,当服务端收到一个聊天消息后,因为聊天消息中包含了接收人,所以可以先根据接收人的用户名,找到对应的客户端通道,然后再封装成一个响应消息,转发给对应的客户端即可,下面来做具体实现。
while(true){System.out.println("==================================");System.out.println("\t1、发送单聊消息");System.out.println("\t2、发送群聊消息");System.out.println("\t3、创建一个群聊");System.out.println("\t4、获取群聊成员");System.out.println("\t5、加入一个群聊");System.out.println("\t6、退出一个群聊");System.out.println("\t7、退出聊天系统");System.out.println("==================================");Stringcommand=scanner.nextLine();}
首先会开启一个死循环,然后不断接收用户的操作,接着使用switch语法来对具体的菜单功能进行实现,先实现单聊功能。
switch(command){case"1":System.out.print("请选择你要发送消息给谁:");StringtoUserName=scanner.nextLine();System.out.print("请输入你要发送的消息内容:");Stringcontent=scanner.nextLine();ctx.writeAndFlush(newChatRequestMessage(username,toUserName,content));break;}
等用户选择了聊天目标,并且输入了消息内容后,接着会构建一个ChatRequestMessage消息对象,然后会发送给服务端,但这里先不看服务端的实现,客户端这边还需要重写一个方法。
OK,有了上述客户端的代码实现后,接着再来服务端多创建一个处理器。
@ChannelHandler.SharablepublicclassChatRequestMessageHandlerextendsSimpleChannelInboundHandler
该处理器内部的逻辑也并不复杂,首先根据单聊消息的接收人,去找一下与之对应的通道:
接着会根据上面的查询结果,进行对应的结果返回:
有了这个处理器之后,接着还需要把该处理器装载到服务端上,如下:
ChatRequestMessageHandlerCHAT_HANDLER=newChatRequestMessageHandler();ch.pipeline().addLast(CHAT_HANDLER);
装载好单聊处理器后,接着分别启动一个服务端、两个客户端,测试结果如下:
从测试结果中可以明显看出效果,其中的单聊功能的确已经实现,可以实现A→B用户之间的单聊功能,两者之间借助服务器转发,可以实现两人私聊的功能。
但多人聊天室的功能,实现之前还需要先完成建群的功能,毕竟如果群都没建立,自然无法向某个群内发送数据。
实现拉群也好,群聊也罢,其实现步骤依旧和前面相同,如下:
首先来定义两个拉群时用的消息体,如下:
publicclassGroupCreateRequestMessageextendsMessage{privateStringgroupName;privateSet
上述这个消息体是提供给客户端使用的,其中主要存在两个成员,也就是群名称与群成员列表,存放所有群成员的容器选用了Set集合,因为Set集合具备不可重复性,因此可以有效的避免同一用户多次进群,接着再来看看服务端响应时用的消息体。
publicclassGroupCreateResponseMessageextendsAbstractResponseMessage{publicGroupCreateResponseMessage(booleansuccess,Stringreason){super(success,reason);}@OverridepublicintgetMessageType(){returnGroupCreateResponseMessage;}}
这个消息体的实现尤为简单,仅仅只是给客户端返回了拉群状态以及拉群的附加信息。
前面单聊有单聊的会话管理机制,而实现多人群聊时,依旧需要有群聊的会话管理机制,首先封装了一个群聊实体类。
publicclassGroup{//聊天室名称privateStringname;//聊天室成员privateSet
接着定义了一个群聊会话的顶级接口,如下:
publicinterfaceGroupSession{//创建一个群聊GroupcreateGroup(Stringname,Set
上述接口中,提供了几个接口方法,其实也主要是群聊系统中的一些日常操作,如创群、加群、踢人、解散群、查看群成员....等功能,接着来看看该接口的实现者。
publicclassGroupSessionMemoryImplimplementsGroupSession{privatefinalMap
这个实现类没啥好说的,重点记住里面有个Map容器即可,这个容器主要负责存储所有群名称与Group群聊对象的关系,后续可以通过群聊名称,在这个容器中找到一个对应群聊对象。同时为了方便后续调用这些接口,还提供了一个工具类。
publicabstractclassGroupSessionFactory{privatestaticGroupSessionsession=newGroupSessionMemoryImpl();publicstaticGroupSessiongetGroupSession(){returnsession;}}
很简单,仅仅只实例化了一个群聊会话管理的实现类,因为这里没有结合Spring来实现,所以并不能依靠IOC技术来自动管理Bean,因此咱们需要手动创建出一个实例,以供于后续使用。
前面客户端的功能菜单中,3对应着拉群功能,所以咱们需要对3做具体的功能实现。
逻辑如下:
case"3":System.out.print("请输入你要创建的群聊昵称:");StringnewGroupName=scanner.nextLine();System.out.print("请选择你要邀请的群成员(不同成员用、分割):");Stringmembers=scanner.nextLine();Set
在该分支实现中,首先会要求用户输入一个群聊昵称,接着需要输入需要拉入群聊的用户名称,多个用户之间使用、分割,接着会把用户输入的群成员以及自己,全部放入到一个Set集合中,最终组装成一个拉群消息体,发送给服务端处理。
服务端的处理器如下:
@ChannelHandler.SharablepublicclassGroupCreateRequestMessageHandlerextendsSimpleChannelInboundHandler
这里依旧继承了SimpleChannelInboundHandler类,只关心拉群的消息,当客户端出现拉群消息时,首先会获取用户输入的群昵称和群成员,接着通过前面提供的创群接口,尝试创建一个群聊,如果群聊已经存在,则会创建失败,反之则会创建成功,在创建群聊成功的情况下,会给所有的群成员发送一条“你已被拉入[XXX]”的消息。
最后,同样需要将该处理器装载到服务端上,如下:
GroupCreateRequestMessageHandlerGROUP_CREATE_HANDLER=newGroupCreateRequestMessageHandler();ch.pipeline().addLast(GROUP_CREATE_HANDLER);
最后分别启动一个服务端、两个客户端进行效果测试,如下:
从上图的测试结果来看,的确实现了咱们的拉群效果,一个用户拉群之后,被邀请的成员都会收到来自于服务端的拉群提醒,这也就为后续群聊功能奠定了基础。
这里就不重复赘述了,还是之前的套路,定义一个客户端用的消息体,如下:
publicclassGroupChatRequestMessageextendsMessage{privateStringcontent;privateStringgroupName;privateStringfrom;publicGroupChatRequestMessage(Stringfrom,StringgroupName,Stringcontent){this.content=content;this.groupName=groupName;this.from=from;}@OverridepublicintgetMessageType(){returnGroupChatRequestMessage;}//省略其他Get/Settings、toString()方法.....}
这个是客户端用来发送群聊消息的消息体,其中存在三个成员,发送人、群聊昵称、消息内容,通过这三个成员,可以描述清楚任何一条群聊记录,接着来看看服务端响应时用的消息体。
publicclassGroupChatResponseMessageextendsAbstractResponseMessage{privateStringfrom;privateStringcontent;publicGroupChatResponseMessage(booleansuccess,Stringreason){super(success,reason);}publicGroupChatResponseMessage(Stringfrom,Stringcontent){this.from=from;this.content=content;}@OverridepublicintgetMessageType(){returnGroupChatResponseMessage;}//省略其他Get/Settings、toString()方法.....}
在这个消息体中,就省去了群聊昵称这个成员,因为这个消息体的用处,主要是给服务端转发给客户端时使用的,因此不需要群聊昵称,当然,要也可以,我这里就直接省去了。
依旧先来做客户端的实现,实现了客户端之后再去完成服务端的实现,客户端实现如下:
case"2":System.out.print("请选择你要发送消息的群聊:");StringgroupName=scanner.nextLine();System.out.print("请输入你要发送的消息内容:");StringgroupContent=scanner.nextLine();ctx.writeAndFlush(newGroupChatRequestMessage(username,groupName,groupContent));break;
因为发送群聊消息对应着之前菜单中的2,所以这里对该分支进行实现,当用户选择发送群聊消息时,首先会让用户自己先选择一个群聊,接着输入要发送的消息内容,接着组装成一个群聊消息对象,发送给服务端处理。
服务端的实现如下:
@ChannelHandler.SharablepublicclassGroupChatRequestMessageHandlerextendsSimpleChannelInboundHandler
这里依旧定义了一个处理器,关于原因就不再重复啰嗦了,服务端对于群聊消息的实现额外简单,也就是先根据用户选择的群昵称,找到该群所有的群成员,然后依次遍历成员列表,获取对应的Socket通道,转发消息即可。
接着将该处理器装载到服务端pipeline上,然后分别启动一个服务端、两个客户端,进行效果测试,如下:
效果如上图的注释,基于上述的代码测试,效果确实达到了咱们需要的群聊效果~
到这里为止,实现了最基本的建群、群聊的功能,但对于踢人、加群、解散群....等一系列群聊功能还未曾实现,但我这里就不继续重复了。