因为通讯部门同事让我帮忙刷机40台,我应该的手动刷太慢了,等干等的时间太多,所有就写了这个半自动刷机程序图形化的代码,每进行一步操作,结束后都有弹框提醒,就不用傻傻等待
wx: 图形化界面 threading 多线程管理 os 系统调用 subprocess cmd打印回传参数 time 用为调用时间,及延时用 selenium 爬虫
moxa_gui.py
import wx from threading import Thread from moxa_main import * class MyFrame(wx.Frame): def __init__(self, *args, **kw): super(MyFrame, self).__init__(*args, **kw) self.SetSize((500, 450)) self.SetMaxSize((500, 450)) self.SetMinSize((500, 450)) self.SetTitle("MOXA刷机程序") # 初始化变量 self.file = "" self.ip = "http://192.168.127.253" # 面板设计 self._init_frame() # 事件绑定 self._func_event() self.Center() self.Show() def _init_frame(self): # 设置画板 pnl = wx.Panel(self) vb = wx.BoxSizer(wx.VERTICAL) hb1 = wx.BoxSizer(wx.HORIZONTAL) hb2 = wx.BoxSizer(wx.VERTICAL) hb3 = wx.BoxSizer(wx.VERTICAL) ###################################### # 第一层 配置文件 ###################################### text1 = wx.StaticText(pnl, wx.ID_ANY, u"配置文件") self.hb1_tc2 = wx.TextCtrl(pnl) self.hb1_button1 = wx.Button(pnl, label=u"打开") self.hb1_button2 = wx.Button(pnl, label=u"网络检测") hb1.AddMany([(text1, 0, wx.ALIGN_CENTER | wx.ALL, 5), (self.hb1_tc2, 1, wx.EXPAND | wx.ALL, 5), (self.hb1_button1, 0, wx.EXPAND | wx.ALL, 5), (self.hb1_button2, 0, wx.EXPAND | wx.ALL, 5),]) ###################################### # 第二层 测试区域 ###################################### box3 = wx.StaticBoxSizer(wx.StaticBox(pnl, 0, label="测试区域")) text4 = wx.StaticText(box3.GetStaticBox(), wx.ID_ANY, u"交换机IP:") self.hb2_tc5 = wx.TextCtrl(box3.GetStaticBox()) self.hb2_tc5.AppendText(self.ip) # 写入初始化IP地址 self.hb2_button2 = wx.Button(box3.GetStaticBox(), wx.ID_ANY, u"刷机", wx.DefaultPosition, wx.Size(80, 30), 0) self.hb2_button2.Disable() self.hb2_button3 = wx.Button(box3.GetStaticBox(), wx.ID_ANY, u"检查", wx.DefaultPosition, wx.Size(80, 30), 0) box3.AddMany([(text4, 0, wx.ALIGN_CENTER | wx.ALL, 5), (self.hb2_tc5, 1, wx.EXPAND | wx.ALL, 5), (self.hb2_button2, 0, wx.EXPAND | wx.ALL, 5), (self.hb2_button3, 0, wx.EXPAND | wx.ALL, 5), ]) hb2.AddMany([(box3, 0, wx.EXPAND | wx.ALIGN_CENTER | wx.ALL, 5), ]) ###################################### # 第三层 打印区域 ###################################### self.pt_text = wx.TextCtrl(pnl, style=wx.TE_MULTILINE) vb.AddMany([(hb1, 0, wx.EXPAND | wx.ALIGN_CENTER | wx.ALL, 5), (hb2, 0, wx.EXPAND | wx.ALIGN_CENTER | wx.ALL, 5), (self.pt_text, 1, wx.EXPAND | wx.ALIGN_CENTER | wx.ALL, 5)]) pnl.SetSizer(vb) self.CreateStatusBar() def _func_event(self): # 对事件绑定 self.hb1_button1.Bind(wx.EVT_BUTTON, self._open_file) self.hb1_button2.Bind(wx.EVT_BUTTON, self._ping_event) self.hb2_button2.Bind(wx.EVT_BUTTON, self._start_event) self.hb2_button3.Bind(wx.EVT_BUTTON, self._check_event) def _open_file(self, e): """ 选择固件升级的文件的事件 """ wildcard = "*.*" dlg = wx.FileDialog(self, "Choose a file", os.getcwd(), "", wildcard, wx.FD_OPEN) if dlg.ShowModal() == wx.ID_OK: file_name = dlg.GetFilename() file_path = dlg.GetDirectory() self.hb1_tc2.AppendText(file_name) self.file = os.path.join(file_path, file_name) dlg.Destroy() def _ping_event(self, e): # 网络检测的事件,并打开刷机的按键 ip = self.hb2_tc5.GetValue() ip = ip.split("//")[1] var = 1 self.hb2_button2.Enable() ping_thread = Thread(target=self._check_ping, args=(ip, var)) ping_thread.start() def _start_event(self, e): # 刷机的事件,并关闭刷机按键 ip = self.hb2_tc5.GetValue() self.hb2_button2.Disable() start_thread = Thread(target=self._start_moxa, args=(ip, self.file)) start_thread.start() def _check_event(self, e): # 检查的事件 ip = self.hb2_tc5.GetValue() check_thread = Thread(target=self._check_moxa, args=(ip,)) check_thread.start() def _start_moxa(self, ip, file): """ 刷机的函数体,先对文件传递进来的文件进行判断,而后调用函数进行刷机, 对交换机持续进行ping,网络通后,打印结束,退出 """ if not file: self.pt_text.AppendText(u"%s文件不存在或为空,请重新选择\n" % time_now()) return else: self.pt_text.Clear() web = moxa_info(self.ip) # 登陆 self.pt_text.AppendText(u"%s刷机开始\n"%time_now()) moxa_up(web, file) # 升级 ping_ip = ip.split("//")[1] for i in range(65): time.sleep(1) res = check_ip_ping(ping_ip) self.pt_text.AppendText("%d,"%i) if not res: self.pt_text.AppendText("\n交换机正在启动.......\n") break self._check_ping(ping_ip) self.pt_text.AppendText(u"%s刷机已经成功,请点击检查\n" % time_now()) def _check_moxa(self, ip): """ 检查的函数体,调用函数进行检查,并返回检查结果 :param ip: :return: """ web = moxa_info(ip) # 登陆 res = moxa_check(web) # 检查 if isinstance(res,int): text = "检查完毕,对应模块数量为%d\n"%res self.pt_text.AppendText(text) else: self.pt_text.AppendText("%s%s"%(time_now(),res)) def _check_ping(self, ip, var=0): """ 网络检查函数体,并返回结果 """ while True: time.sleep(1) res = check_ip_ping(ip) if res: self.pt_text.AppendText(u"%s网络重连成功\n" % time_now()) if var: wx.MessageBox(u"交换机重启成功,点击“刷机”", u"成功", wx.OK | wx.ICON_INFORMATION) else: wx.MessageBox(u"交换机重启成功,点击“检查”", u"成功", wx.OK | wx.ICON_INFORMATION) return else: self.pt_text.AppendText(u"%s网络重连中...\n" % time_now()) if __name__ == '__main__': app = wx.App() win = MyFrame(None) app.MainLoop()moxa_main.py
import os import subprocess import time from selenium import webdriver def moxa_info(ip): """ 初始化浏览器,并进行登陆操作 """ # 获取谷歌插件的位置 chr_path = os.path.join(os.getcwd(),"chromedriver") # executable_path用来指定插件的位置 driver = webdriver.Chrome(executable_path=chr_path) driver.get(ip) driver.find_element_by_id("Username").clear() driver.find_element_by_id("Username").send_keys("admin") driver.find_element_by_id("Password").clear() driver.find_element_by_id("Password").send_keys("moxa") driver.find_element_by_name("Submit").click() return driver def moxa_up(web, file_path): """ 进行文件上传和升级的操作 """ try: # 展开左侧网页 web.switch_to.frame(1) web.find_element_by_id("folder59").click() web.find_element_by_id("item62").click() # 进入右侧网页并上传文件 web.switch_to.parent_frame() web.switch_to.frame(2) web.find_element_by_name("filename").send_keys(file_path) web.find_element_by_name("Submit").click() except Exception as e: return "刷机失败,检查文件是否正确\n" else: web.quit() def moxa_check(web): """ 进行升级后参数的检查工作 :param web: :return: 参数存在时返回参数的值,否则报错提示 """ try: web.switch_to.parent_frame() web.switch_to.frame(1) web.find_element_by_id("folder7").click() web.find_element_by_xpath("//*[@id='folder10']/tbody/tr/td[1]/a[1]/img").click() web.find_element_by_id("item13").click() web.switch_to.parent_frame() web.switch_to.frame(2) web.find_element_by_xpath("//*[@id='turboRoaming_enable']").click() table = web.find_element_by_xpath("//*[@id='iw_turboRoaming_channels']") t_rows = table.find_elements_by_tag_name("tr") except Exception as e: return "指定位置无参数\n" else: web.quit() return len(t_rows) def check_ip_ping(ip): """ 进行ping交换机IP的操作,并对网络是否畅通返回值 :param ip: :return: 0:网络不通 1:网络畅通 """ cmd = "ping -n 1 " + ip response = subprocess.Popen(cmd, stdout=subprocess.PIPE) lines = response.stdout.readlines() res = lines[2].decode("gbk") if res.strip() == "请求超时。": return 0 else: res = res.split(":")[1].strip() if res == "无法访问目标主机。": return 0 else: return 1 def time_now(): """ :return:返回固定时间格式 如:<20:35:29> """ return time.strftime('<%H:%M:%S>', time.localtime()) if __name__ == '__main__': file = "D:\临时\moxa升级\附件三AWK_Build.rom" """ web = moxa_info() moxa_up(web, file) t = check_ip_ping() if t: web = moxa_info() moxa_check(web) else: print("超时未启动") exit() """本人的经验分享,希望可以帮助到你们,如何不对的地方,可以评论留言,帮我指正一下,如果帮助了你,请给我点个赞吧