仿hadoop RPC机制的代码实现

    xiaoxiao2022-07-12  149

    以下给出了全部代码(protoc 编译生成的两个协议类除外,因为代码量太大)。包的组织方式与 hadoop rpc 相同,服务端通过运行 CalculatorService.java 启动。功能很简单,只实现了加减法,但采用了类似 hadoop rpc 的机制,麻雀虽小,五脏俱全。

     

    1,Server类,仿hadoop rpc的server实现

    package com.zxf.rpc.server;

     

    import java.io.IOException;
    import java.net.*;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    import com.google.protobuf.*;
    import com.google.protobuf.Descriptors.MethodDescriptor;

    import com.zxf.rpc.proto.Calculator.CalculatorService;
    import com.zxf.rpc.proto.CalculatorMsg.CalRequest;
    import com.zxf.rpc.proto.CalculatorMsg.CalResponse;

    public class Server {

    private Class protocol;

    private BlockingService impl;

    private int port;

    private ServerSocket ss;

    private boolean running = true;

    private Map map = Collections

    .synchronizedMap(new HashMap());

    private Listener listener;

    private Responder responder;

    private ServerSocket serverSocket;

    private BlockingQueue calls = new LinkedBlockingQueue();

     

    public Server(Class protocol, BlockingService protocolImpl, int port)

    throws IOException {

    this.protocol = protocol;

    this.impl = protocolImpl;

    this.port = port;

    InetSocketAddress address = new InetSocketAddress("localhost", port);

    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

    serverSocketChannel.bind(address);

    serverSocket = serverSocketChannel.socket();

    serverSocketChannel.configureBlocking(false);

    listener = new Listener(serverSocketChannel, 1);

    responder = new Responder();

     

    }

     

    public void start() {

    listener.start();

    responder.start();

    }

     

    public byte[] processOneRpc(byte[] data) throws Exception {

    CalRequest request = CalRequest.parseFrom(data);

    String methodName = request.getMethodName();

    MethodDescriptor methodDescriptor = impl.getDescriptorForType()

    .findMethodByName(methodName);

    Message response = impl.callBlockingMethod(methodDescriptor, null,

    request);

    return response.toByteArray();

    }

     

    private class Listener extends Thread {

     

    private ServerSocketChannel serverSocketChannel = null;

    private Random random = new Random();

    private Reader[] readers;

    private int currentReader;

    private Selector listenerSelector;

     

    public Listener(ServerSocketChannel channel, int numReader)

    throws IOException {

    serverSocketChannel = channel;

    listenerSelector = Selector.open();

    serverSocketChannel.register(listenerSelector,

    SelectionKey.OP_ACCEPT);

    readers = new Reader[numReader];

    for (int i = 0; i < numReader; i++) {

    readers[i] = new Reader();

    readers[i].start();

    }

    }

     

    @Override

    public void run() {

    while (running) {

    try {

    listenerSelector.select();

    Iterator iter = listenerSelector

    .selectedKeys().iterator();

    while (iter.hasNext()) {

    SelectionKey selectionKey = (SelectionKey) 

     

    iter.next();

    iter.remove();

    if (selectionKey.isValid()) {

     

    if (selectionKey.isAcceptable()) {

    doAccept(selectionKey);

    }

    }

    selectionKey = null;

    }

     

    } catch (IOException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }

    }

    }

     

    private void doAccept(SelectionKey key) throws IOException {

    SocketChannel channel = ((ServerSocketChannel) key.channel())

    .accept();

    channel.configureBlocking(false);

     

    Reader reader = getReader();

    reader.startAdd();

    SelectionKey key2 = reader.registerChannel(channel);

    Connection connection = new Connection(channel);

    key2.attach(connection);

    reader.finishAdd();

     

    }

     

    private synchronized Reader getReader() {

    return readers[(currentReader++) % readers.length];

    }

     

    private class Reader extends Thread {

    private Selector readSelector;

    private boolean adding;

     

    public Reader() throws IOException {

    readSelector = Selector.open();

    }

     

    @Override

    public void run() {

    while (running) {

    try {

    readSelector.select();

    synchronized (this) {

    while (adding) {

    this.wait(1000);

    }

    }

     

    Iterator iterator = 

     

    readSelector

    .selectedKeys().iterator();

    while (iterator.hasNext()) {

    SelectionKey selectionKey = 

     

    (SelectionKey) iterator

    .next();

    iterator.remove();

    if (selectionKey.isValid()) {

     

    if 

     

    (selectionKey.isReadable()) {

    doRead

     

    (selectionKey);

    }

    }

    selectionKey = null;

    }

    } catch (Exception e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }

     

    }

    }

     

    private void doRead(SelectionKey key) throws IOException {

    Connection connection = (Connection) key.attachment();

    int count = connection.readAndProcess();

    if (-1 == count) {

    key.cancel();

    System.out.println

     

    ("===============================");

    }

     

    }

     

    public void startAdd() {

    adding = true;

    readSelector.wakeup();

    }

     

    public synchronized void finishAdd() {

    adding = false;

    this.notify();

    }

     

    public SelectionKey registerChannel(SocketChannel channel)

    throws IOException {

    return channel.register(readSelector, 

     

    SelectionKey.OP_READ);

    }

    }

    }

     

    private class Call {

    private Connection connection;

    byte[] messageData;

    long time;

    Message message;

     

    public Call(Connection connection, Message message, long time) {

    super();

    this.connection = connection;

    this.message = message;

    this.time = time;

    }

     

    public byte[] getMessageData() {

    return messageData;

    }

     

    public Message getMessage() {

    return message;

    }

     

    public long getTime() {

    return time;

    }

     

    }

     

    private class Client {

    Socket clientSocket;

    InetAddress host;

    int port;

    private Connection connection;

     

    public Client(Socket socket) {

    this.clientSocket = socket;

    this.host = socket.getInetAddress();

    this.port = socket.getPort();

     

    }

     

    public InetAddress getHost() {

    return host;

    }

     

    public int getPort() {

    return port;

    }

     

    public Socket getClientSocket() {

    return clientSocket;

    }

     

    public void setConnection(Connection connection) {

    this.connection = connection;

    }

     

    public Connection getConnection() {

    return connection;

    }

     

     

    }

     

    private class Connection {

     

    private ByteBuffer length = ByteBuffer.allocate(4);

    private ByteBuffer data;

    private SocketChannel channel;

     

    public Connection(SocketChannel channel) {

    this.channel = channel;

    }

     

    int readAndProcess() throws IOException {

    while (true) {

    int count = -1;

    try {

    if (length.hasRemaining()) {

    count = channel.read(length);

    if (count < 0 || length.hasRemaining()) {

    return count;

    }

    }

    if (null == data) {

    data = ByteBuffer.allocate(length.getInt

     

    (0));

    }

     

    if (data.hasRemaining()) {

    count = channel.read(data);

    }

    } catch (IOException e) {

    return -1;

    }

     

    if (!data.hasRemaining()) {

    length.clear();

    CalRequest calRequest = CalRequest.parseFrom

     

    (data.array());

    data = null;

    Call call = new Call(this, calRequest,

    System.currentTimeMillis());

    try {

    calls.put(call);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

     

    }

     

    return count;

    }

     

    }

    }

     

    private class Handler extends Thread {

    }

    private class Responder extends Thread {

    @Override

    public void run() {

     

    while (running) {

    try {

    Call call = calls.take();

    System.out.println(call.getMessage());

    CalRequest request = (CalRequest) call.getMessage

     

    ();

    MethodDescriptor methodDescriptor = 

     

    CalculatorService

    .getDescriptor().findMethodByName(

     

    request.getMethodName());

    CalResponse response = (CalResponse) impl

    .callBlockingMethod

     

    (methodDescriptor, null, request);

    byte[] data = response.toByteArray();

    ByteBuffer length = ByteBuffer.allocate(4);

    ByteBuffer databuf = ByteBuffer.allocate

     

    (data.length);

    length.putInt(data.length);

    length.flip();

    databuf.put(data);

    databuf.flip();

    SocketChannel channel = call.connection.channel;

    channel.write(length);

    channel.write(databuf);

    } catch (InterruptedException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    } catch (ServiceException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    } catch (IOException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }

    }

    }

    }

    }

     

    2,Calculator接口定义

    package com.zxf.rpc.api;

     

    public interface Calculator {

    public int add(int a,int b);

    public int subtraction(int a,int b);

    }

     

    3  CalculatorPB接口

    package com.zxf.rpc.api;

     

    import com.zxf.rpc.proto.Calculator.CalculatorService.BlockingInterface;

     

    public interface CalculatorPB extends BlockingInterface {

     

    }

    4 CalculatorPBClientImpl类

    package com.zxf.rpc.api.impl.pb.client;

     

    import java.io.DataInputStream;

    import java.io.DataOutputStream;

    import java.lang.reflect.InvocationHandler;

    import java.lang.reflect.Method;

    import java.lang.reflect.Proxy;

    import java.net.Socket;

    import com.google.protobuf.ServiceException;

    import com.zxf.rpc.api.Calculator;

    import com.zxf.rpc.api.CalculatorPB;

    import com.zxf.rpc.proto.CalculatorMsg.CalRequest;

    import com.zxf.rpc.proto.CalculatorMsg.CalResponse;

     

    public class CalculatorPBClientImpl implements Calculator {

     

    private CalculatorPB proxy;

     

    public CalculatorPBClientImpl() {

     

    proxy = (CalculatorPB) Proxy.newProxyInstance(

    CalculatorPB.class.getClassLoader(),

    new Class[] { CalculatorPB.class }, new Invoker());

     

    }

     

    public class Invoker implements InvocationHandler {

     

    @Override

    public Object invoke(Object proxy, Method method, Object[] args)

    throws Throwable {

    // TODO Auto-generated method stub

    Socket socket = new Socket("localhost", 8038);

    DataOutputStream out = new DataOutputStream(

    socket.getOutputStream());

    DataInputStream in = new DataInputStream(socket.getInputStream());

     

    byte[] bytes = ((CalRequest) args[1]).toByteArray();

    out.writeInt(bytes.length);

    out.write(bytes);

    out.flush();

    socket.shutdownOutput();

    // socket.close();

    int dataLen = in.readInt();

    byte[] data = new byte[dataLen];

    int count = in.read(data);

    CalResponse result = CalResponse.parseFrom(data);

    System.out.println(result);

     

    socket.close();

    return result;

    }

     

    }

     

    @Override

    public int add(int a, int b) {

    CalRequest.Builder builder = CalRequest.newBuilder();

    builder.setMethodName("add");

    builder.setNum1(a);

    builder.setNum2(b);

    CalRequest request = builder.build();

    CalResponse calResponse = null;

    try {

    calResponse = proxy.add(null, request);

    } catch (ServiceException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }

    return calResponse.getResult();

    }

     

    @Override

    public int subtraction(int a, int b) {

    CalRequest.Builder builder = CalRequest.newBuilder();

    builder.setMethodName("Subtraction");

    builder.setNum1(a);

    builder.setNum2(b);

    CalRequest request = builder.build();

    CalResponse calResponse = null;

    try {

    calResponse = proxy.add(null, request);

    } catch (ServiceException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }

    return calResponse.getResult();

    }

     

    }

    5 CalculatorPBServiceImpl类

    package com.zxf.rpc.api.impl.pb.service;

     

    import com.google.protobuf.RpcController;

    import com.google.protobuf.ServiceException;

    import com.zxf.rpc.api.Calculator;

    import com.zxf.rpc.api.CalculatorPB;

    import com.zxf.rpc.proto.CalculatorMsg.CalRequest;

    import com.zxf.rpc.proto.CalculatorMsg.CalResponse;

     

    public class CalculatorPBServiceImpl implements CalculatorPB {

    private Calculator real;

     

    public CalculatorPBServiceImpl(Calculator calculator) {

    this.real = calculator;

    }

     

    @Override

    public CalResponse add(RpcController controller, CalRequest request)

    throws ServiceException {

    // TODO Auto-generated method stub

    CalResponse.Builder builder = CalResponse.newBuilder();

    int num1 = request.getNum1();

    int num2 = request.getNum2();

    int result = real.add(num1, num2);

    builder.setResult(result);

     

    return builder.build();

    }

     

    @Override

    public CalResponse subtraction(RpcController controller, CalRequest request)

    throws ServiceException {

    // TODO Auto-generated method stub

    CalResponse.Builder builder = CalResponse.newBuilder();

    int num1 = request.getNum1();

    int num2 = request.getNum2();

    int result = real.subtraction(num1, num2);

    builder.setResult(result);

     

    return builder.build();

    }

     

    }

     

    6 CalculatorService类

    package com.zxf.service;

     

    import java.io.IOException;

     

    import com.google.protobuf.BlockingService;

    import com.zxf.rpc.api.Calculator;

    import com.zxf.rpc.api.impl.pb.service.CalculatorPBServiceImpl;

    import com.zxf.rpc.server.Server;

     

    public class CalculatorService implements Calculator {

     

    private Server server;

     

    public void start() throws IOException {

     

    CalculatorPBServiceImpl calculatorPBServiceImpl = new 

     

    CalculatorPBServiceImpl(

    this);

    BlockingService service = com.zxf.rpc.proto.Calculator.CalculatorService

    .newReflectiveBlockingService(calculatorPBServiceImpl);

     

    server = new Server(CalculatorPBServiceImpl.class.getInterfaces()[0],

    service, 8038);

    server.start();

     

    }

     

    @Override

    public int add(int a, int b) {

    return a + b;

    }

     

    @Override

    public int subtraction(int a, int b) {

    return a - b;

    }

     

    public static void main(String[] args) throws IOException {

    CalculatorService service = new CalculatorService();

    service.start();

    }

    }

     

    7 客户端测试类TestClient

    package com.zxf;

    import java.util.Random;

    import com.zxf.rpc.api.Calculator;

    import com.zxf.rpc.api.impl.pb.client.CalculatorPBClientImpl;

    /**
     * Manual smoke test: issues five remote {@code add} calls with pseudo-random operands
     * drawn from a fixed seed, so every run sends the same sequence.
     */
    public class TestClient {

        public static void main(String[] args) {
            final Calculator calculator = new CalculatorPBClientImpl();
            Random random = new Random(100);

            int remaining = 5;
            while (remaining-- > 0) {
                // Operands are drawn in the order (first, second) for each call.
                int first = random.nextInt(100);
                int second = random.nextInt(100);
                calculator.add(first, second);
            }
        }
    }

    最新回复(0)