example for NIO

    xiaoxiao2022-07-16  147

    NIO-Socket通讯,为我们解决了server端多线程设计方面的性能/吞吐量等多方面的问题,它提供了以非阻塞模式 + 线程池的方式来解决Server端高并发问题.NIO并不能显著地提升Client-server的通讯性能(其中包括全局性耗时总和,Server物理机资源开销和实际计算量),但是它可以确保Server端在支撑相应并发量的情况下,对物理资源的使用处于可控状态.对于开发者而言,NIO合理地使用了平台(OS/VM/Http协议)的特性,并提供了高效、便捷的编程级别的API.

     

    为了展示NIO交互的基本特性,我们模拟了一个简单的场景:Client端向server端建立连接,并持续交付大量数据,Server负责client的数据传输和处理.此程序实例并没有太多地关注异常处理和业务性处理,也没有使用线程池来管理server端的socket句柄,不过你可以简单地修改代码来实现它.

    TestMain.java:引导类; ClientControllor.java:client连接处理类,负责队列化数据提交,并负责维护socket句柄; Handler.java:连接处理接口; Packet.java:对于读取或者写入的buffer,进行二次封装,使其具有更好的可读性; ServerControllor.java:server端连接处理类,负责接收连接和数据处理; ServerHandler.java:server端连接维护类.

    TestMain.java:

     

    Java代码   package com.test.web;      public class TestMain {        /**      * @param args      */      public static void main(String[] args) throws Exception{          int port = 30008;          ServerControllor sc = new ServerControllor(port);          sc.start();          Thread.sleep(2000);          ClientControllor cc = new ClientControllor("127.0.0.1", port);          cc.start();          Packet p1 = Packet.wrap("Hello,I am first!");          cc.put(p1);          Packet p2 = Packet.wrap("Hello,I am second!");          cc.put(p2);          Packet p3 = Packet.wrap("Hello,I am thread!");          cc.put(p3);        }    }  

     

     

    ClientControllor.java

     

     

    Java代码   package com.test.web;    import java.net.InetSocketAddress;  import java.net.SocketAddress;  import java.nio.ByteBuffer;  import java.nio.channels.SocketChannel;  import java.util.concurrent.BlockingQueue;  import java.util.concurrent.LinkedBlockingQueue;  import java.util.zip.Adler32;  import java.util.zip.Checksum;    public class ClientControllor {        private BlockingQueue<Packet> inner = new LinkedBlockingQueue<Packet>(100);//no any more      private Object lock = new Object();      private InetSocketAddress remote;      private Thread thread = new ClientThread(remote);      public ClientControllor(String host,int port){          remote = new InetSocketAddress(host, port);      }            public void start(){          if(thread.isAlive() || remote == null){              return;          }          synchronized (lock) {              thread.start();          }                              }      public boolean put(Packet packet){          return inner.offer(packet);      }            public void clear(){          inner.clear();      }            class ClientThread extends Thread {          SocketAddress remote;          SocketChannel channel;          ClientThread(SocketAddress remote){              this.remote = remote;          }          @Override          public void run(){              try{                  try{                      channel = SocketChannel.open();                      channel.configureBlocking(true);                      boolean isSuccess = channel.connect(new InetSocketAddress(30008));                      if(!isSuccess){                          while(!channel.finishConnect()){                              System.out.println("Client is connecting...");                          }                      }                      System.out.println("Client is connected.");  //                  Selector selector = Selector.open();  //                  channel.register(selector, SelectionKey.OP_WRITE);  //                  
while(selector.isOpen()){  //                      selector.select();  //                      Iterator<SelectionKey> it = selector.selectedKeys().iterator();  //                      while(it.hasNext()){  //                          SelectionKey key = it.next();  //                          it.remove();  //                          if(!key.isValid()){  //                              continue;  //                          }  //                          if(key.isWritable()){  //                              write();  //                          }  //                      }  //                  }                      while(channel.isOpen()){                          write();                      }                  }catch(Exception e){                      e.printStackTrace();                  }finally{                      if(channel != null){                          try{                              channel.close();                          }catch(Exception ex){                              ex.printStackTrace();                          }                      }                  }              }catch(Exception e){                  e.printStackTrace();                  inner.clear();              }          }                    private void write() throws Exception{              Packet packet = inner.take();              synchronized (lock) {                  ByteBuffer body = packet.getBuffer();//                  ByteBuffer head = ByteBuffer.allocate(4);                  head.putInt(body.limit());                  head.flip();                  while(head.hasRemaining()){                      channel.write(head);                  }                  Checksum checksum = new Adler32();                  while(body.hasRemaining()){                      checksum.update(body.get());                  }                  body.rewind();                  while(body.hasRemaining()){                      channel.write(body);                  }                  long cks = 
checksum.getValue();                  ByteBuffer tail = ByteBuffer.allocate(8);                  tail.putLong(cks);                  tail.flip();                  while(tail.hasRemaining()){                      channel.write(tail);                  }              }                        }      }  }  

     

     

    Handler.java(接口,面向设计):

     

    Java代码   package com.test.web;    import java.nio.channels.SocketChannel;    public interface Handler {        public void handle(SocketChannel channel);  }  

     

     

    Packet.java

     

    Java代码   package com.test.web;    import java.io.Serializable;  import java.nio.ByteBuffer;  import java.nio.charset.Charset;    public class Packet implements Serializable {        /**      *       */      private static final long serialVersionUID = 7719389291885063462L;            private ByteBuffer buffer;            private static Charset charset = Charset.defaultCharset();            private Packet(ByteBuffer buffer){          this.buffer = buffer;      }            public String getDataAsString(){          return charset.decode(buffer).toString();      }            public byte[] getData(){          return buffer.array();      }            public ByteBuffer getBuffer(){          return this.buffer;      }                  public static Packet wrap(ByteBuffer buffer){          return new Packet(buffer);      }            public static Packet wrap(String data){          ByteBuffer source = charset.encode(data);          return new Packet(source);      }  }  

     

     

    ServerControllor.java

     

    Java代码   package com.test.web;    import java.net.InetSocketAddress;  import java.nio.channels.SelectionKey;  import java.nio.channels.Selector;  import java.nio.channels.ServerSocketChannel;  import java.nio.channels.SocketChannel;  import java.util.Iterator;    public class ServerControllor {      private int port;      private Thread thread = new ServerThread();;      private Object lock = new Object();      public ServerControllor(){          this(0);      }      public ServerControllor(int port){          this.port = port;      }            public void start(){          if(thread.isAlive()){              return;          }          synchronized (lock) {              thread.start();              System.out.println("Server starting....");          }      }                  class ServerThread extends Thread {          private static final int TIMEOUT = 3000;          private ServerHandler handler = new ServerHandler();          @Override          public void run(){              try{                  ServerSocketChannel channel = null;                  try{                      channel = ServerSocketChannel.open();                      channel.configureBlocking(false);                      channel.socket().setReuseAddress(true);                      channel.socket().bind(new InetSocketAddress(port));                      Selector selector = Selector.open();                      channel.register(selector, SelectionKey.OP_ACCEPT);                      while(selector.isOpen()){                          System.out.println("Server is running,port:" + channel.socket().getLocalPort());                          if(selector.select(TIMEOUT) == 0){                              continue;                          }                          Iterator<SelectionKey> it = selector.selectedKeys().iterator();                          while(it.hasNext()){                              SelectionKey key = it.next();                              it.remove();                            
  if(!key.isValid()){                                  continue;                              }                              if(key.isAcceptable()){                                  accept(key);                              }else if(key.isReadable()){                                  read(key);                              }                          }                      }                  }catch(Exception e){                      e.printStackTrace();                  }finally{                      if(channel != null){                          try{                              channel.close();                          }catch(Exception ex){                              ex.printStackTrace();                          }                      }                  }              }catch(Exception e){                  e.printStackTrace();              }          }                    private void accept(SelectionKey key) throws Exception{              SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();              socketChannel.configureBlocking(true);              //socketChannel.register(key.selector(), SelectionKey.OP_READ);              handler.handle(socketChannel);          }                    private void read(SelectionKey key) throws Exception{              SocketChannel channel = (SocketChannel)key.channel();              //handler.handle(channel);          }      }  }  

     

     

    ServerHandler.java

     

     

    Java代码   package com.test.web;    import java.nio.ByteBuffer;  import java.nio.channels.SocketChannel;  import java.util.HashMap;  import java.util.Map;  import java.util.concurrent.Semaphore;  import java.util.zip.Adler32;  import java.util.zip.Checksum;    class ServerHandler implements Handler {        private static Semaphore semaphore = new Semaphore(Runtime.getRuntime().availableProcessors() + 1);            private static Map<SocketChannel,Thread> holder = new HashMap<SocketChannel,Thread>(32);            @Override      public void handle(SocketChannel channel) {          synchronized (holder) {              if(holder.containsKey(channel)){                  return;              }              Thread t = new ReadThread(channel);              holder.put(channel, t);              t.start();          }      }                  static class ReadThread extends Thread{          SocketChannel channel;          ReadThread(SocketChannel channel){              this.channel = channel;          }          @Override          public void run(){              try{                  semaphore.acquire();                  boolean eof = false;                  while(channel.isOpen()){                      //ByteBuffer byteBuffer = new ByteBuffer(1024);                      ByteBuffer head = ByteBuffer.allocate(4);//int for data-size                      while(true){                          int cb = channel.read(head);                          if(cb == -1){                              throw new RuntimeException("EOF error,data lost!");                          }                          if(isFull(head)){                              break;                          }                      }                      head.flip();                      int dataSize = head.getInt();                      if(dataSize <= 0){                          throw new RuntimeException("Data format error,something lost???");                      }                      ByteBuffer body = 
ByteBuffer.allocate(dataSize);                      while(true){                          int cb = channel.read(body);                          if(cb == -1){                              throw new RuntimeException("EOF error,data lost!");                          }else if(cb == 0 && this.isFull(body)){                              break;                          }                      }                      ByteBuffer tail = ByteBuffer.allocate(8);//int for data-size                      while(true){                          int cb = channel.read(tail);                          if(cb == -1){                              eof = true;                          }                          if(isFull(tail)){                              break;                          }                      }                      tail.flip();                      long sck = tail.getLong();                      Checksum checksum = new Adler32();                      checksum.update(body.array(), 0, dataSize);                      long cck = checksum.getValue();                      if(sck != cck){                          throw new RuntimeException("Sorry,some data lost or be modified,please check!");                      }                      body.flip();                      Packet packet = Packet.wrap(body);                      System.out.println(packet.getDataAsString());                      if(eof){                          break;                      }                  }              }catch(Exception e){                  e.printStackTrace();              }finally{                  if(channel != null){                      try{                          channel.close();                      }catch(Exception ex){                          ex.printStackTrace();                      }                  }                  holder.remove(channel);                  semaphore.release();              }          }                    private boolean isFull(ByteBuffer byteBuffer){              
return byteBuffer.position() == byteBuffer.capacity() ? true : false;          }      }    }  

     

     原文链接:[http://wely.iteye.com/blog/2227865]

    最新回复(0)