频道栏目
首页 > 资讯 > 其他 > 正文

实用的mina断包粘包解决方案

18-05-02        来源:[db:作者]  
收藏   我要投稿

记mina断包粘包解决方案

最近项目中遇到了一个基于mina的断包粘包问题,下面分享下解决方案。
在mina中,一般的应用场景使用TextLineDecoder的Decode和Encode就够了(TextLine的默认分割符是\r\n,也可以通过new TextLineDecoder(charset,decodingDelimiter)方法指定分隔符),但默认解码器每次读取缓冲的数据是有限制的,即ReadBufferSize的大小,默认是2048个字节,当数据包比较大时将被分成多次读取,造成断包。虽然可以通过acceptor.getSessionConfig().setReadBufferSize(newsize)这种方式来增加默认容量,但容易造成空间浪费,肯定也会降低数据的处理效率。所以,当我们接收的数据的大小不是很固定,且容易偏大的时候,默认的TextLine就不适合了。这时我们在解析之前就需要判断数据包是否完整,这样处理起来就会非常麻烦。
幸好mina中提供了CumulativeProtocolDecoder类,从名字上可以看出累积性的协议解码器,也就是说只要有数据发送过来,这个类就会去读取数据,然后累积到内部的IoBuffer 缓冲区,但是具体的拆包(把累积到缓冲区的数据解码为JAVA 对象)交由子类的doDecode()方法完成。通过阅读源码,我们可以发现CumulativeProtocolDecoder就是在decode()方法内反复地调用暴露给子类实现的doDecode()方法。
CumulativeProtocolDecoder的具体执行过程如下所示:
(1)你的doDecode()方法返回true时,CumulativeProtocolDecoder的decode()方法会首先判断你是否在doDecode()方法中从内部的IoBuffer 缓冲区读取了数据,如果没有,则会抛出非法的状态异常,也就是你的doDecode()方法返回true 就表示你已经消费了本次数据(相当于聊天室中一个完整的消息已经读取完毕),进一步说,也就是此时你必须已经消费过内部的IoBuffer缓冲区的数据(哪怕是消费了一个字节的数据)。如果验证通过,那么CumulativeProtocolDecoder会检查缓冲区内是否还有数据未读取,如果有就继续调用doDecode()方法,没有就停止对doDecode()方法的调用,直到有新的数据被缓冲。
(2)当你的doDecode()方法返回false时,CumulativeProtocolDecoder会停止对doDecode()方法的调用,但此时如果本次数据还有未读取完的,就将含有剩余数据的IoBuffer缓冲区保存到IoSession中,以便下一次数据到来时可以从IoSession中提取合并。如果发现本次数据全都读取完毕,则清空IoBuffer缓冲区。
简而言之,当你认为读取到的数据已经够解码了,那么就返回true,否则就返回false。这个CumulativeProtocolDecoder其实最重要的工作就是帮你完成了数据的累积,因为这个工作是很烦琐的。
下面开始介绍项目中CumulativeProtocolDecoder类的具体使用情况。
项目中使用的报文由8位报文长度+6位操作码+报文体组成,其结构如下图所示:

这里8位报文长度表示的报文长度不包括前8位。
首先定义一个DeviceMessageDecoder类继承CumulativeProtocolDecoder,实现抽象方法doDecode,并定义一个内部类Context,用来封转当前解码器中的一些公共数据,如报文总长度,当前已获取的报文长度。
处理过程为:
(1)判断报文长度是否大于预先设定的报文长度,若小于,则表示出现断包,需要扔给父类重新获取一次。
(2)获取Context中保存的报文总长度和已获取的报文长度,当总长度为0表示第一次解析,获取前8位,作为报文总长度;获取当前缓冲区中剩余报文长度,判断剩余报文长度是否小于报文总长度,若小于,则表示出现了断包,需要扔给父类重新获取一次;若剩余报文长度等于报文总长度,则表示未出现分包,正常获取报文;若获取完报文总长度的报文后,缓冲区中还有数据,则可认为是出现了粘包的情况,等待下一个包合并处理。
(3)获取Context中保存的报文总长度和已获取的报文长度,当总长度不为0表示非第一次,是断包后的解析处理。此时不用截取前8位,直接获取缓冲区报文长度,判断报文长度是否小于报文总长度,若小于,则表示出现了断包,需要扔给父类重新获取一次;若剩余报文长度等于报文总长度,则表示未出现分包,正常获取报文;若获取完报文总长度的报文后,缓冲区中还有数据,则可认为是出现了粘包的情况,等待下一个包合并处理。
代码如下所示,如有问题请大神们指出,谢谢!!!

public class DeviceMessageDecoder extends CumulativeProtocolDecoder {

    private final static Logger log = LoggerFactory
            .getLogger(DeviceManagerDecoder.class);
    private final AttributeKey DECODER = new AttributeKey(getClass(), "decoder");
    private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context");

