网络-NIO+线程池编程实例

    xiaoxiao2025-04-04  19

    概述:

    NIO主要想解决的是BIO的大并发问题:

    在使用同步IO的网络应用中,如果要同时和多个服务器进行通信,就必须使用多线程进行处理。也就是说,将每一个客户端请求分配给一个线程来单独处理。这样做虽然可以达到我们的要求,但是同时又会带来另外一个问题:由于每创建一个线程,就要为这个线程分配一定的内存空间(也叫工作存储器),而操作系统本身对线程的总数也会有一定的限制,如果客户端的请求过多,服务端程序可能因为不堪重负而拒绝客户端请求 ,服务器甚至也可能会因此而瘫痪。 NIO基于Reactor(反应器) ,当socket有流可读或者可写时,操作系统会响应通知应用程序进行处理,应用再将流读取到缓冲区或者写入操作系统。

    NIO服务器实现模式为

    所有请求一个线程,即客户端发送的连接请求都会被注册到多路复用器上,多路复用器轮询到连接有IO请求时才启动一个线程进行处理。并不是说所有的任务都是一个线程进行处理,有的任务可能业务逻辑较长,别的客户端请求可能无法被即时处理,这也是一种变相阻塞。我们可以采用NIO+线程池 的处理模式,一个线程专门用于客户端的连接,将别的操作作为任务提交给线程池。

    关于NIO具体释义请点击 Java中NIO详解(Buffer/Selector/Channel三大核心原理)

    线程池+NIO

    服务端:

    package Internet.NIODemo; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.concurrent.Executor; import java.util.concurrent.Executors; class Run implements Runnable{ private Selector selector; SocketChannel socketChannel; public Run(Selector selector,SocketChannel socketChannel){ this.selector=selector; this.socketChannel=socketChannel; } @Override public void run() { try { socketChannel.configureBlocking(false); socketChannel.register(selector,SelectionKey.OP_READ); while(selector.select()>0){ System.out.println("有关注事件发生"); Iterator<SelectionKey> iterator=selector.selectedKeys().iterator(); while(iterator.hasNext()){ SelectionKey key=iterator.next(); iterator.remove(); if(key.isReadable()){ System.out.println("有可读事件发生"); SocketChannel socketChannel=(SocketChannel)key.channel(); //创建buffer实例,一般创建1024大小,这里写成2是为了测试 ByteBuffer buffer=ByteBuffer.allocate(2); buffer.clear(); int count=0; StringBuilder s=new StringBuilder(); //使用缓冲区将客户端发送的数据提取完毕。 while((count=socketChannel.read(buffer))>0){ socketChannel.read(buffer); System.out.println("循环接收"); buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String msg = new String(bytes); s.append(msg); buffer.clear(); buffer.put(msg.getBytes()); buffer.flip(); socketChannel.write(buffer); buffer.clear(); } System.out.println(s.toString()); //关闭读写通道 if(count<0){ socketChannel.close(); } } } } }catch (IOException e){ } } } public class NIOServerDemo { public static void main(String[] args) { try { Selector selector=Selector.open(); ServerSocketChannel ssc=ServerSocketChannel.open(); ssc.configureBlocking(false); ssc.register(selector,SelectionKey.OP_ACCEPT); //ServerSocket serverSocket=ssc.socket(); //serverSocket.bind(new InetSocketAddress(8888)); //可以直接绑定,也可以使用ServerSocket进行绑定 ssc.bind(new InetSocketAddress(8888)); System.out.println("客户端启动成功"); //创建线程池 Executor executor=Executors.newFixedThreadPool(10); while(selector.select()>0){ System.out.println("有关注事件发生"); Iterator<SelectionKey> iterator=selector.selectedKeys().iterator(); while(iterator.hasNext()){ SelectionKey key=iterator.next(); iterator.remove(); if(key.isAcceptable()){ System.out.println("有可接收事件发生"); ServerSocketChannel serverSocketChannel=(ServerSocketChannel)key.channel() ; //得到SocketChannel实例 SocketChannel socketChannel=serverSocketChannel.accept(); //将socketChannel实例交给子线程去处理。 executor.execute(new Run(Selector.open(),socketChannel)); } } } } catch (IOException e) { e.printStackTrace(); } } }

    客户端:

    package Internet.NIODemo; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; public class NIOClient { public static void main(String[] args) { try { //创建socketchannel实例 SocketChannel socketChannel = SocketChannel.open(); //将socketchannel实例设置为非阻塞 socketChannel.configureBlocking(false); //创建selector实例 Selector selector = Selector.open(); //连接服务端,因为前面设置的是非阻塞,所以调用这个方法会立即返回. //因此如果返回true就不需要放到select中 if (!socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888))){ //将socketchannel实例注册到选择器中,并关注connect事件 socketChannel.register(selector, SelectionKey.OP_CONNECT); //感兴趣操作是否完成 selector.select(); System.out.println("有可接收事件发生"); Iterator<SelectionKey> iterator=selector.selectedKeys().iterator(); while(iterator.hasNext()){ SelectionKey key=iterator.next(); iterator.remove(); if(key.isValid()&&key.isConnectable()){ SocketChannel Channel=(SocketChannel)key.channel(); //如果连接成功则完成连接 if(Channel.isConnectionPending()) Channel.finishConnect(); System.out.println("客户端连接成功"); Channel.configureBlocking(false); //监听SocketChannel读事件。 Channel.register(selector,SelectionKey.OP_READ); } } } //设置buffer缓冲区 ByteBuffer buffer=ByteBuffer.allocate(1024); //创建控制台实例 Scanner scanner=new Scanner(System.in); while(true){ System.out.println("请输入"); String msg=scanner.nextLine(); //客户端结束条件 if("exit".equals(msg)) break; //向缓存写入数据 buffer.put(msg.getBytes()); buffer.flip(); //发送数据 socketChannel.write(buffer); //清空缓存 buffer.clear(); //监听读事件,没有则阻塞 selector.select(); socketChannel.read(buffer); //读写模式切换 buffer.flip(); //通过缓冲区有效元素大小确定接收数组大小 byte [] bytes=new byte[buffer.remaining()]; //从缓存读取数据 buffer.get(bytes); String msg1=new String(bytes); System.out.println("recv "+msg); buffer.clear(); } socketChannel.close(); selector.close(); System.out.println("客户端关闭"); } catch (IOException e) { e.printStackTrace(); } } }
    最新回复(0)