MINA 2 心跳包

    xiaoxiao2026-04-16  2

    /*
     * Apache MINA 2 heartbeat (keep-alive) example: server bootstrap, client bootstrap,
     * heartbeat message factory and the shared IoHandler.
     *
     * Flow described by the original article: an incoming message is decoded first. Before
     * messageReceived() is invoked, the KeepAliveFilter calls
     * KeepAliveMessageFactoryImpl.isRequest(). If the message is a heartbeat, a heartbeat is
     * sent back and messageReceived() is NOT called; otherwise the message is handed to
     * messageReceived(), which prints its content.
     */

    // ===================================================================================
    // Server
    // ===================================================================================
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.charset.Charset;
    import org.apache.mina.core.service.IoAcceptor;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.ProtocolCodecFilter;
    import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
    import org.apache.mina.filter.keepalive.KeepAliveFilter;
    import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
    import org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler;
    import org.apache.mina.filter.logging.LoggingFilter;
    import org.apache.mina.transport.socket.SocketAcceptor;
    import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import public_protocol.MsgProtocol;
    import server.KeepAliveRequestTimeoutHandlerImpl;
    import serverUntil.ServerInfo;

    /**
     * Server bootstrap: configures the shared acceptor (logging, codec, heartbeat, handler)
     * and binds it to the configured port.
     *
     * @author Lhy
     */
    public class ServerStart {

        private static final Logger LOG = LoggerFactory.getLogger(ServerStart.class);

        /** Listening port, read from the global server settings. */
        private static final int PORT = ServerInfo.getServerPort();

        /** Value passed to {@code KeepAliveFilter.setRequestInterval} (seconds per the MINA API). */
        private static final int HEARTBEAT_INTERVAL_SECONDS = 5;

        /** Value passed to {@code KeepAliveFilter.setRequestTimeout} (seconds per the MINA API). */
        private static final int REQUEST_TIMEOUT_SECONDS = 15;

        private static SocketAcceptor acceptor;

        private ServerStart() {
        }

        /**
         * Returns the shared acceptor, creating the non-blocking NIO acceptor on first use.
         *
         * @return the single acceptor instance used by this class
         */
        public static SocketAcceptor getAcceptor() {
            if (null == acceptor) {
                acceptor = new NioSocketAcceptor();
            }
            return acceptor;
        }

        /**
         * Configures the acceptor and binds it to {@code PORT}.
         *
         * <p>The success message is printed only after the bind succeeded (the original printed
         * it on the failure path because {@code return true} preceded it).
         *
         * @return {@code true} when the server is bound, {@code false} when binding failed
         */
        public static boolean serverStart() {
            IoAcceptor server = getAcceptor();
            // Read buffer size.
            server.getSessionConfig().setReadBufferSize(1024);
            configureOnce(server);
            try {
                server.bind(new InetSocketAddress(PORT));
            } catch (IOException e) {
                LOG.error("Failed to bind server to port {}", PORT, e);
                return false;
            }
            System.out.println("Server started on port: " + PORT);
            return true;
        }

        /**
         * Installs filters and handler only when they are not present yet, so that calling
         * {@link #serverStart()} again (for example after a failed bind) does not fail on a
         * duplicate filter name.
         */
        private static void configureOnce(IoAcceptor server) {
            if (!server.getFilterChain().contains("logger")) {
                server.getFilterChain().addLast("logger", new LoggingFilter());
            }
            if (!server.getFilterChain().contains("codec")) {
                server.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MsgProtocol()));
            }
            if (!server.getFilterChain().contains("heartbeat")) {
                server.getFilterChain().addLast("heartbeat", newHeartbeatFilter());
            }
            if (server.getHandler() == null) {
                // Handler that receives the event callbacks.
                server.setHandler(new MyIoHandler());
            }
        }

        /** Builds the keep-alive filter that sends/receives heartbeats and handles their timeout. */
        private static KeepAliveFilter newHeartbeatFilter() {
            KeepAliveMessageFactory messageFactory = new KeepAliveMessageFactoryImpl();
            KeepAliveRequestTimeoutHandler timeoutHandler = new KeepAliveRequestTimeoutHandlerImpl();
            // BOTH_IDLE: the heartbeat is driven by the session being idle for reads and writes.
            KeepAliveFilter heartbeat =
                    new KeepAliveFilter(messageFactory, IdleStatus.BOTH_IDLE, timeoutHandler);
            // true: the idle event is also forwarded to MyIoHandler.sessionIdle();
            // false: sessionIdle() is no longer called.
            heartbeat.setForwardEvent(false);
            heartbeat.setRequestInterval(HEARTBEAT_INTERVAL_SECONDS);
            // On timeout KeepAliveRequestTimeoutHandlerImpl decides what to do.
            heartbeat.setRequestTimeout(REQUEST_TIMEOUT_SECONDS);
            return heartbeat;
        }
    }

    // ===================================================================================
    // Client
    // ===================================================================================
    package lhy.client;

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.charset.Charset;
    import lhy.client_util.ServerInfo;
    import lhy.protocol.MsgProtocol;
    import org.apache.mina.core.future.ConnectFuture;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.ProtocolCodecFilter;
    import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
    import org.apache.mina.filter.keepalive.KeepAliveFilter;
    import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
    import org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler;
    import org.apache.mina.filter.logging.LoggingFilter;
    import org.apache.mina.transport.socket.nio.NioSocketConnector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /**
     * Client bootstrap: configures the shared connector (codec, heartbeat, handler) and opens
     * the connection to the server.
     *
     * @author Lhy
     */
    public class ClientStart {

        private static final Logger LOG = LoggerFactory.getLogger(ClientStart.class);

        /** Session of the most recent successful connection; {@code null} until connected. */
        private static IoSession is;

        /** Value passed to {@code KeepAliveFilter.setRequestInterval} (seconds per the MINA API). */
        private static final int HEARTBEAT_INTERVAL_SECONDS = 15;

        /** Value passed to {@code KeepAliveFilter.setRequestTimeout} (seconds per the MINA API). */
        private static final int REQUEST_TIMEOUT_SECONDS = 15;

        private static NioSocketConnector connector;

        /**
         * Returns the shared connector, creating the non-blocking NIO connector on first use.
         *
         * @return the single connector instance used by this class
         */
        public static NioSocketConnector getConnector() {
            if (null == connector) {
                connector = new NioSocketConnector();
            }
            return connector;
        }

        /**
         * @return the session stored by the last successful {@link #clientStart()}, or
         *         {@code null} when no connection has been established yet
         */
        public static IoSession getIoSession() {
            return is;
        }

        /**
         * Connects to the server address taken from the global settings and stores the session.
         *
         * <p>Failure is detected through {@code ConnectFuture.isConnected()} instead of catching
         * a blanket {@code Exception}, and the failure cause is logged rather than discarded.
         *
         * @return {@code true} when connected, {@code false} when the attempt failed
         */
        public static boolean clientStart() {
            // Server ip/port come from the global settings class.
            String serverIp = ServerInfo.getServerIp();
            int serverPort = ServerInfo.getServerPort();
            System.out.println(serverIp+" "+serverPort);

            NioSocketConnector client = getConnector();
            client.getSessionConfig().setReadBufferSize(1024);
            configureOnce(client);
            // Connect timeout.
            client.setConnectTimeoutMillis(5000);

            ConnectFuture attempt = client.connect(new InetSocketAddress(serverIp, serverPort));
            // Wait until the connection attempt has finished.
            attempt.awaitUninterruptibly();
            if (!attempt.isConnected()) {
                LOG.warn("Connection to {}:{} failed", serverIp, serverPort, attempt.getException());
                System.out.println("连接超时");
                return false;
            }
            is = attempt.getSession();
            return true;
        }

        /**
         * Installs filters and handler only when they are not present yet. The original re-added
         * them on every call (removing them again only on failure), so a second call after a
         * successful connect failed on the duplicate filter name. No LoggingFilter is installed
         * on the client; the original listing had that line commented out.
         */
        private static void configureOnce(NioSocketConnector client) {
            if (!client.getFilterChain().contains("codec")) {
                client.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MsgProtocol()));
            }
            if (!client.getFilterChain().contains("heartbeat")) {
                client.getFilterChain().addLast("heartbeat", newHeartbeatFilter());
            }
            if (client.getHandler() == null) {
                client.setHandler(new MyIoHandler());
            }
        }

        /** Builds the keep-alive filter that sends/receives heartbeats and handles their timeout. */
        private static KeepAliveFilter newHeartbeatFilter() {
            KeepAliveMessageFactory messageFactory = new KeepAliveMessageFactoryImpl();
            KeepAliveRequestTimeoutHandler timeoutHandler = new KeepAliveRequestTimeoutHandlerImpl();
            KeepAliveFilter heartbeat =
                    new KeepAliveFilter(messageFactory, IdleStatus.BOTH_IDLE, timeoutHandler);
            heartbeat.setRequestInterval(HEARTBEAT_INTERVAL_SECONDS);
            heartbeat.setRequestTimeout(REQUEST_TIMEOUT_SECONDS);
            return heartbeat;
        }
    }

    // ===================================================================================
    // Heartbeat message factory
    // ===================================================================================
    package server;

    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
    import server_domain.HeartReq102;
    import server_domain.HeartRes101;
    import server_domain.MsgPack;

    /**
     * Supplies the heartbeat request/response messages to the KeepAliveFilter and recognises
     * them among decoded messages by their message code.
     */
    public class KeepAliveMessageFactoryImpl implements KeepAliveMessageFactory {

        private static final HeartReq102 REQUEST_BODY = new HeartReq102();
        private static final HeartRes101 RESPONSE_BODY = new HeartRes101();

        /** Heartbeat request message. */
        private static final MsgPack HEARTBEATREQUEST =
                new MsgPack(REQUEST_BODY.getCode(), REQUEST_BODY.getData());

        /** Heartbeat response message. */
        private static final MsgPack HEARTBEATRESPONSE =
                new MsgPack(RESPONSE_BODY.getCode(), RESPONSE_BODY.getData());

        /**
         * @return {@code true} when {@code message} is a {@code MsgPack} carrying the heartbeat
         *         request code
         */
        @Override
        public boolean isRequest(IoSession session, Object message) {
            return hasSameCode(message, HEARTBEATREQUEST);
        }

        /**
         * @return {@code true} when {@code message} is a {@code MsgPack} carrying the heartbeat
         *         response code
         */
        @Override
        public boolean isResponse(IoSession session, Object message) {
            return hasSameCode(message, HEARTBEATRESPONSE);
        }

        /** @return the heartbeat request to send */
        @Override
        public Object getRequest(IoSession session) {
            return HEARTBEATREQUEST;
        }

        /** @return the heartbeat response to send back for {@code request} */
        @Override
        public Object getResponse(IoSession session, Object request) {
            return HEARTBEATRESPONSE;
        }

        /**
         * Compares message codes by value. Anything that is not a {@code MsgPack} is simply
         * "not a heartbeat" instead of a ClassCastException, and the codes are read into
         * {@code int} locals so the comparison is never a reference comparison should
         * {@code getMsgCode()} return a boxed type.
         */
        private static boolean hasSameCode(Object message, MsgPack heartbeat) {
            if (!(message instanceof MsgPack)) {
                return false;
            }
            int actual = ((MsgPack) message).getMsgCode();
            int expected = heartbeat.getMsgCode();
            return actual == expected;
        }
    }

    // ===================================================================================
    // Event handler
    // ===================================================================================
    package server;

    import org.apache.mina.core.service.IoHandlerAdapter;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import serverUntil.ClientRequestInfo;
    import server_command.ServerCommandFactory;
    import server_domain.MsgPack;

    /**
     * Connection IoHandler: prints session lifecycle events and dispatches received messages
     * to the command factory.
     *
     * @author Lhy
     */
    public class MyIoHandler extends IoHandlerAdapter {

        private final static Logger log = LoggerFactory.getLogger(MyIoHandler.class);

        @Override
        public void sessionOpened(IoSession session) throws Exception {
        }

        /** Prints the closed connection and removes the mapping stored for this session id. */
        @Override
        public void sessionClosed(IoSession session) throws Exception {
            System.out.println("当前连接已经关闭"+session.getRemoteAddress());
            // Drop the global mapping registered under this session id.
            ClientRequestInfo.deleteWithSessionId(session.getId());
            System.out.println("保存的当前用于服务器定时刷新的映射为"+ClientRequestInfo.cReqInfo);
        }

        /**
         * Prints the received message and hands its code and payload to
         * {@code ServerCommandFactory.CodeToDo}. Messages that are not a {@code MsgPack} are
         * logged and ignored instead of failing with a ClassCastException.
         */
        @Override
        public void messageReceived(IoSession session, Object message) throws Exception {
            if (!(message instanceof MsgPack)) {
                log.warn("Ignoring unexpected message type: {}",
                        message == null ? null : message.getClass().getName());
                return;
            }
            MsgPack msg = (MsgPack) message;
            System.out.println("收到消息"+msg.toString());
            int code = msg.getMsgCode();
            String data = msg.getMsgPack();
            ServerCommandFactory serverCmd = new ServerCommandFactory();
            serverCmd.CodeToDo(session, code, data);
        }

        /** Prints the remote address and id of the newly created session. */
        @Override
        public void sessionCreated(IoSession session) throws Exception {
            System.out.println("创建一个新连接:"+ session.getRemoteAddress()+" id: "+session.getId());
        }

        /** Prints the idle session and its idle status; the session is left open. */
        @Override
        public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
            System.out.println("当前连接处于空闲状态:"+ session.getRemoteAddress()+ status);
        }
    }

    // Related resource: MINA heartbeat mechanism
    最新回复(0)