一种搭建分布式测试环境和批量性能测试的思路

    xiaoxiao2022-07-16  138

    背景

      在搜索引擎的测试过程中,经常会遇到以下两个问题:

      ● 需要搭建和更新分布式测试环境

      ● 在性能测试时,我们需要测试不同集群规模和配置下的环境时,如何自动更新测试环境和批量进行性能测试

      因此,我们需要设计一个脚本,这个脚本可以帮我来完成这些事。

      在这里,我推荐使用Python,理由有:

      ● 写起来比较快(测试时间本来就比较紧张),不可能用C或者Java了

      ● 语法比较清晰,Shell、Perl这些维护起来太乱

      ● 自带的库、第三方的库比较丰富

      ● 另外,我个人比较喜欢Python的mako模版引擎和paramikossh2库。

      其实不用paramiko也可以,只要把机器ssh打通就可以。但我个人不太喜欢这种方式,觉得耦合性太强(只能在Linux下运行了)。

      设计

      批量性能测试的设计

      我很喜欢采用YAML格式,YAML格式的一大好处就是可以很方便的定义List、Map等类型

    tasks:  # 第一个测试用例,我可能需要测试单线程的情况  -     id:1# ID的作用是你在脚本中可以拿id作为结果存放的目录     parallelNum:1# 并发数     seconds:1800# 压半个小时     targetHost:10.20.137.22 # 目标主机     targetPort:9999     queryFilePath:/home/admin/access-log/add-600w.query  # 请求放在这儿  # 第2个测试用例,我可能需要测试2线程的情况,这时我就只要再写一个,然后parallelNum: 2就可以了  -     id:1     parallelNum:2     seconds:1800     targetHost:10.20.137.22     targetPort:9999     queryFilePath:/home/admin/access-log/add-600w.query

      在阿里的搜索平台这边,我们大多使用abench作为性能测试工具,它是一个命令行工具,只要命令+参数就可以了,比起JMeter要写JMeter脚本简单。因此,我再在配置文件中设计一下abench的命令格式。

      因为在运行命令中,有很多参数需要替换成上述测试用例设定的参数,因此需要采用模版引擎的方式。Python的模版引擎很多,我个人比较推荐mako。

    abenchPath:/opt/usr/bin/abench  # abench在哪儿? abenchCommand:"${abenchPath} -p ${parallelNum} -s ${seconds} -k --http -o /dev/null ${targetHost} ${targetPort} ${queryFilePath}"  配置文件设计好了,下面我们来写我们的Python脚本了(这里仅仅给出一些主要代码,大致明白意思就可以了)

    import subprocess from mako.template import Template import yaml  # 运行一个测试任务 def runTask(config, task):     runAbench(config, task)  def runAbench(config, task):      # 得到完成的abench运行命令      command = Template(config["abenchCommand"]).render(          abenchPath=config["abenchPath"],          parallelNum=task["parallelNum"],          seconds=task["seconds"],          targetHost=task["targetHost"],          targetPort=task["targetPort"],          queryFilePath=task["queryFilePath"],          )      pipe = subprocess.Popen(command,          stdin=subprocess.PIPE,          stdout=subprocess.PIPE,          stderr=subprocess.PIPE,          shell=True          )      # 读取abench的运行结果,因为可能得保存下来吧      result = pipe.stdout.read()      # 下面可能是保存结果什么的  if __name__ == "__main__":     config = yaml.load(file(configFile))     for task in config["tasks"]:        runTask(config, task)

      自动更新测试环境

      在我实际测试过程中,因为要更新的环境其实相当复杂,最多的时侯需要去10几台机器上做更新环境、停止/启动进程的操作。但我这里主要介绍思路,多一些机器和进程其实都一样。

      接着刚才的配置文件,我们只是在每一个task中设计了加压任务,但在加压前需要更新哪些环境没有涉及,按照阿里巴巴的ISearch架构,我 就启动一个一行两列的Searcher环境,2列Searcher上有一个Merger,然后再有一个clustermap来监控。

    abenchPath: /opt/usr/bin/abench  # abench在哪儿? abenchCommand: "${abenchPath} -p ${parallelNum} -s ${seconds} -k --http -o /dev/null ${targetHost} ${targetPort} ${queryFilePath}" # 关于Searcher的一些通用配置 searcher:     templateConfigFile: /home/admin/access-log/searcher_server.cfg  # 因为启动时的监听端口等信息需要从下面的运行任务中读取,因此这个也设计成一个模版文件     templateLogConfigFile: /home/admin/access-log/searcher_log.cfg     # 在Search机器上操作的命令     commands:         - "${searchRoot}/bin/is_searcher_server -c ${configFile} -l ${logConfigFile} -k stop > /dev/null 2>&1"         - "${searchRoot}/bin/is_searcher_server -c ${configFile} -l ${logConfigFile} -k start -d > /dev/null 2>&1" # 关于Merger的一些通用配置,和Searcher差不多,就不写了  tasks:  # 第一个测试用例,我可能需要测试单线程的情况  -     id: 1 # ID的作用是你在脚本中可以拿id作为结果存放的目录     parallelNum: 1 # 并发数     seconds: 1800 # 压半个小时     targetHost: 10.20.137.22 # 目标主机     targetPort: 9999     queryFilePath: /home/admin/access-log/add-600w.query  # 请求放在这儿      # 两台Search机器,定义一个List     searchers:        -          host: 10.20.150.61          port: 6322 # 监听的端口          username: test # 因为需要通过ssh协议登录上去操作,因此需要用户名密码。如果你已经把机器ssh都打通了,那就不需要了          password: 12345          configFile: "${searchRoot}/scripts/conf/searcher_server.cfg" # 启动时运行的配置文件          logConfigFile: "${searchRoot}/scripts/conf/searcher_log.cfg" # 启动时运行的日志文件        -          host: 10.20.150.60          port: 6322          username: test          password: 12345          configFile: "${searchRoot}/scripts/conf/searcher_server.cfg"          logConfigFile: "${searchRoot}/scripts/conf/searcher_log.cfg"      # 我这边只有一台merger,如果merger也是有多台的话,也可以把这个设计成一个List     merger:        host: 10.20.137.22        port: 6088        username: test        password: 12345        configFile: "${searchRoot}/scripts/conf/merger_server.cfg" 然后比如关于Searcher的配置文件,在上面也是一个模版文件阿,我们可以把这个文件设计成:

    se_conf_file=${searchRoot}/scripts/conf/se.conf simon_conf_path=${searchRoot}/scripts/conf/simon_searcher.xml sort_config=${searchRoot}/scripts/conf/searcher_sort.xml cache_size=0 cache_min_doc=0 conn_queue_limit=500 [services] tcp ${port} # 主要就是为了替换监听的端口,其实要做得通用一点的话,很多配置都可以搞成变量,但就是可能你自己的配置文件变得很复杂。因此我们能不改的就尽量不改。  [clustermap] local_config_path=${searchRoot}/scripts/conf/clustermap.xml

      上述就是关于searcher和merger多行多列的配置,下面我们完善一下我们刚才的Python脚本

    # 得的一个ssh登录后的client对象,用于调用远程机器上的命令 def getClient(host, port, username, password):     client = paramiko.SSHClient()     client.load_system_host_keys()     client.set_missing_host_key_policy(paramiko.WarningPolicy()     client.connect(hostname, port, username, password)     return client  # 得到一个sftp对象,因为需要scp渲染好的配置文件什么的,因此需要sftp对象,它的put方法其实就类似scp def getSftp(host, port, username, password):     transport = paramiko.Transport((hostname, port))     transport.connect(username=username, password=password)     sftp = paramiko.SFTPClient.from_transport(transport)     return sftp  # 更新和部署Searchers def cleanSearchers(config, searchers):     for searcher in searchers:         # 得到渲染好的配置文件的内容         templateLine = Template(file(config["searcher"]["templateConfigFile"]).read()).render(             port=searcher["port"],             searchRoot=config["searchRoot"]             )         # 将渲染好的配置文件写入一个临时文件         tmpConfigFile = tempfile.NamedTemporaryFile(delete=False)         tmpConfigFile.file.write(templateLine)         tmpConfigFile.file.close()         # 将这个临时文件scp拷远程机器上的哪儿         targetConfigFile = Template(searcher["configFile"]).render(searchRoot=config["searchRoot"])         sftp = getSftp(searcher["host"], 22, searcher["username"], searcher["password"])         sftp.put(tmpConfigFile.name, targetConfigFile)         sftp.close()         # 删除掉之前的临时文件         os.remove(tmpConfigFile.name)         # 运行启动searcher的命令         client = getClient(searcher["host"], 22, searcher["username"], searcher["password"])         for command in config["searcher"]["commands"]:             command = Template(command).render(                 searchRoot=config["searchRoot"],                 configFile=targetConfigFile,                 logConfigFile=targetLogConfigFile                 )             client.exec_command(cmd)         client.close()  关于clustermap的配置

      在阿里巴巴的ISearch架构中,searchers几行几列是由clustermap来配置的,我们这边也稍微简单话一点,不考虑 merger有多台的情况,就设计searchers几行几列的情况。更新一下刚才在task中的配置,加上关于clustermap的配置

         clustermap:        host: 10.20.137.22        username: test        password: 12345        configFile: "${searchRoot}/scripts/conf/clustermap.xml"        # 一台merge        merger:          host: 10.20.137.22          port: 6088        # 关于searcher的配置,其实是一个二维数组。第一个纬度是列,第2个纬度是行。以下这个例子是1列2行        searchers:          -            servers: # 同一列下的机器              -                host: 10.20.150.61                port: 6322          -            servers:              -                host: 10.20.150.60                port: 6322

      上述是1列2行的例子,如果要配成2行2列就只要在searchers部分配成:

           searchers:          -            servers: # 同一列下的机器              -                host: 10.20.150.61                port: 6322              -                host: 10.20.150.59                port: 6322          -            servers:              -                host: 10.20.150.60                port: 6322              -                host: 10.20.150.62                port: 6322  然后为了迎合clustermap配置的这种设计,在clustermap的模版配置文件也需要改一下:

    <?xml version="1.0"?> <clustermap>     <!-- 关于Merger的配置,这里我暂时不考虑merger多台的情况 -->     <merger_list>             <merger_cluster name=m1 level=1>                 <merger ip=${merger["host"]} port=${merger["port"]} protocol=http/>             </merger_cluster>     </merger_list>  <!-- 下面是searcher的多行行列的配置,是一个二维数组 --> <search_list> <% id = 1 # 这个值是纪录searcher列的名字 %> <!-- 第一个纬度,同一列的 --> % for searcher in searchers: <search_cluster name=c${id} docsep=false level=1 partition=0>     <!-- 第二个纬度,同一行的 -->     % for server in searcher["servers"]:     <search ip=${server["host"]} port=${server["port"]} protocol=tcp type=mix />     % endfor </search_cluster>     <%     id += 1     %> % endfor </search_list>      <merger_cluster_list>             <merger_cluster name=m1>                 % for i in range(1, id):                 <search_cluster name=c${i} />                 % endfor             </merger_cluster>     </merger_cluster_list> </clustermap>

      这样比如1行2列渲染出来成了:

    <?xml version="1.0"?> <clustermap>     <merger_list>             <merger_cluster name=m1 level=1>                 <merger ip=10.20.137.22 port=6088 protocol=http/>             </merger_cluster>     </merger_list>  <search_list> <search_cluster name=c1 docsep=false level=1 partition=0>     <search ip=10.20.150.60 port=6322 protocol=tcp type=mix /> </search_cluster> <search_cluster name=c1 docsep=false level=1 partition=0>     <search ip=10.20.150.61 port=6322 protocol=tcp type=mix /> </search_cluster> </search_list>      <merger_cluster_list>             <merger_cluster name=m1>                 <search_cluster name=1 />             </merger_cluster>     </merger_cluster_list> </clustermap>

      总结

      上述就是我在测试中,对分布式环境的自动更新和批量性能测试,这样大大减少了我们来回捣固机器、修改配置的时间。而且对测试结果的自动收集和解析也可以帮助我们来分析测试结果。我觉得这是一个不错的尝试,大家可以都可以试试看。

    ====================================分割线================================

    最新内容请见作者的GitHub页:http://qaseven.github.io/

    相关资源:性能测试中“用户数”的概念浅析
    最新回复(0)