Hadoop2.X中使用RPC

    xiaoxiao2025-10-13  5

    1.1 协议

    import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.VersionedProtocol; public interface MyProtocol extends VersionedProtocol { public static final long versionID = 1L; public Text test(Text t); } 1.2 客户端

    import java.io.IOException; import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; public class MyClient { public MyProtocol myProtocol; public MyClient() throws IOException { InetSocketAddress addr = new InetSocketAddress("localhost", 8888); myProtocol = (MyProtocol) RPC.waitForProxy(MyProtocol.class, 1, addr, new Configuration()); } public void sendClientMsg() { System.out.println("I am client,waiting server's data..."); Text result = myProtocol.test(new Text("message from client")); System.out.println(result.toString()); } public static void main(String[] args) throws IOException { MyClient client = new MyClient(); client.sendClientMsg(); } } 1.3 服务端

    import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server; public class MyServer implements MyProtocol { public void initialize() throws InterruptedException, IOException { // numHandlers表示服务器端处理请求的线程数目 Server server = new RPC.Builder(new Configuration()) .setProtocol(MyProtocol.class).setInstance(this) .setBindAddress("localhost").setPort(8888).setNumHandlers(5) .build(); server.start(); } // 获取自定义的协议版本号 @Override public long getProtocolVersion(String protocol,long clientVersion) throws IOException { return MyProtocol.versionID; } // 获取协议签名 @Override public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException { return new ProtocolSignature(MyProtocol.versionID, null); } @Override public Text test(Text t) { if ("message from client".equals(t.toString())) return new Text("data from server"); return new Text("who are you"); } public static void main(String[] args) throws InterruptedException, IOException { MyServer server = new MyServer(); server.initialize(); } }

    相关资源:python入门教程(PDF版)
    最新回复(0)