其次是请求对象的处理部分

package com.shock.rpc;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Dispatches an {@link RpcCommand} to a registered service object using reflection.
 *
 * <p>Services are registered under the fully qualified name of their interface; a command is
 * routed by the class name it carries. Any failure during lookup or invocation is reported
 * through the returned {@link RpcResponse} rather than thrown.
 *
 * Created by zhengdong.lzd on 2016/11/29 0029.
 */
public class RpcHandler {

    /** Registered service objects, keyed by the fully qualified name of their interface. */
    ConcurrentHashMap<String, Object> registered = new ConcurrentHashMap<String, Object>(128);

    /**
     * Invokes the method described by {@code commond} on the matching registered service.
     *
     * @param commond the request naming the target class, method, argument types and arguments
     * @return a response carrying the method's return value, or flagged as an exception and
     *         carrying the failure when anything goes wrong
     */
    public RpcResponse handler(RpcCommand commond) {
        RpcResponse response = new RpcResponse();
        try {
            String className = commond.getClassName();
            if (className == null) {
                throw new IllegalArgumentException("RpcCommand does not name a target class");
            }

            // Consult the registry BEFORE touching Class.forName: the name comes from the
            // remote peer, and only registered interfaces may be resolved and invoked.
            Object service = registered.get(className);
            if (service == null) {
                throw new IllegalArgumentException("No service registered for " + className);
            }
            Class<?> serviceType = Class.forName(className);

            // A command for a no-argument method may legitimately carry no type array.
            String[] argTypes = commond.getArgumetsType();
            if (argTypes == null) {
                argTypes = new String[0];
            }
            Class<?>[] parameterTypes = new Class<?>[argTypes.length];
            for (int i = 0; i < argTypes.length; i++) {
                parameterTypes[i] = resolveType(argTypes[i]);
            }

            Method method = serviceType.getMethod(commond.getMethodName(), parameterTypes);
            response.setResult(method.invoke(service, commond.getParams()));
        } catch (Exception e) {
            e.printStackTrace();
            response.setException(true);
            response.setException(e);
        }
        return response;
    }

    /**
     * Registers {@code object} as the implementation served for {@code interfa}.
     *
     * @param interfa the service interface; its fully qualified name is the lookup key
     * @param object  the implementation that receives the calls
     */
    public void regist(Class<?> interfa, Object object) {
        registered.put(interfa.getName(), object);
    }

