最近项目中遇到了一个基于mina的断包粘包问题,下面分享下解决方案。
public class DeviceMessageDecoder extends CumulativeProtocolDecoder { private final static Logger log = LoggerFactory .getLogger(DeviceManagerDecoder.class); private final AttributeKey DECODER = new AttributeKey(getClass(), "decoder"); private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context"); /** * 设置编码 */ private final Charset charset; /** * 断包处理次数 */ private int count = 0; public DeviceMessageDecoder(Charset charset) { if (charset == null) { this.charset = Charset.forName(ConstantValues.DEFAULTCHARSET); } else { this.charset = charset; } } /** * 获取session的decoder * * @param session * @return */ public CharsetDecoder getCharsetDecoder(IoSession session) { CharsetDecoder decoder = (CharsetDecoder) session.getAttribute(DECODER); if (decoder == null) { decoder = charset.newDecoder(); session.setAttribute(DECODER, decoder); } return decoder; } /** * 获取session的context * * @param session * @return */ public Context getContext(IoSession session) { Context ctx = (Context) session.getAttribute(CONTEXT); if (ctx == null) { ctx = new Context(); session.setAttribute(CONTEXT, ctx); } return ctx; } @Override protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { count=count+1; CharsetDecoder decoder = getCharsetDecoder(session); // 当前缓冲区中数据长度小于预先设定的报文长度的长度8,则表示出现了断包 if (in.remaining() < ConstantValues.PREFIXLENGTH) { log.info("出现了断包"); return false; } Context ctx = getContext(session); int matchCount = ctx.getMatchCount();// 目前已获取的数据 int length = ctx.getLength();// 数据总长度 log.info("第"+count+"次解析"); // length为0,表示第一次解析 if (length == 0) { // 取前8位 // get方法会改变position的值 String sLen = in.getString(ConstantValues.PREFIXLENGTH, decoder); log.info("前8位:" + sLen); int msgLen = Integer.valueOf(sLen).intValue(); // 除去前8位后的报文长度 matchCount = in.remaining(); ctx.setLength(msgLen); ctx.setMatchCount(matchCount); log.info("已获取消息的长度:" + matchCount + ",消息总长度:" + msgLen); in.mark(); // 已获取消息的长度小于消息总长度,断包了,需要重新获取一次 if (matchCount < msgLen) { log.info("已获取报文的长度小于报文总长度,出现断包了"); in.reset(); return false; } // 处理当前消息 String message = in.getString(msgLen, decoder); session.write(message); // 粘包了 // 处理完了就清空断包标志 ctx.reset(); if (in.hasRemaining()) { log.info(in.remaining() + ""); return true; } } else { // 获取当前消息长度 int tmp = in.remaining(); //log.info("第二次读取的数据长度=" + tmp); matchCount = tmp; ctx.setMatchCount(matchCount); log.info("已获取消息的长度:" + matchCount + ",消息总长度:" + length); in.mark(); //// 已获取消息的长度小于消息总长度,断包了,需要重新获取一次 if (matchCount < length) { log.info("已获取报文的长度小于报文总长度,出现断包了"); in.reset(); return false; } //获取length长度的报文 String message = in.getString(length, decoder); session.write(message); // 粘包了 ctx.reset(); if (in.hasRemaining()) { log.info(in.remaining() + ""); return true; } } count=0; log.info("报文解析完成"); return false; } /** * * 定义一个内部类,用来封转当前解码器中的一些公共数据,主要是用于超长数据分包解析 */ private class Context { /** * 报文总长度 */ public int length = 0; /** * 已获取的报文长度 */ public int matchCount = 0; public Context() { } public void setLength(int length) { this.length = length; } public void setMatchCount(int matchCount) { this.matchCount = matchCount; } public int getLength() { return length; } public int getMatchCount() { return matchCount; } public void reset() { this.length = 0; this.matchCount = 0; } } }
public final class ConstantValues { /** * 消息的长度的长度。用数字字符串表示消息的长度。 */ public static int PREFIXLENGTH = 8; /** * 操作码的长度。 */ public static int OPERATORLENGTH = 6; /** * 默认的编码方式utf-8 */ public static final String DEFAULTCHARSET = "utf-8"; }