网络-Java中NIO详解(BufferSelectorChannel三大组件原理)

    xiaoxiao2022-07-07  196

    NIO(non-blocking IO)概述:

    NIO实现的IO模型是IO复用,而不是非阻塞模型。

    关于五种IO模型请点击此篇博客

    BIO为jdk1.4及以上版本里提供的新api ,为所有原始类型(boolean除外) 提供缓存支持的容器。 NIO服务器实现模式为所有请求一个线程,即客户端发送的连接请求都会被注册到多路复用器上,多路复用器轮询到连接有IO请求时才启动一个线程进行处理。并不是说所有的任务都是一个线程进行处理,有的任务可能业务逻辑较长,别的客户端请求可能无法被即时处理,这也是一种变相阻塞。我们可以采用NIO+线程池 的处理模式,一个线程专门用于客户端的连接,将别的操作作为任务提交给线程池。 后文有业务代码演示。 NIO主要想解决的是BIO的大并发问题: 在使用同步IO的网络应用中,如果要同时和多个服务器进行通信,就必须使用多线程进行处理。也就是说,将每一个客户端请求分配给一个线程来单独处理。这样做虽然可以达到我们的要求,但是同时又会带来另外一个问题:由于每创建一个线程,就要为这个线程分配一定的内存空间(也叫工作存储器),而操作系统本身对线程的总数也会有一定的限制,如果客户端的请求过多,服务端程序可能因为不堪重负而拒绝客户端请求 ,服务器甚至也可能会因此而瘫痪。 NIO基于Reactor(反应器) ,当socket有流可读或者可写时,操作系统会响应通知应用程序进行处理,应用再将流读取到缓冲区或者写入操作系统。 NIO最重要的一个地方是当一个连接创建后,不需要对应一个线程,这个连接会被注册到多路复用器上面,所以所有连接只需要一个线程就可以搞定 ,当这个线程中的多路复用器进行轮询的时候,发现连接上有请求的话,才开启一个线程进行处理,也就是一个请求一个线程模式 。

    流与块:

    IO和NIO最重要的区别就是数据打包和传输的方式,IO以流的方式处理数据,而NIO以块的方式处理数据。 面向流的IO一次处理一个字节数据:一个输入流产生一个字节数据,一个输出流消费一个字节数据。为流式数据创建过滤器非常容易,连接一个过滤器,以便每个过滤器只负责复杂处理机制的一部分。不利的一面是,面向流的IO非常慢。 面向块的IO一次处理一个数据块,按照处理数据比按照流处理数据要块很多。但是面向块的IO缺少一些面向流的IO所具有的优雅性和简单性。 IO包和NIO已经很好的集合了,java.io.* 已经以NIO为基础重新实现了,所以它现在可以利用NIO的一些特性。例如,java.io.* 包中的一些类包含以快的形式读取数据的方法,这使得即使在面向流的系统中,处理速度也会更快。

    channel和stream的区别:

    方向性:channel数据是双向通行(read/write) ,stream是单向通行的。channel是必须和buffer结合使用的,stream可以和buffer配套,也可以不用。channel是可以设置为阻塞和非阻塞的。流本身就是阻塞的。

    NIO主要有三大核心部分:Channel(通道),Buffer(缓冲区),Selector(选择器/IO复用器)

    传统IO式基于字节流和字符流进行操作,而NIO基于Channel和Buffer进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个线程可以监听多个数据通道。

    Channel(通道):称之为通道,和IO相连,通信双方进行数据交流的通道,需要和buffer结合使用。

    读数据:channel-》buffer-》read

    写数据:write-》buffer-》channel

    通道Channel是对原IO包中的流的模拟,可以通过它读取和写入数据。 通道与流的不同之处在于,流只能在一个方向上移动(一个流必须是InputStream或者OutputStream的子类) ,而通道是双向的,可以用于读,写或者同时用于读写。 通道包括以下类型:

    FileChannel: 从文件中读取数据。DatagramChannel : 通过UDP读取网络中数据。SocketChannel: 通过TCP读写网络中数据,用来建立TCP连接(connect)ServerSocketChannel: 可以监听新进来的TCP连接,对每个新进来的连接都会创建SocketChannel ,一般在服务端使用。
    通道工具类:Channels

    Buffer(缓冲区):对数据的读取/写入需要使用buffer,buffer本质就是一个数组。

    发送给一个通道的所有数据都必须首先放到缓冲区中,同样的,从通道中读取的任何数据都要先读到缓冲区中。也就是说,不会直接对通道进行读写数据,而是要先经过缓冲区。 缓冲区实质上是一个数据,但它不仅仅是一个数组。缓冲区提供了对数据的结构化访问,而且还可以跟踪系统的读写进程。 缓冲区包括以下类型:

    ByteButterCharBufferShortBufferIntBufferLongBufferFloatBufferDoubleBuffer
    缓冲区状态变量
    capacity(容量): 最大容量position(位置): 当前已经读写的字节数,也就是当前读写的位置。limit(上界): 缓冲区中第一个不能被读写的元素。或者说,缓冲区现存元素的计数。Mark(标记): 一个标记位置,调用mark()来设定mark=postion 。 调用reset() 设定 position=mark 。 标记在没有设定前是undefined(未定义的) 。

    这四个属性之间总是遵循以下关系:

    0<=mark<=position<=limit<=capacity

    状态变量的改变过程举例:
    新建一个大小为8字节的缓冲区,此时position为0,而limit=capacity=8 。 capacity变量不会改变,下面将忽略它。 从输入通道中读取五个字节数据写入缓冲区,此时position移动设置为5,limit保持不变。 在将缓冲区的数据写道输出通道之前,需要先调用flip()方法,这个方法将limit设置为当前position,并将position设置为0。 从缓冲区中取4个字节到输出缓冲中,此时position设为4。 最后需要调用clear()方法清空缓冲区,此时position和limit都被设置为最初位置。 缓冲区并不是线程安全的。如果想用多线程同时存取特定的缓冲区,你需要在存取缓冲区之前进行同步。
    两个缓冲区相等的必要条件:
    两个对象类型相同。包含不同数据类型的buffer永远不会相等,而且buffer绝不会等于非buffer对象。两个对象都剩余同样数量的元素。buffer的容量不需要相同,而且缓冲区中剩余数据的索引也不必相同。但每个缓冲区中剩余元素的数目(position->limit) 必须相同。在每个缓冲区中应被Get()函数返回的剩余数据元素序列必须一致。 总结起来就是,缓冲区有效的数据和顺序必须一样,position到limit元素个数一致。
    复制缓冲区:
    Duplicate(): 创建一个与原始缓冲区相似的新缓冲区,两个缓冲区共享数据元素,拥有相等的元素 ,但是每个缓冲区拥有各自的位置,上界和标记属性。 对一个缓冲区的数据元素所做的改变会反映到另外一个缓冲区上。这一副本缓冲区具有与原始缓冲区同样的数据视图。如果原始缓冲区为只读,或者为直接缓冲区,新的缓冲区将继承这些权限。复制缓冲区会创建一个新的buffer对象,但是并不复制数据。原始缓冲区与副本操作同样的数据元素 。 asReadOnlyBuffer(): 生成只读缓冲区。如果一个只读缓冲区与一个可写缓冲区共享数据,或者有包装好的备份数组,那么对这个可写的缓冲区或直接对这个数组的改变将反映在所有关联的缓冲区上,包括只读缓冲区。slice(): 分割缓冲区,创建一个从原始缓冲区的当前位置开始的新缓冲区,并且其容量是原始缓冲区的剩余元素数量(limit-position)这个新缓冲区与原始缓冲区共享一段数据元素子序列。分割出来的缓冲区也会继承只读属性和直接属性

    buffer实例方法:

    allocate(capacity): 在堆上创建指定大小的对象空间。allocateDirect(capacity): 在堆外空间创建指定大小的空间。有时buffer作用时间超长,在堆内创建会影响gc效率,在堆外创建的话,不受gc的影响。wrap(byte()): 通过存在的数组创建对象。wrap(byte[] ,offerset,length): 通过存在的数组创建对象。

    buffer普通方法:

    buffer.put(): 往buffer中写入数据,position指针移动。buffer.flip(): 读写模式切换,limit指向pos,pos指向mark。buffer.get(): pos指针移动。buffer.clear(): 清空buffer缓存 mark=undefined。 pos=0,limit=capacity

    使用中始终关注读写切换。

    文件NIO实例:快速复制文件
    package Internet.NIODemo; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; public class FileNIO { public static void fastCopy(String src,String dist)throws IOException{ //获得源文件输入字节流 FileInputStream inputStream=new FileInputStream(src); //获取输入字节流的文件通道 FileChannel inChannel=inputStream.getChannel(); //获取目标文件的输出字节流 FileOutputStream outputStream=new FileOutputStream(dist); //获取输出字节流的文件通道 FileChannel outChannel=outputStream.getChannel(); //分配缓冲区 ByteBuffer buffer=ByteBuffer.allocate(1024); while(true){ //从输入通道中读取数据到缓冲区中 int n=inChannel.read(buffer); //read返回-1表示EOF,文件结束。 if(n==-1) break; //读写切换 buffer.flip(); //把缓冲区内容写入输出文件中 outChannel.write(buffer); //清空缓冲区 buffer.clear(); } } public static void main(String[] args) { try { fastCopy(); } catch (IOException e) { e.printStackTrace(); } } }

    选择器(Selector):

    NIO常常被叫做非阻塞IO,主要是因为NIO在网络通信中的非阻塞特性被广泛使用。 NIO实现了IO多路复用中的Reactor模型,一个线程Thread使用选择器Selector通过轮询的方式去监听多个通道Channel上的事件,从而让一个线程可以处理多个事件。 通过配置监听的通道Channel非阻塞,那么当Channel上的事件还未达到时,就不会进入阻塞状态一直等待而是继续轮询其它Channel,找到IO事件已经到达的channel执行。 因为创建和切换线程的开销很大,因此使用一个线程来处理多个事件而不是一个线程处理一个事件,这样具有良好的性能。 需要注意的是,只有套接字Channel才能配置为非阻塞,而FileChannel不能 为FileChannel配置为非阻塞也没有意义。因为本机的文件,完全不存在延时达到的问题。

    Selector对象键的集合:一个选择器可以同时来管理多个用户连接,将关注的事件注册到选择器上,通过选择器来帮我们监听事件是否完成,在此期间用户可以做自己的事情。

    选择器维护着注册过的通道的集合,并且这些注册关系中的任意一个都是封装在SelectionKey 对象中的,每一个selector对象维护者三个键的集合。

    已注册键的集合(Registered key set):

    与选择器关联的已经注册的键的集合。并不是所有注册过的键都仍然后效。这个集合通过keys() 方法返回,可是会是空的。已注册的键的集合不可以直接修改。

    已选择键的集合(Selected key set):

    已注册的键的集合的子集。这个集合的每个成员都是相关的通道被选择器(前一个选择操作中) 判断为已经准备好的,并且包含键的interest集合中的操作。这个集合通过selectedKeys()方法返回(可能为空)。 不要将已选择键集合和ready集合搞混。这是一个键的集合,每个键都关联一个已经准备好至少一种操作的通道(一个键关联一个channel)。每个键都有一个内嵌的ready集合,指示了所关联的通道已经准备好的操作。 当我们遍历这个集合时,如果对事件没有处理,那么将丢弃这个事件 键可以从这个集合中移除,但不能添加。添加将抛出异常。

    已取消的键的集合(Cancelled key set):

    已注册的键的集合的子集,这个集合包含了cancel()方法被调用过的键(这个键已经被无效化),但是还没有被注销。这个集合是选择器对象的私有成员,无法直接访问 。 已取消键的集合在select() 操作期间将被清空。

    选择器的选择过程:

    在一个刚初始化Selector对象中,这三个集合都是空的。 选择器是对select(),poll()等本地调用或者类似的操作系统特定的系统调用的一个包装。但是selector所做不是简单向本地代码传送参数。他对每个选择操作应用了特定的过程。也就是合理地管理键和它们所表示的状态信息基础。

    select()/selectNow()调用业务逻辑:
    已取消键的集合将被检查。如果是非空的,每个已取消键的集合中的键将从另外两个集合中移除,并且相关的通道将被注销。这个步骤结束后,已取消键的集合将是空的。已注册键的集合中的键的interest集合将被检查。在这个步骤中的检查执行过后,对interest集合的改动不会影响剩余的检查过程。 一旦就绪条件被定写下来,底层操作系统将会进行查询,以确定每个通道所关心的操作的真实就绪状态。依赖于特定select()方法调用,如果没有通道准备好,线程可能会在这里阻塞。 直到系统调用完成为止,这个过程可能会使得调用线程睡眠一段事件,然后当前每个通道的就绪状态将确定下来。这对于那些还没准备好的通道将不会执行任何操作。对于至少准备好interest集合中的一种操作的通道,将执行下面两种操作的一种: a: 如果通道的键还没有处于已选择键的集合中,那么键的ready集合将被清空,然后表示操作系统发现的当前通道已经准备好的操作的比特掩码将被设置。(通俗来说,就是将已准备好的事件如read,write等在ready集合中做一个标记)。 b: 否则,也就是键在已选择键的集合中。键的ready集合将被表示操作系统发现的当前通道已经准备好的操作的比特掩码更新。所有之前的已经不再是就绪状态的操作不会被清除。事实上比特位都不会被清理。由操作系统决定的ready集合与之前的ready集合按位分离的,一旦被放置于选择器的已选择集合中,它的ready操作将是累积的。比特位只会被设置,不会被清理。步骤2可能会花费很长时间,特别是所激发的线程处于休眠状态时。与该选择器相关的键可能会同时被取消。当步骤2结束后,步骤1将被重新执行,以完成任意一个在选择进行过程中,键已经被取消的通道的注销。selector操作返回的值是ready集合在步骤2中被修改的键的数量,而不是已选择键的集合中的通道总数。返回值不是已准备好的通道总数,而是从上一次select()调用之后进入就绪状态的通道的数量。之前调用就绪的,并且本次调用仍然就绪的通道不会被计入,而那些在前一次调用就绪但本次调用不处于就绪状态的通道也不会被记入。这些通道可能仍然在已选择的键的集合中,但不会进入返回值。返回值可能为0 。通俗来说,返回值就是新进选择集合的键的个数,并且以前选择集合中的键只能靠我们人工删除,系统不会帮我们处理

    使用内部的已取消键的集合来延迟注销 ,是一种防止线程在取消键时阻塞 ,并防止与正在进行的选择操作冲突的优化。注销通道是一个潜在的代价很高的操作,这可能需要重新分配资源(键是与通道相关的,并且可能与他们相关的通道对象之间有复杂的交互)。清理已取消的键,并在选择操作之前和之后立即注销通道,可以消除它们可能正好在选择过程中执行的潜在棘手问题。

    停止选择过程:

    wakeup(): 使选择器上第一个还没有返回的操作立即返回。如果当前没有正在进行的选择,那么下一次对select()方法的一种形式的调用将立即返回,后续的选择操作将正常进行。在选择操作之间进行多次调用wakeup()方法与调用一次没有区别。close(): 如果选择器的close()方法被调用,那么任何一个在选择操作中阻塞的线程都将被唤醒,与选择器有关的通道将被注销,而键将被取消。interrupt(): 睡眠中的线程interrupt()方法被调用,它的返回状态将被设置(只是设置中断位,不是立即中断)。如果被唤醒的线程之后试图在通道上进行IO操作,通道将立即关闭,然后线程将捕捉到一个异常。selector对象将捕获异常并调用wakeup()方法。
    这几个方法任意一个都不会去关闭任何一个相关的通道。中断一个选择器与中断一个通道是不一样的。选择器不会改变任意一个相关的通道,他只会检查channel的状态。当一个在seletor()方法中睡眠的线程中断时,对于通道的状态而言,是不会产生歧义的。

    Selector使用步骤:

    获取选择器实例:Selector.open();将通道注册到选择器上,并确定关注的事件: channel.configureBlocking(false):通道设置为非阻塞。 channel.register(selector,SelectionKey.OP_READ)。选择器监听事件: selector.selector(): 监听方法有三个: 返回结果表示感兴趣的事件个数。 int select(): 该方法会阻塞住,直到有感兴趣事件完成之后才会发生。int select(long timeout): 该方法会在指定的时间内返回,可能有时间返回,也可能没有时间发生。int selectNow(): 该方法不会阻塞,会立马返回,可能有事件发生,也可能没有事件发生。 遍历感兴趣事件集合,判断哪种事件完成。 Iterator< SelectorKey > iteraator=selector.selectedKeys().iterator();关闭selector的资源:

    SelectionKey类方法详解:

    channel(): 将注册到selector中的通道在SelectionKey中也维护一个。selector(): 将当前选择器实例在SelectionKey中维护一个。boolean isValid(): 表示当前的selectionKey是否有效。cancel() : 取消对当前key的关注。boolean isWritable(): 可写事件。boolean isReadable(): 可读事件。boolean isConnectable(): 可连接事件。boolean isAcceptable(): 可接受事件。
    SelectionKey下维护的事件: 四种。
    int OP_READ=1<<0; //读事件 int OP_WRITE=1<<2;//写事件 int OP_CONNECT=1<<3;//可连接事件 int OP_ACCEPT=1<<4;//可接受事件

    NIO网络通信编程流程:

    服务端:
    创建ServerSocketChannel实例。对通道绑定端口(bind)将通道设置为非阻塞(configureBlocking) 通道必须配置为非阻塞模型,否则使用选择器就没有任何意义了,因为如果通道在某个事件上被阻塞,那么服务器就不能响应其它事件,必须等待这个事件处理完毕才能去处理其它事件,显然这和选择器的作用背道而驰。实例化selector选择器(Selector.open())将ServerSocketChannel实例注册(regiser)到selector选择器中。 再将通道注册到选择器上时,还需要指定要注册的具体事件,主要有以下几类: SelectionKey.OP_CONNECTSelectionKey.OP_ACCEPTSelectionKey.OP_READSelectionKey.OP_WRITE 他们在SelectionKey的定义如下:

    可以看出每个事件可以被当成一个位域,从而组成事件集整数。例如:

    int interestSet=Selection.OP_READ|Selection.OP.WRITE; 选择器进行监听,有事件则返回。selector.select(),它会一直阻塞直到至少有一个事件到达。遍历感兴趣事件集合,判断是否有可接收事件。有可接收事件发生,获取对应通道,调用accept()方法,SocketChannel实例将serverSocket实例设置为非阻塞,将其注册到选择器,并关注读事件循环第6步,遍历集合,判断是否有可读事件发生。通过Buffer从channel中读取数据。关闭打开的资源,包括selector选择器,关闭SocketChannel实例,ServerSocketChannel实例。
    客户端:
    创建SocketChannel实例。将SocketChannel实例设置为非阻塞。创建Selector实例。连接服务端,立即返回结果不成功,将socketchannel注册到selector,并关注可连接事件。selector实例监听事件,有事件发生则返回。若改事件是可连接事件,则完成连接(finishconnect)发送消息,接收消息。关闭资源。

    NIO(TCP)网络通信实例:单线程处理

    服务端:
    package Internet.NIODemo; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; public class NIOServer { public static void main(String[] args) { try { //创建服务端ServerSocketChannel实例 ServerSocketChannel serverSocketChannel=ServerSocketChannel.open(); //绑定端口 //ServerSocket socket=serverSocketChannel.socket(); serverSocketChannel.bind(new InetSocketAddress(8888)); System.out.println("服务端启动"); //创建selector实例 Selector selector=Selector.open(); //设置ServerSocketChannel实例为非阻塞 serverSocketChannel.configureBlocking(false); //不设置默认为阻塞。 //将该实例注册到选择器上,并设置关注accept事件。 serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT); //让选择器帮助关注事件。select()会阻塞,直至有关注事件发生才返回。 //返回值表示关注地事件有多少个发生了。 //int num=selector.select();//参数可以放入时间,不放则一直阻塞。 while(selector.select()>0){ System.out.println("有关注的事件发生"); //表示关注事件的集合。 Iterator<SelectionKey> iterator=selector.selectedKeys().iterator(); while(iterator.hasNext()){ SelectionKey key=iterator.next(); if(key.isValid()&&key.isAcceptable()){ System.out.println("有可接收事件发生"); //表示可接收事件完成 ServerSocketChannel ssc=(ServerSocketChannel)key.channel(); //接收客户端的连接的accept操作 SocketChannel socketChannel=ssc.accept(); //设置SocketChannel实例为非阻塞 socketChannel.configureBlocking(false); //将socketChannel注册到选择器中,并关注读事件。 socketChannel.register(selector, SelectionKey.OP_READ); } if(key.isValid()&&key.isReadable()){ System.out.println("有可读事件发生"); //可读事件完成 SocketChannel socketChannel=(SocketChannel)key.channel(); //创建Buffer缓存 ByteBuffer buffer=ByteBuffer.allocate(1024);//指定容量创建 //将数据通过channel读取到buffer StringBuilder stringBuilder=new StringBuilder(); int count=0; while ((count=socketChannel.read(buffer))>0) { //读写模式的切换,因为上面写完,指针在后面,需要将其放到最开始去读 buffer.flip(); //将数据读到byte数组 byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String msg = new String(bytes); stringBuilder.append(msg); //直接写操作 //清空缓存 buffer.clear(); //向缓存写入数据 buffer.put((msg).getBytes()); //读写指针切换 buffer.flip(); //给客户端回复数据 socketChannel.write(buffer); buffer.clear(); } System.out.println("Client->send: "+stringBuilder.toString()); if(count<0){ socketChannel.close(); } } } //删除已选择集合中的键 iterator.remove(); System.out.println("集合中剩余事件个数: "+selector.selectedKeys().size()); } } catch (IOException e) { e.printStackTrace(); } } }
    客户端:
    package Internet.NIODemo; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; public class NIOClient { public static void main(String[] args) { try { //创建socketchannel实例 SocketChannel socketChannel = SocketChannel.open(); //将socketchannel实例设置为非阻塞 socketChannel.configureBlocking(false); //创建selector实例 Selector selector = Selector.open(); //连接服务端,因为前面设置的是非阻塞,所以调用这个方法会立即返回. //因此如果返回true就不需要放到select中 if (!socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888))){ //将socketchannel实例注册到选择器中,并关注connect事件 socketChannel.register(selector, SelectionKey.OP_CONNECT); //感兴趣操作是否完成 selector.select(); System.out.println("有可接收事件发生"); Iterator<SelectionKey> iterator=selector.selectedKeys().iterator(); while(iterator.hasNext()){ SelectionKey key=iterator.next(); iterator.remove(); if(key.isValid()&&key.isConnectable()){ SocketChannel Channel=(SocketChannel)key.channel(); //如果连接成功则完成连接 if(Channel.isConnectionPending()) Channel.finishConnect(); System.out.println("客户端连接成功"); Channel.configureBlocking(false); //监听SocketChannel读事件。 Channel.register(selector,SelectionKey.OP_READ); } } } //设置buffer缓冲区 ByteBuffer buffer=ByteBuffer.allocate(1024); //创建控制台实例 Scanner scanner=new Scanner(System.in); while(true){ System.out.println("请输入"); String msg=scanner.nextLine(); //客户端结束条件 if("exit".equals(msg)) break; //向缓存写入数据 buffer.put(msg.getBytes()); buffer.flip(); //发送数据 socketChannel.write(buffer); //清空缓存 buffer.clear(); //监听读事件,没有则阻塞 selector.select(); socketChannel.read(buffer); //读写模式切换 buffer.flip(); //通过缓冲区有效元素大小确定接收数组大小 byte [] bytes=new byte[buffer.remaining()]; //从缓存读取数据 buffer.get(bytes); String msg1=new String(bytes); System.out.println("recv "+msg); buffer.clear(); } socketChannel.close(); selector.close(); System.out.println("客户端关闭"); } catch (IOException e) { e.printStackTrace(); } } }

    NIO+线程池网络通信实例

    服务端:

    package Internet.NIODemo; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.concurrent.Executor; import java.util.concurrent.Executors; class Run implements Runnable{ private Selector selector; SocketChannel socketChannel; public Run(Selector selector,SocketChannel socketChannel){ this.selector=selector; this.socketChannel=socketChannel; } @Override public void run() { try { socketChannel.configureBlocking(false); socketChannel.register(selector,SelectionKey.OP_READ); while(selector.select()>0){ System.out.println("有关注事件发生"); Iterator<SelectionKey> iterator=selector.selectedKeys().iterator(); while(iterator.hasNext()){ SelectionKey key=iterator.next(); iterator.remove(); if(key.isReadable()){ System.out.println("有可读事件发生"); SocketChannel socketChannel=(SocketChannel)key.channel(); //创建buffer实例,一般创建1024大小,这里写成2是为了测试 ByteBuffer buffer=ByteBuffer.allocate(2); buffer.clear(); int count=0; StringBuilder s=new StringBuilder(); //使用缓冲区将客户端发送的数据提取完毕。 while((count=socketChannel.read(buffer))>0){ socketChannel.read(buffer); System.out.println("循环接收"); buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String msg = new String(bytes); s.append(msg); buffer.clear(); buffer.put(msg.getBytes()); buffer.flip(); socketChannel.write(buffer); buffer.clear(); } System.out.println(s.toString()); //关闭读写通道 if(count<0){ socketChannel.close(); } } } } }catch (IOException e){ } } } public class NIOServerDemo { public static void main(String[] args) { try { Selector selector=Selector.open(); ServerSocketChannel ssc=ServerSocketChannel.open(); ssc.configureBlocking(false); ssc.register(selector,SelectionKey.OP_ACCEPT); //ServerSocket serverSocket=ssc.socket(); //serverSocket.bind(new InetSocketAddress(8888)); //可以直接绑定,也可以使用ServerSocket进行绑定 ssc.bind(new InetSocketAddress(8888)); System.out.println("客户端启动成功"); //创建线程池 Executor executor=Executors.newFixedThreadPool(10); while(selector.select()>0){ System.out.println("有关注事件发生"); Iterator<SelectionKey> iterator=selector.selectedKeys().iterator(); while(iterator.hasNext()){ SelectionKey key=iterator.next(); iterator.remove(); if(key.isAcceptable()){ System.out.println("有可接收事件发生"); ServerSocketChannel serverSocketChannel=(ServerSocketChannel)key.channel() ; //得到SocketChannel实例 SocketChannel socketChannel=serverSocketChannel.accept(); //将socketChannel实例交给子线程去处理。 executor.execute(new Run(Selector.open(),socketChannel)); } } } } catch (IOException e) { e.printStackTrace(); } } }

    客户端:

    package Internet.NIODemo; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; public class NIOClient { public static void main(String[] args) { try { //创建socketchannel实例 SocketChannel socketChannel = SocketChannel.open(); //将socketchannel实例设置为非阻塞 socketChannel.configureBlocking(false); //创建selector实例 Selector selector = Selector.open(); //连接服务端,因为前面设置的是非阻塞,所以调用这个方法会立即返回. //因此如果返回true就不需要放到select中 if (!socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888))){ //将socketchannel实例注册到选择器中,并关注connect事件 socketChannel.register(selector, SelectionKey.OP_CONNECT); //感兴趣操作是否完成 selector.select(); System.out.println("有可接收事件发生"); Iterator<SelectionKey> iterator=selector.selectedKeys().iterator(); while(iterator.hasNext()){ SelectionKey key=iterator.next(); iterator.remove(); if(key.isValid()&&key.isConnectable()){ SocketChannel Channel=(SocketChannel)key.channel(); //如果连接成功则完成连接 if(Channel.isConnectionPending()) Channel.finishConnect(); System.out.println("客户端连接成功"); Channel.configureBlocking(false); //监听SocketChannel读事件。 Channel.register(selector,SelectionKey.OP_READ); } } } //设置buffer缓冲区 ByteBuffer buffer=ByteBuffer.allocate(1024); //创建控制台实例 Scanner scanner=new Scanner(System.in); while(true){ System.out.println("请输入"); String msg=scanner.nextLine(); //客户端结束条件 if("exit".equals(msg)) break; //向缓存写入数据 buffer.put(msg.getBytes()); buffer.flip(); //发送数据 socketChannel.write(buffer); //清空缓存 buffer.clear(); //监听读事件,没有则阻塞 selector.select(); socketChannel.read(buffer); //读写模式切换 buffer.flip(); //通过缓冲区有效元素大小确定接收数组大小 byte [] bytes=new byte[buffer.remaining()]; //从缓存读取数据 buffer.get(bytes); String msg1=new String(bytes); System.out.println("recv "+msg); buffer.clear(); } socketChannel.close(); selector.close(); System.out.println("客户端关闭"); } catch (IOException e) { e.printStackTrace(); } } }

    内存映射文件:

    内存映射文件IO是一种读和写文件数据的方法,它可以比常规的基于流或者基于通道的IO快得多。 向内存映射文件写入可能是危险的,只是改变数组的单个元素这样的简单操作,就可以会直接修改磁盘上的文件。修改数据与将数据保存到磁盘是没有分开的。 下面代码将文件的前1024个字节映射到文件中,map()方法返回一个MappedByteBuffer,它是ByteBuffer的子类。因此,可以像其它任何ByteBuffer一样使用新映射的缓冲区,操作系统会在需要的时候负责映射。

    MappedByteBuffer mbb=fc.map(FileChannel.MapMode.READ_WRITE,0,1024);
    最新回复(0)