    /**
     * 设置编码
     */
    private final Charset charset;

    /**
     * 断包处理次数
     */
    private int count = 0;

    public DeviceMessageDecoder(Charset charset) {
        if (charset == null) {
            this.charset = Charset.forName(ConstantValues.DEFAULTCHARSET);
        } else {
            this.charset = charset;
        }
    }

    /**
     * 获取session的decoder
     * 
     * @param session
     * @return
     */
    public CharsetDecoder getCharsetDecoder(IoSession session) {
        CharsetDecoder decoder = (CharsetDecoder) session.getAttribute(DECODER);
        if (decoder == null) {
            decoder = charset.newDecoder();
            session.setAttribute(DECODER, decoder);
        }
        return decoder;
    }

    /**
     * 获取session的context
     * 
     * @param session
     * @return
     */
    public Context getContext(IoSession session) {
        Context ctx = (Context) session.getAttribute(CONTEXT);
        if (ctx == null) {
            ctx = new Context();
            session.setAttribute(CONTEXT, ctx);
        }
        return ctx;
    }

    @Override
    protected boolean doDecode(IoSession session, IoBuffer in,
            ProtocolDecoderOutput out) throws Exception {
        count=count+1;
        CharsetDecoder decoder = getCharsetDecoder(session);
        // 当前缓冲区中数据长度小于预先设定的报文长度的长度8,则表示出现了断包
        if (in.remaining() < ConstantValues.PREFIXLENGTH) {
            log.info("出现了断包");
            return false;
        }
        Context ctx = getContext(session);

        int matchCount = ctx.getMatchCount();// 目前已获取的数据
        int length = ctx.getLength();// 数据总长度
        log.info("第"+count+"次解析");
        // length为0,表示第一次解析
        if (length == 0) {
            // 取前8位
            // get方法会改变position的值
            String sLen = in.getString(ConstantValues.PREFIXLENGTH, decoder);
            log.info("前8位:" + sLen);
            int msgLen = Integer.valueOf(sLen).intValue();
            // 除去前8位后的报文长度
            matchCount = in.remaining();

            ctx.setLength(msgLen);
            ctx.setMatchCount(matchCount);
            log.info("已获取消息的长度:" + matchCount + ",消息总长度:" + msgLen);

            in.mark();

            // 已获取消息的长度小于消息总长度,断包了,需要重新获取一次
            if (matchCount < msgLen) {
                log.info("已获取报文的长度小于报文总长度,出现断包了");
                in.reset();
                return false;
            }
            // 处理当前消息
            String message = in.getString(msgLen, decoder);
            session.write(message);
            // 粘包了
            // 处理完了就清空断包标志
            ctx.reset();
            if (in.hasRemaining()) {
                log.info(in.remaining() + "");
                return true;
            }
        } else {
            // 获取当前消息长度
            int tmp = in.remaining();
            //log.info("第二次读取的数据长度=" + tmp);
            matchCount = tmp;
            ctx.setMatchCount(matchCount);
            log.info("已获取消息的长度:" + matchCount + ",消息总长度:" + length);

            in.mark();

            //// 已获取消息的长度小于消息总长度,断包了,需要重新获取一次
            if (matchCount < length) {
                log.info("已获取报文的长度小于报文总长度,出现断包了");
                in.reset();
                return false;
            }

            //获取length长度的报文
            String message = in.getString(length, decoder);
            session.write(message);
            // 粘包了
            ctx.reset();
            if (in.hasRemaining()) {
                log.info(in.remaining() + "");
                return true;
            }
        }
        count=0;
        log.info("报文解析完成");
        return false;
    }

    /**
     * 
     * 定义一个内部类,用来封转当前解码器中的一些公共数据,主要是用于超长数据分包解析
     */
    private class Context {

        /**
         * 报文总长度
         */
        public int length = 0;
        /**
         * 已获取的报文长度
         */
        public int matchCount = 0;

        public Context() {
        }

        public void setLength(int length) {
            this.length = length;
        }

        public void setMatchCount(int matchCount) {
            this.matchCount = matchCount;
        }

        public int getLength() {
            return length;
        }

        public int getMatchCount() {
            return matchCount;
        }

        public void reset() {
            this.length = 0;
            this.matchCount = 0;
        }

    }

}
public final class ConstantValues {

    /**
     * 消息的长度的长度。用数字字符串表示消息的长度。
     */
    public static int PREFIXLENGTH = 8;

    /**
     * 操作码的长度。
     */
    public static int OPERATORLENGTH = 6;

    /**
     * 默认的编码方式utf-8
     */
    public static final String DEFAULTCHARSET = "utf-8";

}
相关TAG标签
上一篇:全面分析秒杀系统架构
下一篇:JMS 基于ActiveMq实现
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站