Netty ByteToMessageDecoder runs more times than it has to

I am trying to write a simple Client Server application using netty. I followed this tutorial, and specifically the time server model along with the POJO model. My problem is with the ByteToMessageDecoder : it runs more times than it has to, meaning instead of stopping when it reads a null ByteBuf it reads one more time and for some reason, that I cannot understand, it finds the previous message that my client has sent! I am certain that the client only sends said message only once!

So the idea is this : a simple Client Server model where the Client sends a "DataPacket" with a "hello world" message in it and the server responds with a "DataPacket" with an "ACK". I am using the DataPacket type because in the future I want to pass more things in it, add a header and built something a bit more complicated...But for starters I need to see what am I doing wrong in this one...

The ERROR :

As you can see the Server starts normally, my Client (Transceiver) sends the message, the encoder activates and converts it from DataPacket to ByteBuf, the message is sent and received from the server, the Decoder from the Server activates and converts it from ByteBuf to DataPacket and then the Server handles it accordingly... It should sent the ACK and repeat the same backwards but this is were things go wrong and I cannot understand why.

I have read some posts here and already tried a LengthFieldBasedFrameDecoder, it did not work and I also want to see what is wrong with this one if possible and not use something else...

Code:

Encoder and Decoder class :

 package org.client_server;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.CharsetUtil;

public class EncoderDecoder {

    public static class NettyEncoder extends MessageToByteEncoder<DataPacket> {

        @Override
        protected void encode(ChannelHandlerContext ctx, DataPacket msg, ByteBuf out)
                throws Exception {
            System.out.println("Encode: "+msg.getData());
            out.writeBytes(msg.convertData());
        }       
    }

    public static class NettyDecoder extends ByteToMessageDecoder{

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in,
                List<Object> out) throws Exception {
            if((in.readableBytes() < 4) ) {
                return;
            }
            String msg = in.toString(CharsetUtil.UTF_8);
            System.out.println("Decode:"+msg);

            out.add(new DataPacket(msg));
        }

    }

}

Server handler :

class DataAvroHandler extends ChannelInboundHandlerAdapter {


        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            try {
                DataPacket in = (DataPacket)msg;
                System.out.println("[Server]: Message received..."+in.getData());
            }finally {
                ReferenceCountUtil.release(msg);
                //ctx.close();
            }
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx)
                throws Exception {
            System.out.println("[Server]: Read Complete...");
            DataPacket pkt = new DataPacket("ACK!");
            //pkt.setData(Unpooled.copiedBuffer("ACK", CharsetUtil.UTF_8));
            ctx.writeAndFlush(pkt);         
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            serverLog.warning("[Server]: Error..." + cause.toString());
            ctx.close();            
        }

The Client handler :

class DataAvroHandlerCl extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        System.out.println("[Transceiver]: Channel Active!!!");
        DataPacket pkt = new DataPacket("Hello World!");
        ChannelFuture f = ctx.writeAndFlush(pkt);
        //f.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            DataPacket in = (DataPacket)msg;
            System.out.println("[Transceiver]: Message received..."+in.getData());
        }finally {
            ReferenceCountUtil.release(msg);
            //ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        transLog.warning("[Transceiver] : Error..." + cause.getMessage());
        ctx.close();
    }

}

The Server and Client pipelines :

ch.pipeline().addLast("Decoder", new EncoderDecoder.NettyDecoder());
ch.pipeline().addLast("Encoder", new EncoderDecoder.NettyEncoder());
ch.pipeline().addLast("DataAvroHandler", new DataAvroHandler());

Answers


Your problem arises from using the toString() method of the ByteBuf in in your NettyDecoder.

Quoting from the javadoc (http://netty.io/4.0/api/io/netty/buffer/ByteBuf.html#toString%28java.nio.charset.Charset%29):

This method does not modify readerIndex or writerIndex of this buffer.

Now, the ByteToMessageDecoder doesn't know how many bytes you have actually decoded! It looks like you decoded 0 bytes, because the buffer's readerIndex was not modified and therefore you also get the error messages in your console.

You have to modify the readerIndex manually:

String msg = in.toString(CharsetUtil.UTF_8);
in.readerIndex(in.readerIndex() + in.readableBytes());
System.out.println("Decode:"+msg);

Need Your Help

How far do you go with Mobile First Responsive Design?

mobile user-interface responsive-design

I'm retro-fitting a website for Mobile First Responsive Design (MFRD). My question is - how far do you go with the "Mobile First" part?.

notifyDataSetChange not working from custom adapter

android listview baseadapter notifydatasetchanged

When I repopulate my ListView, I call a specific method from my Adapter.