netty为消息和消息之间的转换提供了三个类,这三个类都是抽象类,分别是MessageToMessageDecoder,MessageToMessageEncoder和MessageToMessageCodec。
先来看下他们的定义:
publicabstractclassMessageToMessageEncoderextendsChannelOutboundHandlerAdapter代码语言:javascript复制publicabstractclassMessageToMessageDecoderextendsChannelInboundHandlerAdapter代码语言:javascript复制publicabstractclassMessageToMessageCodec
MessageToMessageDecoder继承自ChannelInboundHandlerAdapter,负责从channel中读取消息。
MessageToMessageCodec继承自ChannelDuplexHandler,它是一个双向的handler,可以从channel中读取消息,也可以向channel中写入消息。
有了这三个抽象类,我们再看下这三个类的具体实现。
先看一下消息的编码器MessageToMessageEncoder,编码器中最重要的方法就是write,看下write的实现:
publicvoidwrite(ChannelHandlerContextctx,Objectmsg,ChannelPromisepromise)throwsException{CodecOutputListout=null;try{if(acceptOutboundMessage(msg)){out=CodecOutputList.newInstance();@SuppressWarnings("unchecked")Icast=(I)msg;try{encode(ctx,cast,out);}finally{ReferenceCountUtil.release(cast);}if(out.isEmpty()){thrownewEncoderException(StringUtil.simpleClassName(this)+"mustproduceatleastonemessage.");}}else{ctx.write(msg,promise);}}catch(EncoderExceptione){throwe;}catch(Throwablet){thrownewEncoderException(t);}finally{if(out!=null){try{finalintsizeMinusOne=out.size()-1;if(sizeMinusOne==0){ctx.write(out.getUnsafe(0),promise);}elseif(sizeMinusOne>0){if(promise==ctx.voidPromise()){writeVoidPromise(ctx,out);}else{writePromiseCombiner(ctx,out,promise);}}}finally{out.recycle();}}}}write方法接受一个需要转换的原始对象msg,和一个表示channel读写进度的ChannelPromise。
首先会对msg进行一个类型判断,这个判断方法是在acceptOutboundMessage中实现的。
publicbooleanacceptOutboundMessage(Objectmsg)throwsException{returnmatcher.match(msg);}这里的matcher是一个TypeParameterMatcher对象,它是一个在MessageToMessageEncoder构造函数中初始化的属性:
protectedMessageToMessageEncoder(){matcher=TypeParameterMatcher.find(this,MessageToMessageEncoder.class,"I");}这里的I就是要匹配的msg类型。
如果不匹配,则继续调用ctx.write(msg,promise);将消息不做任何转换的写入到channel中,供下一个handler调用。
如果匹配成功,则会调用核心的encode方法:encode(ctx,cast,out);
注意,encode方法在MessageToMessageEncoder中是一个抽象方法,需要用户在继承类中自行扩展。
encode方法实际上是将msg对象转换成为要转换的对象,然后添加到out中。这个out是一个list对象,具体而言是一个CodecOutputList对象,作为一个list,out是一个可以存储多个对象的列表。
那么out是什么时候写入到channel中去的呢?
别急,在write方法中最后有一个finally代码块,在这个代码块中,会将out写入到channel里面。
因为out是一个List,可能会出现out中的对象部分写成功的情况,所以这里需要特别处理。
首先判断out中是否只有一个对象,如果是一个对象,那么直接写到channel中即可。如果out中多于一个对象,那么又分成两种情况,第一种情况是传入的promise是一个voidPromise,那么调用writeVoidPromise方法。
什么是voidPromise呢
我们知道Promise有多种状态,可以通过promise的状态变化了解到数据写入的情况。对于voidPromise来说,它只关心一种失败的状态,其他的状态都不关心。
如果用户关心promise的其他状态,则会调用writePromiseCombiner方法,将多个对象的状态合并为一个promise返回。
事实上,在writeVoidPromise和writePromiseCombiner中,out中的对象都是一个一个的取出来,写入到channel中的,所以才会生成多个promise和需要将promise进行合并的情况:
privatestaticvoidwriteVoidPromise(ChannelHandlerContextctx,CodecOutputListout){finalChannelPromisevoidPromise=ctx.voidPromise();for(inti=0;i 首先也是需要判断读取的消息类型,这里也定义了一个TypeParameterMatcher对象,用来检测传入的消息类型: protectedMessageToMessageDecoder(){matcher=TypeParameterMatcher.find(this,MessageToMessageDecoder.class,"I");}decoder中重要的方法是channelRead方法,我们看下它的实现: publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{CodecOutputListout=CodecOutputList.newInstance();try{if(acceptInboundMessage(msg)){@SuppressWarnings("unchecked")Icast=(I)msg;try{decode(ctx,cast,out);}finally{ReferenceCountUtil.release(cast);}}else{out.add(msg);}}catch(DecoderExceptione){throwe;}catch(Exceptione){thrownewDecoderException(e);}finally{try{intsize=out.size();for(inti=0;i 最后在finally代码块中将out中的对象一个个取出来,调用ctx.fireChannelRead进行读取。 消息转换的关键方法是decode,这个方法也是一个抽象方法,需要在继承类中实现具体的功能。 前面讲解了一个编码器和一个解码器,他们都是单向的。最后要讲解的codec叫做MessageToMessageCodec,这个codec是一个双向的,即可以接收消息,也可以发送消息。 先看下它的定义: publicabstractclassMessageToMessageCodec 它定义了两个TypeParameterMatcher,分别用来过滤inboundMsg和outboundMsg: protectedMessageToMessageCodec(){inboundMsgMatcher=TypeParameterMatcher.find(this,MessageToMessageCodec.class,"INBOUND_IN");outboundMsgMatcher=TypeParameterMatcher.find(this,MessageToMessageCodec.class,"OUTBOUND_IN");}分别实现了channelRead和write方法,用来读写消息: @OverridepublicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{decoder.channelRead(ctx,msg);}@Overridepublicvoidwrite(ChannelHandlerContextctx,Objectmsg,ChannelPromisepromise)throwsException{encoder.write(ctx,msg,promise);}这里的decoder和encoder实际上就是前面我们讲到的MessageToMessageDecoder和MessageToMessageEncoder: privatefinalMessageToMessageEncoder protectedabstractvoidencode(ChannelHandlerContextctx,OUTBOUND_INmsg,List