一 程序要求: 1 主机间通信用rabbitmq,客户端收到的执行结果放在redis队列里,用一个对应的序列号取出命令结果 2 多主机同时执行命令并返回结果 二 程序运行需求: 1 需要搭建rabbitmq及redis服务器 2 所有待连接主机及客户端必须能连接这两个服务:rabbit服务器需要新建用户名密码,用以连接。redis服务器需要取消外部访问限制 3 程序启动命令样例: python rpc多主机操作.py run “ifconfig” --host 192.168.163.1 192.168.163.128 看到命令序列号后即可输入序列号获得命令结果 4 两个服务起来以后的状态: 三 程序源码 client
#!/usr/bin/env python #--author lisheng-- import pika,uuid,gevent,redis,time,sys r = redis.Redis(host="192.168.163.128",port=6379) #连接redis服务器 class feibonaqi(object): def __init__(self): #credentials = pika.PlainCredentials() self.connet = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #连接rabbitmq服务器 self.channel = self.connet.channel() resault = self.channel.queue_declare(queue="",exclusive=True) #定义一个随机队列 self.callback_queue = resault.method.queue self.channel.basic_consume(queue=self.callback_queue,on_message_callback=self.no_message) #没有auto_ack=True,就得在callb函数最后加上channel1.basic_ack(delivery_tag=method.delivery_tag) 用于通知发布者端,可以删除消息了 def no_message(self,ch,method,prop,body): """ 当服务器端发回来的uuid随机数是原先客户端发去的随机数时,会将最终结果付给self.response :param ch: 有链接进来后,运行结果会付给callback函数,即on_message函数,这是其中的一个值 :param method:有链接进来后,运行结果会付给callback函数,即on_message函数,这是其中的一个值 :param prop:有链接进来后,运行结果会付给callback函数,即on_message函数,这是其中的一个值 :param body:返回的执行结果 :return: """ if self.uuidc == prop.correlation_id: self.respounse = body.decode() self.channel.basic_ack(delivery_tag=method.delivery_tag) #主动发callback函数已执行完成,服务器端已经可以删除消息了 def call(self,comm,queue_n): """ 没有收到服务器返回的执行结果时。本端会将要执行的命令发布给由服务器ip地址命名的队列,以便相应的主机能在自己监听的队列里收到要执行的命令 :param comm: :param queue_n: :return: """ self.respounse = None self.uuidc = str(uuid.uuid4()) self.channel.basic_publish(exchange="", routing_key=queue_n, properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.uuidc), body=str(comm) ) while self.respounse is None: self.connet.process_data_events() #没有收到服务器返回的执行结果,这里会循环查看有没有收到结果,不会像channel1.start_consuming()那样卡在哪里 self.ind = str(time.time())[14:] #用当前时间戳的后四位为命令结果命名 r.set(self.ind,self.respounse) #如果收到结果,就会将结果保存在redis服务器里 print("这是命令序列号,输入该号码得到命令%s %s结果:" %(comm,queue_n),feibo.ind) def resau(): """ 查询命令结果 :return: """ while True: commond = input(">>>") if commond == "q":break elif commond.isdigit(): if r.get(commond):print(r.get(commond).decode()) # python rpc多主机操作.py run "netstat -n" --host 192.168.163.1 192.168.163.128 #命令样例 b = sys.argv[2] c = sys.argv[4:] for qu in c: #有多少主机就开通多少携程,同时从多主机上获得命令结果 feibo = feibonaqi() gevent.joinall([gevent.spawn(feibo.call,b,qu)]) resau()server
#!/usr/bin/env python #--author lisheng-- import pika,os class feibonaqi_s(object): def __init__(self): #credentials = pika.PlainCredentials("lisheng", "1234") self.connet = pika.BlockingConnection(pika.ConnectionParameters("localhost")) self.channel = self.connet.channel() self.channel.queue_declare(queue="192.168.163.1") def fei(self,n): """ 斐波那契计算函数,此处只是为了调试,没有使用 :param n: :return: """ if n == 0:return 0 elif n == 1:return 1 else:return self.fei(n-1) + self.fei(n-2) def com(self,comm): """ 根据命令结果,返回相应的字符串 :param comm: 要执行的命令 :return: """ res = os.popen(str(comm)).read() print(comm) print(res) if res: return res else: return "none" def request(self,ch,meth,prop,body): """ 将命令结果返回给客户端 :param ch: 有链接进来后,运行结果会付给callback函数,即on_message函数,这是其中的一个值 :param meth: :param prop: :param body: :return: """ res = self.com(body.decode()) self.channel.basic_publish( exchange="", routing_key=prop.reply_to, body=res, properties=pika.BasicProperties(correlation_id=prop.correlation_id) #将收到的随机数列返回给客户端,以便其确认返回结果的来源 ) self.channel.basic_ack(delivery_tag=meth.delivery_tag) #主动发callback函数已执行完成,客户端已经可以删除消息了 fei_s = feibonaqi_s() fei_s.channel.basic_consume(on_message_callback=fei_s.request,queue="192.168.163.1") fei_s.channel.start_consuming()