《NETTY官方文档》4.0的新特性及注意点(二)

    xiaoxiao2023-12-05  175

    半关闭套接字(Half-closed sockets)

    TCP及SCTP允许在不完全关闭socket的前提下关闭socket的出站传输。这样的socket称之为 ‘a half-closed socket’,用户可以通过调用 SocketChannel.shutdownOutput() 方法来产生半关闭socket。如果远端节点关闭了出站传输,SocketChannel.read(..) 就会返回 -1,看起来跟关闭的连接似乎没区别。

    3.x没有 shutdownOutput() 操作。并且 当 SocketChannel.read(..) 返回 -1 时总是会关闭连接。

    4.0中加入了 SocketChannel.shutdownOutput() 方法来支持半关闭socket,同时,用户可以设置 ChannelOption 为 ‘ALLOW_HALF_CLOSURE’ 来防止Netty在 SocketChannel.read(..) 返回 -1 时自动关闭连接。

    灵活的 I/O 线程分配

    3.x通过 ChannelFactory 创建 Channel,并且新创建的 Channel 会自动注册到一个隐藏的 I/O 线程上。4.0用新接口 EventLoopGroup 替代了 ChannelFactory,它由一个或者多个 EventLoop 组成。并且,新建的 Channel 不会自动注册到 EventLoopGroup,你必须显式调用 EventLoopGroup.register() 来完成注册。

    基于此变更(即:ChannelFactory 与 I/O 线程的分离)就可以把不同的 Channel 实现注册到同样的 EventLoopGroup 上,或者同样的 Channel 实现注册到不同的 EventLoopGroup 上。例如,你可以运行NIO server socket, NIO client sockets, NIO UDP sockets及in-VM local channels在同样的 I/O 线程上。当编写需要极低延迟的代理服务器的时候,这将十分有用。

    从已存在的 JDK socket 中创建Channel

    3.x无法从已存在的 JDK socket 中创建 Channel,如 java.nio.channels.SocketChannel。4.0可以了。

    从 I/O 线程中注销及重新注册Channel

    3.x中,一旦 Channel 创建了,它就会绑定到一个 I/O 线程上,直到这个线程关闭为止。4.0中,用户可以把 Channel 从它的 I/O 线程中注销来获得它底层的 JDK sokcet 的完全控制权。比如,你可以利用高级non-blocking I/O Netty支持( high-level non-blocking I/O Netty provides)来处理复杂的协议,然后可以注销 Channel ,再切换为阻塞模式来传输文件,以达到最大的吞吐。当然,也可以把 Channel 再重新注册回去。

    java.nio.channels.FileChannel myFile = ...; java.nio.channels.SocketChannel mySocket = java.nio.channels.SocketChannel.open(); // 执行一些阻塞操作 ... // Netty 接管 SocketChannel ch = new NioSocketChannel(mySocket); EventLoopGroup group = ...; group.register(ch); ... // 从 Netty 注销 ch.deregister().sync(); // 执行一些阻塞操作 mySocket.configureBlocking(true); myFile.transferFrom(mySocket, ...); // 重新注册到另一个 event loop group EventLoopGroup anotherGroup = ...; anotherGroup.register(ch);

    使用 I/O 线程调度任意任务

    把 Channel 注册到 EventLoopGroup 时,实际上是注册到了 EventLoopGroup 管理的一个 EventLoop 上。EventLoop 实现了 java.util.concurrent.ScheduledExecutorService。这意味着,用户可以在该channel所属的 I/O 线程上执行或者调度(execute or schedule)任意 Runnable 或者 Callable。基于后面会讲到的新的设计良好的线程模型,实现一个线程安全的handler将会十分容易。

    public class MyHandler extends ChannelOutboundHandlerAdapter { ... public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise p) { ... ctx.write(msg, p); // 调度一个写超时任务 ctx.executor().schedule(new MyWriteTimeoutTask(p), 30, TimeUnit.SECONDS); ... } } public class Main { public static void main(String[] args) throws Exception { // 使用 I/O 线程运行任意任务 Channel ch = ...; ch.executor().execute(new Runnable() { ... }); } }

    简化了的关闭

    没有 releaseExternalResources() 了。你可以使用 EventLoopGroup.shutdownGracefully() 立即关闭所有已经打开的channel以及让所有的 I/O 线程自行停止。

    类型安全的 ChannelOption

    Netty有两种方式可以配置 Channel 的socket参数。一种是显式调用 ChannelConfig 的setters,如 SocketChannelConfig.setTcpNoDelay(true)。这是最类型安全的方式了。另一种是调用 ChannelConfig.setOption() 方法。有时候你认为有些socket选项是运行时配置的,这个方法刚好适用于这种场景。但3.x中,因为用户传入一个string和一个object,所以很容易出错。当用户传入错误的选项名或者值时,用户可能会收到一个 ClassCastException 错误,或者干脆只是被默默忽略掉。

    4.0引入了新的类 ChannelOption 来提供类型安全的socket配置。

    ChannelConfig cfg = ...; // Before: cfg.setOption("tcpNoDelay", true); cfg.setOption("tcpNoDelay", 0); // 运行时 ClassCastException cfg.setOption("tcpNoDelays", true); // 打错了配置名 —— 静默忽略 // After: cfg.setOption(ChannelOption.TCP_NODELAY, true); cfg.setOption(ChannelOption.TCP_NODELAY, 0); // 编译错误

    AttributeMap

    应用户要求,现在你可以在 Channel 及 ChannelHandlerContext 上附加任何对象了。Channel 及 ChannelHandlerContext都实现了 AttributeMap 这个新接口。同时,ChannelLocal 及 Channel.attachment 被移除了。当 Channel 被GC时,其相应的属性值会被一起GC。

    public class MyHandler extends ChannelInboundHandlerAdapter { private static final AttributeKey<MyState> STATE = AttributeKey.valueOf("MyHandler.state"); @Override public void channelRegistered(ChannelHandlerContext ctx) { ctx.attr(STATE).set(new MyState()); ctx.fireChannelRegistered(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { MyState state = ctx.attr(STATE).get(); } ... }

    新的 bootstrap API

    bootstrap API 被完全重写了,当然,用途跟原来是一样的。它遵循了常见的样例代码中运行server或client的典型步骤。

    新的bootstrap还支持流式API。

    public static void main(String[] args) throws Exception { // 配置 server. EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .localAddress(8080) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(handler1, handler2, ...); } }); // 启动 server. ChannelFuture f = b.bind().sync(); // 等待socket关闭 f.channel().closeFuture().sync(); } finally { // 关闭所有的event loop来终止所有线程 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); // 等待所有线程终止 bossGroup.terminationFuture().sync(); workerGroup.terminationFuture().sync(); } }

    ChannelPipelineFactory → ChannelInitializer

    你可能注意到上面例子中已经没有 ChannelPipelineFactory 了。它已经替换为支持更多 Channel 及 ChannelPipeline 配置的ChannelInitializer 了。

    注意,不要自己创建 ChannelPipeline。Netty项目组根据至今报道的大量用例推断,用户创建自己的pipline实现或者继承其默认实现都不会带来什么好处。因此,ChannelPipeline 不再由用户创建了,而会被 Channel 自动创建。

    ChannelFuture → ChannelFuture 及ChannelPromise

    ChannelFuture 被拆分为 ChannelFuture 和 ChannelPromise。这不仅是生产者与消费者的异步操作的明确约定,同时可以更安全的使用链中(如filtering)返回的 ChannelFuture 了。因为 ChannelFuture 的状态是不可变的。

    基于此变化,部分方法现在接受 ChannelPromise 而不是 ChannelFuture 来修改状态。

    良好定义的线程模型

    3.x中线程模型定义的并不好,尽管3.5尝试进行了改良也仍然不好。4.0定义了严格的线程模型,这样用户在编写ChannelHandler时不用再过多的担忧线程安全了。

    Netty不会并发的调用 ChannelHandler 的方法,除非加了 @Sharable 注解。无论入站,出站或者生命周期事件handler方法都一样。 用户不再需要同步入站或者出站事件handler方法了。4.0仅允许标记 @Sharable 注解的 ChannelHandler 被添加多次。每个Netty的 ChannelHandler 方法的调用都存在 happens-before 关系。 用户不需要定义 volatile 字段来保存handler的状态用户在添加handler到 ChannelPipeline 时可以指定 EventExecutor 如果指定了, 则总会使用指定的 EventExecutor 来调用 ChannelHandler 的方法如果未指定,则总是使用其关联的 Channel 中注册的 EventLoop 来调用handler的方法分配给handler或者channel的 EventExecutor 及 EventLoop 线程总是单个线程 handler的方法总会在同一个线程中执行如果指定了多线程的 EventExecutor 或者 EventLoop,首先会选中一个线程,并且直到注销为止都会使用这个线程如果同一个pipeline中的两个handler分配了不同的 EventExecutor,他们会被同时调用。用户就需要关注pipeline中的共享数据的线程安全,即使共享数据只是被读取。附加到 ChannelFuture 上的 ChannelFutureListeners 总是运行在future关联的 Channel 被分配的 EventLoop 线程上The ChannelFutureListeners added to ChannelFuture are always invoked by the EventLoop thread assigned to the future’s associated Channel.可以使用 ChannelHandlerInvoker 控制 Channel 的事件顺序。DefaultChannelHandlerInvoker 会立即执行 EventLoop 线程的事件和其他线程提交到 EventExecutor 的 Runnable 对象。下面的例子展示了在 EventLoop 线程中以及其他线程中与Channel交互时的潜在影响。
    写排序 – 混合了 EventLoop 线程和其他线程
    Channel ch = ...; ByteBuf a, b, c = ...; // 线程1 - 非EventLoop线程 ch.write(a); ch.write(b); // .. 发生一些事情 // EventLoop线程 ch.write(c); // a,b,c写入底层传输通道的顺序是未定义的。 // 如果出现了线程间交互而顺序又很重要,那么如何保证顺序性就是用户的职责了

    没有 ExecutionHandler 了——移到了核心模块里

    在添加 ChannelHandler 到 ChannelPipeline 的时候,可以指定 EventExecutor。这样pipeline 就总会使用指定的 EventExecutor 来调用handler方法。

    Channel ch = ...; ChannelPipeline p = ch.pipeline(); EventExecutor e1 = new DefaultEventExecutor(16); EventExecutor e2 = new DefaultEventExecutor(8); p.addLast(new MyProtocolCodec()); p.addLast(e1, new MyDatabaseAccessingHandler()); p.addLast(e2, new MyHardDiskAccessingHandler());

    编解码器框架变更

    基于4.0中handler创建和管理它自己的buffer(参考本文档中的Per-handler buffer章节),因此编解码框架内部进行了大量的变更。不过用户层面的变化倒不是很大。

    核心编解码器类移到了 io.netty.handler.codec 包中 FrameDecoder 重命名为 ByteToMessageDecoder OneToOneEncoder 及 OneToOneDecoder 替换为 MessageToMessageEncoder 及 MessageToMessageDecoder decode(), decodeLast(), encode() 的方法签名进行了些许调整,可支持泛型了,并且移除了多余的参数

    Codec embedder → EmbeddedChannel

    Codec embedder 替换为 io.netty.channel.embedded.EmbeddedChannel,用户可以测试包含编解码器在内的的任何类型的pipline了。

    HTTP 编解码器

    HTTP解码器会将单条HTTP消息解码为多个消息对象。

    1 * HttpRequest / HttpResponse 0 - n * HttpContent 1 * LastHttpContent

    参照最新的 HttpSnoopServer 样例获取更多细节。如果对于单条HTTP消息你不想处理多个消息对象,你可以传入 HttpObjectAggregator 到pipline中。HttpObjectAggregator 会将多个消息对象转变为单个 FullHttpRequest 或者 FullHttpResponse。

    传输实现的变更

    新增加的transport:

    OIO SCTP transportUDT transport

    用例学习:移植Factorial样例

    本节简单的展示了如何将Factorial样例从3.x移植到4.0。移植到4.0的Factorial样例已经放到了 io.netty.example.factorial 包里。请查看源码来了解所有细节修改。

    移植服务端

    使用新的bootstrap API来重写 FactorialServer.run() 没有 ChannelFactory 了,请自行实例化 NioEventLoopGroup (一个是接受入站连接,另一个则是处理已接受的连接)重命名 FactorialServerPipelineFactory 为 FactorialServerInitializer 使其继承 ChannelInitializer<Channel> 通过 Channel.pipeline() 获取 ChannelPipeline 而不是新建一个使 FactorialServerHandler 继承 ChannelInboundHandlerAdapter 用 channelInactive() 替换 channelDisconnected() handleUpstream() 没用了 messageReceived() 重命名为 channelRead(),并且请根据方法签名调整参数 ctx.write() 替换为 ctx.writeAndFlush() 使 BigIntegerDecoder 继承 ByteToMessageDecoder<BigInteger> 使 NumberEncoder 继承 MessageToByteEncoder<Number> encode() 不返回buffer了。使用 ByteToMessageDecoder 填充encode过的数据到buffer里。

    移植客户端

    大部分跟移植服务端一样,不过当你要写入的流很大时则需要多加注意。

    使用新的bootstrap API重写 FactorialClient.run() FactorialClientPipelineFactory 重命名为 FactorialClientInitializer 使 FactorialClientHandler 继承 ChannelInboundHandler

    转载自 并发编程网 - ifeve.com

    相关资源:敏捷开发V1.0.pptx
    最新回复(0)