概述:
NIO主要想解决的是BIO的大并发问题:
在使用同步IO的网络应用中,如果要同时和多个服务器进行通信,就必须使用多线程进行处理。也就是说,将每一个客户端请求分配给一个线程来单独处理。这样做虽然可以达到我们的要求,但是同时又会带来另外一个问题:由于每创建一个线程,就要为这个线程分配一定的内存空间(也叫工作存储器),而操作系统本身对线程的总数也会有一定的限制,如果客户端的请求过多,服务端程序可能因为不堪重负而拒绝客户端请求 ,服务器甚至也可能会因此而瘫痪。 NIO基于Reactor(反应器) ,当socket有流可读或者可写时,操作系统会响应通知应用程序进行处理,应用再将流读取到缓冲区或者写入操作系统。
NIO服务器实现模式为
所有请求一个线程,即客户端发送的连接请求都会被注册到多路复用器上,多路复用器轮询到连接有IO请求时才启动一个线程进行处理。并不是说所有的任务都是一个线程进行处理,有的任务可能业务逻辑较长,别的客户端请求可能无法被即时处理,这也是一种变相阻塞。我们可以采用NIO+线程池 的处理模式,一个线程专门用于客户端的连接,将别的操作作为任务提交给线程池。
关于NIO具体释义请点击 Java中NIO详解(Buffer/Selector/Channel三大核心原理)
线程池+NIO
服务端:
package Internet
.NIODemo
;
import java
.io
.IOException
;
import java
.net
.InetSocketAddress
;
import java
.net
.ServerSocket
;
import java
.nio
.ByteBuffer
;
import java
.nio
.channels
.SelectionKey
;
import java
.nio
.channels
.Selector
;
import java
.nio
.channels
.ServerSocketChannel
;
import java
.nio
.channels
.SocketChannel
;
import java
.util
.Iterator
;
import java
.util
.concurrent
.Executor
;
import java
.util
.concurrent
.Executors
;
class Run implements Runnable{
private Selector selector
;
SocketChannel socketChannel
;
public Run(Selector selector
,SocketChannel socketChannel
){
this.selector
=selector
;
this.socketChannel
=socketChannel
;
}
@Override
public void run() {
try {
socketChannel
.configureBlocking(false);
socketChannel
.register(selector
,SelectionKey
.OP_READ
);
while(selector
.select()>0){
System
.out
.println("有关注事件发生");
Iterator
<SelectionKey> iterator
=selector
.selectedKeys().iterator();
while(iterator
.hasNext()){
SelectionKey key
=iterator
.next();
iterator
.remove();
if(key
.isReadable()){
System
.out
.println("有可读事件发生");
SocketChannel socketChannel
=(SocketChannel
)key
.channel();
ByteBuffer buffer
=ByteBuffer
.allocate(2);
buffer
.clear();
int count
=0;
StringBuilder s
=new StringBuilder();
while((count
=socketChannel
.read(buffer
))>0){
socketChannel
.read(buffer
);
System
.out
.println("循环接收");
buffer
.flip();
byte[] bytes
= new byte[buffer
.remaining()];
buffer
.get(bytes
);
String msg
= new String(bytes
);
s
.append(msg
);
buffer
.clear();
buffer
.put(msg
.getBytes());
buffer
.flip();
socketChannel
.write(buffer
);
buffer
.clear();
}
System
.out
.println(s
.toString());
if(count
<0){
socketChannel
.close();
}
}
}
}
}catch (IOException e
){
}
}
}
public class NIOServerDemo {
public static void main(String
[] args
) {
try {
Selector selector
=Selector
.open();
ServerSocketChannel ssc
=ServerSocketChannel
.open();
ssc
.configureBlocking(false);
ssc
.register(selector
,SelectionKey
.OP_ACCEPT
);
ssc
.bind(new InetSocketAddress(8888));
System
.out
.println("客户端启动成功");
Executor executor
=Executors
.newFixedThreadPool(10);
while(selector
.select()>0){
System
.out
.println("有关注事件发生");
Iterator
<SelectionKey> iterator
=selector
.selectedKeys().iterator();
while(iterator
.hasNext()){
SelectionKey key
=iterator
.next();
iterator
.remove();
if(key
.isAcceptable()){
System
.out
.println("有可接收事件发生");
ServerSocketChannel serverSocketChannel
=(ServerSocketChannel
)key
.channel() ;
SocketChannel socketChannel
=serverSocketChannel
.accept();
executor
.execute(new Run(Selector
.open(),socketChannel
));
}
}
}
} catch (IOException e
) {
e
.printStackTrace();
}
}
}
客户端:
package Internet
.NIODemo
;
import java
.io
.IOException
;
import java
.net
.InetSocketAddress
;
import java
.nio
.ByteBuffer
;
import java
.nio
.channels
.SelectionKey
;
import java
.nio
.channels
.Selector
;
import java
.nio
.channels
.SocketChannel
;
import java
.util
.Iterator
;
import java
.util
.Scanner
;
public class NIOClient {
public static void main(String
[] args
) {
try {
SocketChannel socketChannel
= SocketChannel
.open();
socketChannel
.configureBlocking(false);
Selector selector
= Selector
.open();
if (!socketChannel
.connect(new InetSocketAddress("127.0.0.1", 8888))){
socketChannel
.register(selector
, SelectionKey
.OP_CONNECT
);
selector
.select();
System
.out
.println("有可接收事件发生");
Iterator
<SelectionKey> iterator
=selector
.selectedKeys().iterator();
while(iterator
.hasNext()){
SelectionKey key
=iterator
.next();
iterator
.remove();
if(key
.isValid()&&key
.isConnectable()){
SocketChannel Channel
=(SocketChannel
)key
.channel();
if(Channel
.isConnectionPending())
Channel
.finishConnect();
System
.out
.println("客户端连接成功");
Channel
.configureBlocking(false);
Channel
.register(selector
,SelectionKey
.OP_READ
);
}
}
}
ByteBuffer buffer
=ByteBuffer
.allocate(1024);
Scanner scanner
=new Scanner(System
.in
);
while(true){
System
.out
.println("请输入");
String msg
=scanner
.nextLine();
if("exit".equals(msg
))
break;
buffer
.put(msg
.getBytes());
buffer
.flip();
socketChannel
.write(buffer
);
buffer
.clear();
selector
.select();
socketChannel
.read(buffer
);
buffer
.flip();
byte [] bytes
=new byte[buffer
.remaining()];
buffer
.get(bytes
);
String msg1
=new String(bytes
);
System
.out
.println("recv "+msg
);
buffer
.clear();
}
socketChannel
.close();
selector
.close();
System
.out
.println("客户端关闭");
} catch (IOException e
) {
e
.printStackTrace();
}
}
}