自定义一个RPC框架

    xiaoxiao2022-07-13  175

    目录

    一、用到的技术:

    1.socket通信

    2.反射

    3.动态代理(jdk动态代理)

    4.线程池

    二、架构: 

    1.client:客户端,传递要调用的接口名给服务端,在服务端执行完成返回结果给客户端。原理是在客户端使用一个动态代理,可动态的切换要调用的接口和方法。

    2.客户端要调用的接口服务:接口服务都到ServiceCenter服务中心中册,注册后客户端只需要到ServiceCenter调用服务即可。

    3.servciceCenter:服务中心。内部维护一个map,用于保存接口服务。key是接口名,value是接口实现类。通过接口名可以获取接口实现类,然后执行相应方法,得到执行结果,返回给客户端即可。

     三、项目工程结构:

    四、具体实现:

    1.remote.procedure.call.client包

    1.1Client

    2.remote.procedure.call.server包

    2.1.HelloService 

    2.2.HelloServiceImpl

    2.3.Server

    2.4.ServerCenter

    3.remote.procedure.call.test包

    3.1.RPCServerTest

    3.2.RPCClientTest


    一、用到的技术:

    1.socket通信

    2.反射

    3.动态代理(jdk动态代理)

    4.线程池

    二、架构: 

    1.client:客户端,传递要调用的接口名给服务端,在服务端执行完成返回结果给客户端。原理是在客户端使用一个动态代理,可动态的切换要调用的接口和方法。

    2.客户端要调用的接口服务:接口服务都到ServiceCenter服务中心中册,注册后客户端只需要到ServiceCenter调用服务即可。

    3.servciceCenter:服务中心。内部维护一个map,用于保存接口服务。key是接口名,value是接口实现类。通过接口名可以获取接口实现类,然后执行相应方法,得到执行结果,返回给客户端即可。

     三、项目工程结构:

    四、具体实现:

    1.remote.procedure.call.client包

    1.1Client

    package remote.procedure.call.client; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.net.Socket; public class Client { //serviceName:请求的接口名 //addr:待请求服务端的ip:端口 @SuppressWarnings("unchecked") public static <T>T getRemoteProxyObj(Class serviceInteface,InetSocketAddress addr){ //newProxyInteface(a,b,c) /** * a:类加载器:需要代理那个类(例如HelloService接口),就需要将HelloService的类加载器传如第一个参数 * b:需要代理的对象,具备那些功能 --这里写接口 * 为什么是数组:因为java中单继承,多实现,可能有多个接口 * 类似字符串数组:String[] str=new String[]{"aa","bb"} * c:handlerIntecepter */ return (T)Proxy.newProxyInstance( serviceInteface.getClassLoader(), new Class<?>[] {serviceInteface}, new InvocationHandler() { /** * a:proxy:代理的对象 * b:method:代理对象的具体方法 * c:args:方法中的参数数组 */ @Override public Object invoke(Object proxy, Method method, Object[] args){ Socket socket =new Socket(); ObjectOutputStream oos = null; ObjectInputStream ois = null; try { socket.connect(addr); oos = new ObjectOutputStream(socket.getOutputStream()) ; //发送接口名和方法名:字符串类型 oos.writeUTF(serviceInteface.getName()); oos.writeUTF(method.getName()); //发送方法参数类型和方法参数 oos.writeObject(method.getParameterTypes()); System.out.println("client要调用的参数类型是:"+method.getParameterTypes()); oos.writeObject(args); //等待服务端处理... //接收服务端处理后的返回值 ois = new ObjectInputStream(socket.getInputStream()); Object result = ois.readObject(); return result; }catch(Exception e) { e.printStackTrace(); return null; }finally { try { if(oos != null)oos.close(); if(ois != null)ois.close(); } catch (Exception e) { e.printStackTrace(); } } } }); } }

    2.remote.procedure.call.server包

    2.1.HelloService 

    package remote.procedure.call.server; public interface HelloService { public abstract String sayHi(String name); }

    2.2.HelloServiceImpl

    package remote.procedure.call.server; public class HelloServiceImpl implements HelloService{ @Override public String sayHi(String name) { return "hi "+name; } }

    2.3.Server

    package remote.procedure.call.server; import java.io.IOException; public interface Server { public void start(); public void stop(); public void register(Class service,Class serviceImpl); }

    2.4.ServerCenter

    package remote.procedure.call.server; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.imageio.spi.RegisterableService; @SuppressWarnings("unchecked") public class ServerCenter implements Server{ //map:服务端的所有可供客户端访问的接口,都注册到该map中 private static Map<String, Class> serviceRegister = new HashMap<String, Class>(); private static int port; //连接池:连接池中存在多个链接对象,每个连接对象都可以处理一个客户请求 private static ExecutorService executor = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors()); private static boolean isRunning = false; public ServerCenter(int port) { this.port = port; } //开启服务端服务 @Override public void start(){ ServerSocket server = null; try { server = new ServerSocket(); server.bind(new InetSocketAddress(port)); } catch (IOException e) { e.printStackTrace(); } isRunning = true; while(true) { //客户端每请求一个链接,则服务端从连接池中获取一个线程对象去处理 Socket socket = null; System.out.println("start server..."); try { socket = server.accept();//等待客户端连接 } catch (IOException e) { e.printStackTrace(); } executor.execute(new ServiceTask(socket)); } } @Override public void stop() { isRunning = false; executor.shutdown(); } @Override public void register(Class service,Class serviceImpl) { serviceRegister.put(service.getName(), serviceImpl); System.out.println("注册了方法:"+service.getName()); } //线程处理 private static class ServiceTask implements Runnable{ private Socket socket; public ServiceTask() { } public ServiceTask(Socket socket) { this.socket = socket; } @Override public void run() {//线程所作的事情 System.out.println("开启了一个新线程..."); ObjectOutputStream oos =null; ObjectInputStream ois = null; try { //接受客户端连接及请求,处理该请求 ois= new ObjectInputStream(socket.getInputStream()); //ObjectInputStream对接受顺序有严格要求 String serviceName = ois.readUTF(); String methodName = ois.readUTF(); Class[] parameterTypes = (Class[]) ois.readObject(); Object[] args = (Object[]) ois.readObject(); //根据客户请求找到具体的接口,如果没有注册,则注册一个 Class serviceClass = serviceRegister.get(serviceName); Method method = serviceClass.getMethod(methodName, parameterTypes); Object result=method.invoke(serviceClass.newInstance(), args); oos = new ObjectOutputStream(socket.getOutputStream()); oos.writeObject(result); }catch(Exception e) { e.printStackTrace(); } finally { try { if(oos != null)oos.close(); if(ois != null)ois.close(); } catch (IOException e) { e.printStackTrace(); } } } } }

    3.remote.procedure.call.test包

    3.1.RPCServerTest

    package remote.procedure.call.test; import remote.procedure.call.server.HelloService; import remote.procedure.call.server.HelloServiceImpl; import remote.procedure.call.server.Server; import remote.procedure.call.server.ServerCenter; public class RPCServerTest { public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { Server server = new ServerCenter(9999); server.register(HelloService.class, HelloServiceImpl.class); server.start(); } }).start(); } }

    3.2.RPCClientTest

    package remote.procedure.call.test; import java.net.InetSocketAddress; import remote.procedure.call.client.Client; import remote.procedure.call.server.HelloService; public class RPCClientTest { public static void main(String[] args) throws ClassNotFoundException { HelloService service = Client.getRemoteProxyObj( Class.forName("remote.procedure.call.server.HelloService"), new InetSocketAddress("127.0.0.1",9999)); System.out.println(service.sayHi("小明")); } }

     

    最新回复(0)