yanf4j引入了客户端非阻塞API

    xiaoxiao2024-02-20  120

    yanf4j发布一个 0.50-beta2版本,这个版本最重要的改进就是引入了 客户端连接非阻塞API,主要最近的工作要用到,所以添加了。两个核心类 TCPConnectorController和 UDPConnectorController分别用于TCP和UDP的客户端连接控制。例如,现在的UDP echo client可以写成:      // 客户端echo handler      class  EchoClientHandler  extends  HandlerAdapter {          public   void  onReceive(Session udpSession, Object t) {             DatagramPacket datagramPacket  =  (DatagramPacket) t;             System.out.println( " recv: "   +   new  String(datagramPacket.getData()));         }         @Override          public   void  onMessageSent(Session session, Object t) {             System.out.println( " send: "   +   new  String(( byte []) t));         }     }        // 连接代码,并发送UDP包         UDPConnectorController connector  =   new  UDPConnectorController();         connector.setSoTimeout( 1000 );         connector.setHandler( new  EchoClientHandler());         connector.connect( new  InetSocketAddress(InetAddress.getByName(host),                 port));          for  ( int  i  =   0 ; i  <   10000 ; i ++ ) {             String s  =   " hello  "   +  i;             DatagramPacket packet  =   new  DatagramPacket(s.getBytes(), s.length());             connector.send(packet);         }     UDP不是面向连接的,因此connect方法仅仅是调用了底层DatagramChannel.connect方法,用来限制接收和发送的packet的远程端点。     再来看看TCPConnectorController的使用,同样看Echo Client的实现: // 客户端的echo handler class  EchoHandler  extends  HandlerAdapter < String >  {         @Override          public   void  onConnected(Session session) {              try  {                  // 一连接就发送NUM个字符串                  for  ( int  i  =   0 ; i  <  NUM; i ++ )                     session.send(generateString(i));              }  catch  (Exception e) {              }         }          public  String generateString( int  len) {             StringBuffer sb  =   new  StringBuffer();              for  ( int  i  =   0 ; i  <  MESSAGE_LEN; i ++ )                 sb.append(i);              return  sb.toString();         }         @Override          public   void  onReceive(Session session, String t) {             // 打印接收到字符串              if  (DEBUG)                 System.out.println( " recv: "   +  t);                      }     } // ...连接API,TCPConnectorController示例     Configuration configuration  =   new  Configuration();         configuration.setTcpSessionReadBufferSize( 256   *   1024 );  //  设置读的缓冲区大小     TCPConnectorController    connector  =   new  TCPConnectorController(configuration,                  new  StringCodecFactory());     connector.setHandler( new  EchoHandler());     connector.setCodecFactory( new  StringCodecFactory());     try  {             connector.Connect( new  InetSocketAddress( " localhost " 8080 ));     }  catch  (IOExceptione) {             e.printStackTrace();     }     注意,connect方法 并不阻塞,而是立即返回,连接是否建立可以通过 TCPConnectorController.isConnected()方法来判断,因此通常你可能会这样使用: try  {             connector.Connect( new  InetSocketAddress( " localhost " 8080 ));              while ( ! connector.isConnected())                 ;         }  catch  (Exception e) {             e.printStackTrace();         }     来强制确保后面对connector的使用是已经连接上的connector,然而更好的做法是在Handler的onConnected()回调方法中处理逻辑,因为这个方法仅仅在连接建立后才会被调用。     两个ConnectorController都有系列send方法,用于发送数据: TCPConnectorController.send(Object msg)  throws  InterruptedException UDPConnectorController.send(DatagramPacket packet)  throws  InterruptedException UDPConnectorController.send(SocketAddress targetAddr, Object msg) throws  InterruptedException     0.50-beta2带来的另一个修改就是Session接口添加 setReadBufferByteOrder方法,用于设置session接收缓冲区的字节序,默认是网络字节序,也就是大端法。这个方法建议在Handler的onSessionStarted回调方法中调用。     在0.50-beta最重要的修改是引入了 session发送队列缓冲区的流量控制选项。默认情况下,session的发送缓冲队列是无界的,队列的push和pop也全然不会阻塞。在设置了缓冲队列的高低水位选项后即引入了发送流量控制,规则如下: a)当发送队列中的数据总量大于高水位标记(highWaterMark),Session.send将阻塞 b)在条件a的作用下,Session.send的阻塞将持续到发送队列中的数据总量小于于低水位标记(lowWaterMark)才解除。 缓冲队列高低水位的设置通过Controller的下列方法设置:      public   void  setSessionWriteQueueHighWaterMark( int  highWaterMark);       public   void  setSessionWriteQueueLowWaterMark( int  lowWaterMark);

      缓冲队列的流量控制想法来自ACE的ACE_Message_Queue,是通过com.google.code.yanf4j.util.MessageQueue类实现的。    0.50-beta还引入了Session.send(Object msg)的重载版本 Session.send(Object msg,long timeout),在超过timeout时间后send仍然阻塞时即终止send。注意,现在Session.send的这两个方法都返回一个bool值来表示send成功与否,并且都将响应中断(仅限启动了流量控制选项)抛出InterruptedException。

    文章转自庄周梦蝶  ,原文发布时间2009-02-19

    相关资源:敏捷开发V1.0.pptx
    最新回复(0)