    /**
     * Resolves a parameter type name to its {@link Class}.
     *
     * <p>Primitive names are mapped explicitly because {@code Class.forName} cannot resolve
     * them. Other classes are loaded without running their static initializers, since the
     * name is supplied by the remote peer.
     */
    private static Class<?> resolveType(String typeName) throws ClassNotFoundException {
        if ("int".equals(typeName)) {
            return int.class;
        }
        if ("long".equals(typeName)) {
            return long.class;
        }
        if ("boolean".equals(typeName)) {
            return boolean.class;
        }
        if ("double".equals(typeName)) {
            return double.class;
        }
        if ("float".equals(typeName)) {
            return float.class;
        }
        if ("short".equals(typeName)) {
            return short.class;
        }
        if ("byte".equals(typeName)) {
            return byte.class;
        }
        if ("char".equals(typeName)) {
            return char.class;
        }
        return Class.forName(typeName, false, RpcHandler.class.getClassLoader());
    }
}
代码里面只有很粗暴的反射实现。第三个是服务端启动和服务端协议处理代码:
package com.shock.rpc; import com.shock.rpc.demo.IDemoImpl; import com.shock.rpc.demo.IDemoInterface; import java.io.*; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * ${DESCRIPTION} * com.shock.rpc.${CLASS_NAME} * Created by zhengdong.lzd on 2016/11/29 0029. */ public class RpcServer { int port; public RpcServer(int port, RpcHandler handler) { this.port = port; this.handler = handler; } RpcHandler handler; ExecutorService executorService = Executors.newFixedThreadPool(20); public void start() { try { ServerSocket serverSocket = new ServerSocket(port); while (true) { Socket socket = serverSocket.accept(); executorService.submit(new WorkThread(socket)); } } catch (IOException e) { e.printStackTrace(); } } public class WorkThread implements Runnable { Socket socket; WorkThread(Socket socket) { this.socket = socket; } @Override public void run() { try { InputStream inputStream = socket.getInputStream(); OutputStream outputStream = socket.getOutputStream(); while (true) { int magic = inputStream.read(); //魔数 if (magic == 0x5A) { //两个字节用来计算长度数据长度,服务传送的数据过大可能会出现截断问题 int length1 = inputStream.read(); int length2 = inputStream.read(); int length = (length1 << 8) + length2; ByteArrayOutputStream bout = new ByteArrayOutputStream(length); int sum = 0; byte[] bs = new byte[length]; while (true) { int readLength = inputStream.read(bs, 0, length - sum); if (readLength > 0) { bout.write(bs, 0, readLength); sum += readLength; } if (sum >= length) { break; } } ObjectInputStream objectInputStream = new ObjectInputStream( new ByteArrayInputStream(bout.toByteArray())); try { RpcCommand commond = (RpcCommand) objectInputStream.readObject(); RpcResponse response = handler.handler(commond); ByteArrayOutputStream objectout = new ByteArrayOutputStream(length); ObjectOutputStream objectOutputStream = new ObjectOutputStream(objectout); objectOutputStream.writeObject(response); 
objectOutputStream.flush(); byte[] commondBytes = objectout.toByteArray(); int len = commondBytes.length; outputStream.write(0x5A); outputStream.write(len >> 8); outputStream.write(len & 0x00FF); outputStream.write(commondBytes); outputStream.flush(); } catch (Exception e) { e.printStackTrace(); } } } } catch (IOException e) { e.printStackTrace(); System.out.println("和客户端连接断开了"); } finally { if (socket != null) { try { socket.close(); } catch (Exception e) { e.printStackTrace(); } } } } } public static void main(String[] args) { RpcHandler rpcHandler = new RpcHandler(); rpcHandler.regist(IDemoInterface.class, new IDemoImpl()); RpcServer servcer = new RpcServer(8081, rpcHandler); servcer.start(); } }代码实现也很简单,就是根据前面说的传输报文协议读取传输的报文,反序列化出请求对象RpcCommand,给处理类进行处理,如果做好兼容加上版本和不同协议的话,可以增加不同的处理实现。
最后是客户端传输和协议处理代码 package com.shock.rpc; import com.shock.rpc.demo.IDemoInterface; import java.io.*; import java.net.InetSocketAddress; import java.net.Socket; /** * ${DESCRIPTION} * com.shock.rpc.${CLASS_NAME} * Created by zhengdong.lzd on 2016/11/29 0029. */ public class RpcClient { String host; int port; Socket socket; InputStream inputStream; OutputStream outputStream; public RpcClient(String host, int port) { try { socket = new Socket(); socket.connect(new InetSocketAddress(host, port)); inputStream = socket.getInputStream(); outputStream = socket.getOutputStream(); } catch (IOException e) { e.printStackTrace(); } } //这个不能并发请求,否则会出现数据流乱的情况 public synchronized RpcResponse invoke(RpcCommand commond) { RpcResponse response = new RpcResponse(); try { ByteArrayOutputStream objectout = new ByteArrayOutputStream(); ObjectOutputStream objectOutputStream = new ObjectOutputStream(objectout); objectOutputStream.writeObject(commond); objectOutputStream.flush(); byte[] commondBytes = objectout.toByteArray(); outputStream.write(0x5A); int len = commondBytes.length; outputStream.write(len >> 8); outputStream.write(0x00FF & len); outputStream.write(commondBytes); outputStream.flush(); while (true) { int magic = inputStream.read(); if (magic == 0x5A) { int length1 = inputStream.read(); int length2 = inputStream.read(); int length = (length1 << 8) + length2; ByteArrayOutputStream bout = new ByteArrayOutputStream(length); int sum = 0; byte[] bs = new byte[length]; while (true) { int readLength = inputStream.read(bs, 0, length - sum); if (readLength > 0) { bout.write(bs, 0, readLength); sum += readLength; } if (sum >= length) { break; } } ObjectInputStream objectInputStream = new ObjectInputStream( new ByteArrayInputStream(bout.toByteArray())); RpcResponse response1 = (RpcResponse) objectInputStream.readObject(); return response1; } } } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } return response; } public static void 
main(String[] args) { RpcClient client = new RpcClient("localhost", 8081); RpcCommand command = new RpcCommand(); command.setClassName(IDemoInterface.class.getName()); command.setMethodName("noArgument"); command.setArgumetsType(new String[0]); RpcResponse response = client.invoke(command); RpcCommand command2 = new RpcCommand(); command2.setClassName(IDemoInterface.class.getName()); command2.setMethodName("withReturn"); command2.setArgumetsType(new String[] { "java.lang.String" }); command2.setParams(new String[] { "shocklee" }); RpcResponse response2 = client.invoke(command2); System.out.println(response.getResult()); System.out.println(response2.getResult()); } }至此整个框架部分已经完成,暂时还没有做常见的rpc客户端api包装,比如包装成从某个容器里面根据接口取出一个远程对象,直接调用远程对象的方法。 最后贴个测试类和接口 package com.shock.rpc.demo; /** * ${DESCRIPTION} * com.shock.rpc.demo.${CLASS_NAME} * Created by zhengdong.lzd on 2016/11/29 0029. */ public interface IDemoInterface { public String withReturn(String name); public void noReturn(String name); public String noArgument(); } package com.shock.rpc.demo; /** * ${DESCRIPTION} * com.shock.rpc.demo.${CLASS_NAME} * Created by zhengdong.lzd on 2016/11/29 0029. */ public class IDemoImpl implements IDemoInterface { @Override public String withReturn(String name) { System.out.println("withReturn "+name); return "hello " + name; } @Override public void noReturn(String name) { System.out.println("noReturn "+name); } @Override public String noArgument() { System.out.println("noArgument"); return "noArgument"; } }
整个RPC功能已经都贴出来了。代码没有做过整理,没有把序列化/反序列化代码抽象,协议部分也没做抽象,只是想写得快点,能够在短时间内写出来,和标题里的"十分钟"对应上,所以代码可能难看点,不过整体已经展示出来了,关键代码是不需要使用任何第三方框架和工具包的。 欢迎大家进行拍砖。 另外打个广告吧,本人写了一个稍微复杂点的RPC放在github上,有木有同学想一起写着玩的,赶紧约起啊,代码地址是 https://github.com/shocklee6315/simpleRpcServer
