yanf4j发布一个
0.50-beta2版本,这个版本最重要的改进就是引入了
客户端连接非阻塞API,主要最近的工作要用到,所以添加了。两个核心类
TCPConnectorController和
UDPConnectorController分别用于TCP和UDP的客户端连接控制。例如,现在的UDP echo client可以写成:
//
客户端echo handler
class
EchoClientHandler
extends
HandlerAdapter {
public
void
onReceive(Session udpSession, Object t) { DatagramPacket datagramPacket
=
(DatagramPacket) t; System.out.println(
"
recv:
"
+
new
String(datagramPacket.getData())); } @Override
public
void
onMessageSent(Session session, Object t) { System.out.println(
"
send:
"
+
new
String((
byte
[]) t)); } }
//
连接代码,并发送UDP包
UDPConnectorController connector
=
new
UDPConnectorController(); connector.setSoTimeout(
1000
); connector.setHandler(
new
EchoClientHandler()); connector.connect(
new
InetSocketAddress(InetAddress.getByName(host), port));
for
(
int
i
=
0
; i
<
10000
; i
++
) { String s
=
"
hello
"
+
i; DatagramPacket packet
=
new
DatagramPacket(s.getBytes(), s.length()); connector.send(packet); }
UDP不是面向连接的,因此connect方法仅仅是调用了底层DatagramChannel.connect方法,用来限制接收和发送的packet的远程端点。
再来看看TCPConnectorController的使用,同样看Echo Client的实现:
//
客户端的echo handler
class
EchoHandler
extends
HandlerAdapter
<
String
>
{ @Override
public
void
onConnected(Session session) {
try
{
//
一连接就发送NUM个字符串
for
(
int
i
=
0
; i
<
NUM; i
++
) session.send(generateString(i)); }
catch
(Exception e) { } }
public
String generateString(
int
len) { StringBuffer sb
=
new
StringBuffer();
for
(
int
i
=
0
; i
<
MESSAGE_LEN; i
++
) sb.append(i);
return
sb.toString(); } @Override
public
void
onReceive(Session session, String t) {
//
打印接收到字符串
if
(DEBUG) System.out.println(
"
recv:
"
+
t); } }
//
...连接API,TCPConnectorController示例
Configuration configuration
=
new
Configuration(); configuration.setTcpSessionReadBufferSize(
256
*
1024
);
//
设置读的缓冲区大小
TCPConnectorController connector
=
new
TCPConnectorController(configuration,
new
StringCodecFactory()); connector.setHandler(
new
EchoHandler()); connector.setCodecFactory(
new
StringCodecFactory());
try
{ connector.Connect(
new
InetSocketAddress(
"
localhost
"
,
8080
)); }
catch
(IOExceptione) { e.printStackTrace(); }
注意,connect方法
并不阻塞,而是立即返回,连接是否建立可以通过
TCPConnectorController.isConnected()方法来判断,因此通常你可能会这样使用:
try
{ connector.Connect(
new
InetSocketAddress(
"
localhost
"
,
8080
));
while
(
!
connector.isConnected()) ; }
catch
(Exception e) { e.printStackTrace(); }
来强制确保后面对connector的使用是已经连接上的connector,然而更好的做法是在Handler的onConnected()回调方法中处理逻辑,因为这个方法仅仅在连接建立后才会被调用。
两个ConnectorController都有系列send方法,用于发送数据:
TCPConnectorController.send(Object msg)
throws
InterruptedException UDPConnectorController.send(DatagramPacket packet)
throws
InterruptedException UDPConnectorController.send(SocketAddress targetAddr, Object msg)
throws
InterruptedException
0.50-beta2带来的另一个修改就是Session接口添加
setReadBufferByteOrder方法,用于设置session接收缓冲区的字节序,默认是网络字节序,也就是大端法。这个方法建议在Handler的onSessionStarted回调方法中调用。
在0.50-beta最重要的修改是引入了
session发送队列缓冲区的流量控制选项。默认情况下,session的发送缓冲队列是无界的,队列的push和pop也全然不会阻塞。在设置了缓冲队列的高低水位选项后即引入了发送流量控制,规则如下:
a)当发送队列中的数据总量大于高水位标记(highWaterMark),Session.send将阻塞 b)在条件a的作用下,Session.send的阻塞将持续到发送队列中的数据总量小于于低水位标记(lowWaterMark)才解除。
缓冲队列高低水位的设置通过Controller的下列方法设置:
public
void
setSessionWriteQueueHighWaterMark(
int
highWaterMark);
public
void
setSessionWriteQueueLowWaterMark(
int
lowWaterMark);
缓冲队列的流量控制想法来自ACE的ACE_Message_Queue,是通过com.google.code.yanf4j.util.MessageQueue类实现的。 0.50-beta还引入了Session.send(Object msg)的重载版本 Session.send(Object msg,long timeout),在超过timeout时间后send仍然阻塞时即终止send。注意,现在Session.send的这两个方法都返回一个bool值来表示send成功与否,并且都将响应中断(仅限启动了流量控制选项)抛出InterruptedException。
文章转自庄周梦蝶 ,原文发布时间2009-02-19
相关资源:敏捷开发V1.0.pptx