《ELK Stack权威指南 》第2章 插件配置

    xiaoxiao2024-03-31  111

    本节书摘来自华章出版社《ELK Stack权威指南 》一书中的第1章,第2节,作者饶琛琳,更多章节内容可以访问云栖社区“华章计算机”公众号查看。

    插 件 配 置

    插件是Logstash最大的特色。各种不同的插件源源不断地被创造出来,发布到社区中供大家使用。本章会按照插件的类别,对一般场景下的一些常用插件做详细的配置和用例介绍。本章介绍的插件包括:1)输入插件。基于shipper端场景,主要介绍STDIN、TCP、File等插件。2)编解码插件。编解码通常是会被遗忘的环节,但是运用好了,会大大提高工作效率,本节介绍最常用的JSON和multiline插件。3)过滤器插件。名为过滤器,其实各种数据裁剪和计算都可以在这类插件里完成,是Logstash最强大的一环。本节会详细介绍grok、date、mutate、ruby、metrics等插件的妙用。4)输出插件。Logstash虽然经常跟Elasticsearch并称,但是作为一个日志传输框架,它其实可以输出数据到各种不同的地方。比如Graphite、HDFS、Nagios等等。本章会介绍这些常用的输出插件用法。

    2.1 输入插件

    在“Hello World”示例中,我们已经见到并介绍了Logstash的运行流程和配置的基础语法。从这章开始,我们就要逐一介绍Logstash流程中比较常用的一些插件,并在介绍中针对其主要适用的场景、推荐的配置,作一些说明。

    限于篇幅,接下来内容中,配置示例不一定能贴完整。请记住一个原则:Logstash配置一定要有一个input和一个output。在演示过程中,如果没有写明input,默认就会使用“Hello World”里我们已经演示过的logstash-input-stdin,同理,没有写明的output就是logstash-output-stdout。

    以上请读者自明。

    2.1.1 标准输入

    我们已经见过好几个示例使用stdin了。这也应该是Logstash里最简单和基础的插件了。所以,在这段中,我们先介绍一些未来每个插件都会有的一些方法。

    配置示例如下:

    input {

        stdin {

            add_field => {"key" =>"value"}

            codec =>"plain"

            tags => ["add"]

            type =>"std"

        }

    }

    用上面的新stdin设置重新运行一次最开始的Hello World示例。我建议大家把整段配置都写入一个文本文件,然后运行命令:bin/logstash -f stdin.conf。输入“Hello World”并回车后,你会在终端看到如下输出:

    {

    "message" =>"hello world",

    "@version" =>"1",

    "@timestamp" =>"2014-08-08T06:48:47.789Z",

    "type" =>"std",

    "tags" => [

            [0] "add"

        ],

    "key" =>"value",

    "host" =>"raochenlindeMacBook-Air.local"

    }

    type和tags是Logstash事件中两个特殊的字段。通常来说,我们会在“输入区段”中通过type来标记事件类型—我们肯定是提前能知道这个事件属于什么类型的。而tags则是在数据处理过程中,由具体的插件来添加或者删除的。

    最常见的用法是像下面这样:

    input {

        stdin {

            type =>"web"

        }

    }

    filter {

        if [type] == "web" {

            grok {

                match => ["message", %{COMBINEDAPACHELOG}]

            }

        }

    }

    output {

        if "_grokparsefailure" in [tags] {

            nagios_nsca {

                nagios_status =>"1"

            }

        } else {

            elasticsearch {

            }

        }

    }

    看起来蛮复杂的,对吧?

    继续学习,你也可以写出来的。

    2.1.2 文件输入

    分析网站访问日志应该是一个运维工程师最常见的工作了。所以我们先学习一下怎么用Logstash来处理日志文件。

    Logstash使用一个名叫FileWatch的Ruby Gem库来监听文件变化。这个库支持glob展开文件路径,而且会记录一个叫.sincedb的数据库文件来跟踪被监听日志文件的当前读取位置。所以,不要担心Logstash会漏过你的数据。

    sincedb文件中记录了每个被监听的文件的inode, major number, minor number和pos。

    配置示例如下:

    input {

        file {

            path => ["/var/log/*.log", "/var/log/message"]

            type =>"system"

            start_position =>"beginning"

        }

    }

    有一些比较有用的配置项,可以用来指定FileWatch库的行为:

    discover_interval:Logstash每隔多久去检查一次被监听的path下是否有新文件,默认值是15秒。

    exclude:不想被监听的文件可以排除出去,这里跟path一样支持glob展开。

    sincedb_path:如果你不想用默认的$HOME/.sincedb(Windows平台上为%USERPROFILE%\.sincedb,该变量默认值是:C:\Windows\System32\config\systemprofile),可以通过这个配置定义sincedb文件到其他位置。

    sincedb_write_interval:Logstash每隔多久写一次sincedb文件,默认是15秒。

    stat_interval:Logstash每隔多久检查一次被监听文件状态(是否有更新),默认是1秒。

    start_position:Logstash从什么位置开始读取文件数据,默认是结束位置,也就是说,Logstash进程会以类似tail-F的形式运行。如果你是要导入原有数据,把这个设定改成"beginning",Logstash进程就从头开始读取,有点类似于less+F命令的形式运行。

    close_older:一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是3 600秒,即一小时。

    ignore_older:在每次检查文件列表的时候,如果一个文件的最后修改时间超过这个值,就忽略这个文件。默认是86 400秒,即一天。

    注意事项如下:

    1)通常你要导入原有数据进Elasticsearch的话,你还需要filter/date插件来修改默认的"@timestamp"字段值。稍后会学习这方面的知识。

    2)FileWatch只支持文件的绝对路径,而且会不自动递归目录。所以有需要的话,请用数组方式都写明具体哪些文件。

    3)LogStash::Inputs::File只是在进程运行的注册阶段初始化一个FileWatch对象。所以它不能支持类似fluentd那样的path =>"/path/to/%{+yyyy/MM/dd/hh}.log"写法。达到相同目的,你只能写成path =>"/path/to/*/*/*/*.log"。FileWatch模块提供了一个稍微简单一点的写法:/path/to/**/*.log,用**来缩写表示递归全部子目录。

    4)start_position仅在该文件从未被监听过的时候起作用。如果sincedb文件中已经有这个文件的inode记录了,那么Logstash依然会从记录过的pos开始读取数据。所以重复测试的时候每回需要删除sincedb文件。此外,官方博客上提供了另一个巧妙的思路:https://www.elastic.co/blog/logstash-configuration-tuning。把sincedb_path定义为/dev/null,则每次重启自动从头开始读。

    5)因为Windows平台上没有inode的概念,Logstash某些版本在Windows平台上监听文件不是很靠谱。Windows平台上,推荐考虑使用nxlog作为收集端,参见后面5.5节。

    2.1.3 TCP输入

    未来你可能会用Redis服务器或者其他的消息队列系统来作为Logstash Broker的角色。不过Logstash其实也有自己的TCP/UDP插件,在临时任务的时候,也算能用,尤其是测试环境。

    虽然LogStash::Inputs::TCP用Ruby的Socket和OpenSSL库实现了高级的SSL功能,但Logstash本身只能在SizedQueue中缓存20个事件。这就是我们建议在生产环境中换用其他消息队列的原因。

    配置示例如下:

    input {

        tcp {

            port => 8888

            mode =>"server"

            ssl_enable => false

        }

    }

    目前来看,LogStash::Inputs::TCP最常见的用法就是配合nc命令导入旧数据。在启动Logstash进程后,在另一个终端运行如下命令即可导入数据:

    # nc 127.0.0.1 8888 < olddata

    这种做法比用LogStash::Inputs::File好,因为当nc命令结束,我们就知道数据导入完毕了。而用input/file方式,Logstash进程还会一直等待新数据输入被监听的文件,不能直接看出是否任务完成了。

    2.1.4 syslog输入

    syslog可能是运维领域最流行的数据传输协议了。当你想从设备上收集系统日志的时候,syslog应该会是你的第一选择。尤其是网络设备,比如思科中syslog几乎是唯一可行的办法。

    我们这里不解释如何配置你的syslog.conf、rsyslog.conf或者syslog-ng.conf来发送数据,而只讲如何把Logstash配置成一个syslog服务器来接收数据。

    有关rsyslog的用法,后面的5.4节会有更详细的介绍。

    配置示例如下:

    input {

        syslog {

            port =>"514"

        }

    }

    作为最简单的测试,我们先暂停一下本机的syslogd(或rsyslogd)进程,然后启动Logstash进程(这样就不会有端口冲突问题)。现在,本机的syslog就会默认发送到Logstash里了。我们可以用自带的logger命令行工具发送一条“Hello World”信息到syslog里(即Logstash里)。看到的Logstash输出像下面这样:

    {

    "message" =>"Hello World",

    "@version" =>"1",

    "@timestamp" =>"2014-08-08T09:01:15.911Z",

    "host" =>"127.0.0.1",

    "priority" =>31,

    "timestamp" =>"Aug  8 17:01:15",

    "logsource" =>"raochenlindeMacBook-Air.local",

    "program" =>"com.apple.metadata.mdflagwriter",

    "pid" =>"381",

    "severity" =>7,

    "facility" =>3,

    "facility_label" =>"system",

    "severity_label" =>"Debug"

    }

    Logstash是用UDPSocket、TCPServer和LogStash::Filters::Grok来实现LogStash::Inputs::Syslog的。所以你其实可以直接用Logstash配置实现一样的效果,如下所示:

    input {

        tcp {

            port =>"8514"

        }

    }

    filter {

        grok {

            match => ["message", "%{SYSLOGLINE}" ]

        }

        syslog_pri { }

    }

    建议在使用LogStash::Inputs::Syslog的时候走TCP协议来传输数据。

    因为具体实现中,UDP监听器只用了一个线程,而TCP监听器会在接收每个连接的时候都启动新的线程来处理后续步骤。

    如果你已经在使用UDP监听器收集日志,用下行命令检查你的UDP接收队列大小:

    # netstat -plnu | awk 'NR==1 || $4~/:514$/{print $2}'

    Recv-Q

    228096

    228096是UDP接收队列的默认最大大小,这时候linux内核开始丢弃数据包了!

    强烈建议使用LogStash::Inputs::TCP和LogStash::Filters::Grok配合实现同样的syslog功能!

    虽然LogStash::Inputs::Syslog在使用TCPServer的时候可以采用多线程处理数据的接收,但是在同一个客户端数据的处理中,其grok和date是一直在该线程中完成的,这会导致总体上的处理性能几何级的下降—经过测试,TCPServer每秒可以接收50 000条数据,而在同一线程中启用grok后每秒只能处理5 000条,再加上date只能达到500条!

    才将这两步拆分到filters阶段后,Logstash支持对该阶段插件单独设置多线程运行,大大提高了总体处理性能。在相同环境下,logstash -f tcp.conf -w 20的测试中,总体处理性能可以达到每秒30 000条数据!

    测试采用Logstash作者提供的命令:

    yes "<44>May 19 18:30:17 snack jls: foo bar 32" | nc localhost 3000

    出处见:https:// github.com/jordansissel/experiments/blob/master/ruby/jruby-netty/syslog-server/Makefile

    如果你实在没法切换到TCP协议,可以自己写程序,或者使用其他基于异步I/O框架(比如libev)的项目。下面是一个简单的异步I/O实现UDP监听数据输入Elasticsearch的示例:https://gist.github.com/chenryn/7c922ac424324ee0d695。

    2.1.5 http_poller抓取

    Logstash作为数据采集系统,也支持自己作为一个HTTP客户端去抓取网页数据或者接口数据。这方面有一个很明显的IT运维应用场景:很多业务系统软件本身提供了RESTful的内部运行状态接口,可以直接通过接口采集这些监控信息。

    更长期的方案应该是编写对应的metricbeat模块,但是直接采用logstash-input-http_poller显然更快捷。

    比如Nginx的性能状态,社区有一个非常全面的性能状态监控模块:nginx-module-vts。在新浪微博,后端池分为核心接口、非核心接口两块,我们要分别监控的话,nginx-module-vts的配置如下:

    http {

        vhost_traffic_status_zone;

        map $uri $filter_uri {

            default 'non-core';

            /2/api/timeline core;

            ~^/2/api/unread core;

        }

        server {

            vhost_traffic_status_filter_by_set_key $filter_uri;

            location /status {

                auth_basic "Restricted";

                auth_basic_user_file pass_file;

                vhost_traffic_status_display;

                vhost_traffic_status_display_format json;

            }

        }

    }

    则对应的logstash-input-http_poller配置如下:

    input {

        http_poller {

            urls => {

                0 => {

                    method => get

                    url => "http://localhost:80/status/format/json"

                    headers => {

                        Accept => "application/json"

                    }

                    auth => {

                        user => "YouKnowIKnow"

                        password => "IKnowYouDonotKnow"

                    }

                }

                1 => {

                    method => get

                    url => "http://localhost:80/status/con ... up=*"

                    headers => {

                        Accept => "application/json"

                    }

                    auth => {

                        user => "YouKnowIKnow"

                        password => "IKnowYouDonotKnow"

                    }

                }

            }

            request_timeout => 60

            interval => 60

            codec => "json"

        }

    }

    这段配置就可以就可以每60秒获得一次 vts 数据,并重置计数了。

    注意,url是一个Hash值,所以它的执行顺序是根据Hash.map来的,为了确保我们是先获取数据再重置,这里干脆用0, 1来作为Hash的key,这样顺序就没问题了。

    2.2 编解码配置

    Codec是Logstash从1.3.0版开始新引入的概念(Codec来自Coder/decoder两个单词的首字母缩写)。

    在此之前,Logstash只支持纯文本形式输入,然后以过滤器处理它。但现在,我们可以在输入期处理不同类型的数据,这全是因为有了Codec设置。

    所以,这里需要纠正之前的一个概念。Logstash不只是一个input | filter | output的数据流,而是一个input | decode | filter | encode | output的数据流!Codec就是用来decode、encode事件的。

    Codec的引入,使得Logstash可以更好、更方便地与其他有自定义数据格式的运维产品共存,比如graphite、fluent、netflow、collectd,以及使用msgpack、json、edn等通用数据格式的其他产品等。

    事实上,我们在第一个“Hello World”用例中就已经用过Codec了—rubydebug就是一种Codec!虽然它一般只会用在stdout插件中,作为配置测试或者调试的工具。

    这个五段式的流程说明源自Perl版的Logstash(后来改名叫Message::Passing模块)的设计。本书稍后5.8节会对该模块稍作介绍。

    2.2.1 JSON编解码

    在早期的版本中,有一种降低Logstash过滤器的CPU负载消耗的做法盛行于社区(在当时的Cookbook上有专门的一节介绍):直接输入预定义好的JSON数据,这样就可以省略掉filter/grok配置!

    这个建议依然有效,不过在当前版本中需要稍微做一点配置变动,因为现在有专门的Codec设置。

    1.配置示例

    社区常见的示例都是用的Apache的customlog,不过我觉得Nginx是一个比Apache更常用的新型Web服务器,所以我这里会用nginx.conf做示例:

    logformat json '{"@timestamp":"$time_iso8601",'

                   '"@version":"1",'

                   '"host":"$server_addr",'

                   '"client":"$remote_addr",'

                   '"size":$body_bytes_sent,'

                   '"responsetime":$request_time,'

                   '"domain":"$host",'

                   '"url":"$uri",'

                   '"status":"$status"}';

    access_log /var/log/nginx/access.log_json json;

    注意,在$request_time和$body_bytes_sent变量两头没有双引号",这两个数据在JSON里应该是数值类型!

    重启Nginx应用,然后修改你的input/file区段配置成下面这样:

    input {

        file {

            path =>"/var/log/nginx/access.log_json""

            codec =>"json"

        }

    }

    2.运行结果

    下面访问一下用Nginx发布的Web页面,然后你会看到Logstash进程输出类似下面这样的内容:

    {

    "@timestamp" =>"2014-03-21T18:52:25.000+08:00",

    "@version" =>"1",

    "host" =>"raochenlindeMacBook-Air.local",

    "client" =>"123.125.74.53",

    "size" =>8096,

    "responsetime" =>0.04,

    "domain" =>"www.domain.com",

    "url" =>"/path/to/file.suffix",

    "status" =>"200"

    }

    3. Nginx代理服务的日志格式问题

    对于一个Web服务器的访问日志,看起来已经可以很好的工作了。不过如果Nginx是作为一个代理服务器运行的话,访问日志里有些变量,比如说$upstream_response_time,可能不会一直是数字,它也可能是一个“-”字符串!这会直接导致Logstash对输入数据验证报异常。

    有两个办法解决这个问题:

    1)用sed在输入之前先替换-成0。运行Logstash进程时不再读取文件而是标准输入,这样命令就成了下面这个样子:

    tail -F /var/log/nginx/proxy_access.log_json \

        | sed 's/upstreamtime":-/upstreamtime":0/' \

        | /usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/proxylog.conf

    2)日志格式中统一记录为字符串格式(即都带上双引号),然后再在Logstash中用filter/mutate插件来变更应该是数值类型的字符字段的值类型。

    有关LogStash::Filters::Mutate的内容,本书稍后会有介绍。

    2.2.2 多行事件编码

    有些时候,应用程序调试日志会包含非常丰富的内容,为一个事件打印出很多行内容。这种日志通常都很难通过命令行解析的方式做分析。

    而Logstash正为此准备好了codec/multiline插件!当然,multiline插件也可以用于其他类似的堆栈式信息,比如Linux的内核日志。

    配置示例如下:

    input {

        stdin {

            codec => multiline {

                pattern =>"^\["

                negate => true

                what =>"previous"

            }

        }

    }

    运行Logstash进程,然后在等待输入的终端中输入如下几行数据:

    [Aug/08/08 14:54:03] hello world

    [Aug/08/09 14:54:04] hello logstash

        hello best practice

        hello raochenlin

    [Aug/08/10 14:54:05] the end

    你会发现Logstash输出下面这样的返回:

    {

    "@timestamp" =>"2014-08-09T13:32:03.368Z",

    "message" =>"[Aug/08/08 14:54:03] hello world\n",

    "@version" =>"1",

    "host" =>"raochenlindeMacBook-Air.local"

    }

    {

    "@timestamp" =>"2014-08-09T13:32:24.359Z",

    "message" =>"[Aug/08/09 14:54:04] hello logstash\n\n    hello best practice\n\n

                hello raochenlin\n",

    "@version" =>"1",

        "tags" => [

            [0] "multiline"

        ],

    "host" =>"raochenlindeMacBook-Air.local"

    }

    你看,后面这个事件,在“message”字段里存储了三行数据!

    输出的事件中没有最后一行的"the end"字符串,这是因为你最后输入的回车符\n并不匹配设定的^\[正则表达式,Logstash还得等下一行数据直到匹配成功后才会输出这个事件。

    其实这个插件的原理很简单,就是把当前行的数据添加到前面一行后面,直到新进的当前行匹配^\[正则为止。这个正则还可以用grok表达式,稍后你就会学习这方面的内容。具体的Java日志正则见:https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/java

    说到应用程序日志,Log4j肯定是第一个被大家想到的,使用codec/multiline也确实是一个办法。

    不过,如果你本身就是开发人员,或者可以推动程序修改变更的话,Logstash还提供了另一种处理Log4j的方式:input/log4j。与codec/multiline不同,这个插件是直接调用了org.apache.log4j.spi.LoggingEvent处理TCP端口接收的数据。后面3.6节会详细讲述Log4j的用法。

    2.2.3 网络流编码

    NetFlow是Cisco发明的一种数据交换方式。NetFlow提供网络流量的会话级视图,记录下每个TCP/IP事务的信息。它的目的不是像tcpdump那样提供网络流量的完整记录,而是汇集起来形成更易于管理和易读的流向和容量的分析监控。

    Cisco上配置NetFlow的方法,请参照具体的设备说明,主要是设定采集服务器的地址和端口,为运行Logstash服务的主机地址和端口(示例中为9995)。

    采集NetFlow数据的Logstash服务配置示例如下:

    input {

        udp {

            port => 9995

            codec => netflow {

                definitions =>"/opt/logstash-1.4.2/lib/logstash/codecs/netflow/netflow.yaml"

                versions => [5]

            }

        }

    }

    output {

        elasticsearch {

            index =>"logstash_netflow5-%{+YYYY.MM.dd}"

            host =>"localhost"

        }

    }

    由于该插件生成的字段较多,所以建议对应的Elasticsesarch索引模板也需要单独提交:

    # curl -XPUT localhost:9200/_template/logstash_netflow5 -d '{

    "template" : "logstash_netflow5-*",

    "settings": {

            "index.refresh_interval": "5s"

    },

    "mappings" : {

            "_default_" : {

            "_all" : {"enabled" : false},

            "properties" : {

                "@version": { "index": "analyzed", "type": "integer" },

                "@timestamp": { "index": "analyzed", "type": "date" },

                "netflow": {

                "dynamic": true,

                "type": "object",

                "properties": {

                    "version": { "index": "analyzed", "type": "integer" },

                    "flow_seq_num": { "index": "not_analyzed", "type": "long" },

                    "engine_type": { "index": "not_analyzed", "type": "integer" },

                    "engine_id": { "index": "not_analyzed", "type": "integer" },

                    "sampling_algorithm": { "index": "not_analyzed", "type": "integer" },

                    "sampling_interval": { "index": "not_analyzed", "type": "integer" },

                    "flow_records": { "index": "not_analyzed", "type": "integer" },

                    "ipv4_src_addr": { "index": "analyzed", "type": "ip" },

                    "ipv4_dst_addr": { "index": "analyzed", "type": "ip" },

                    "ipv4_next_hop": { "index": "analyzed", "type": "ip" },

                    "input_snmp": { "index": "not_analyzed", "type": "long" },

                    "output_snmp": { "index": "not_analyzed", "type": "long" },

                    "in_pkts": { "index": "analyzed", "type": "long" },

                    "in_bytes": { "index": "analyzed", "type": "long" },

                    "first_switched": { "index": "not_analyzed", "type": "date" },

                    "last_switched": { "index": "not_analyzed", "type": "date" },

                    "l4_src_port": { "index": "analyzed", "type": "long" },

                    "l4_dst_port": { "index": "analyzed", "type": "long" },

                    "tcp_flags": { "index": "analyzed", "type": "integer" },

                    "protocol": { "index": "analyzed", "type": "integer" },

                    "src_tos": { "index": "analyzed", "type": "integer" },

                    "src_as": { "index": "analyzed", "type": "integer" },

                    "dst_as": { "index": "analyzed", "type": "integer" },

                    "src_mask": { "index": "analyzed", "type": "integer" },

                    "dst_mask": { "index": "analyzed", "type": "integer" }

                  }

              }

           }

        }

      }

    }'

    Elasticsearch索引模板的功能,本书稍后12.6节会有详细介绍。

    2.2.4 collectd输入

    collectd是一个守护(daemon)进程,用来收集系统性能和提供各种存储方式来存储不同值的机制。它会在系统运行和存储信息时周期性的统计系统的相关统计信息。利用这些信息有助于查找当前系统性能瓶颈(如作为性能分析performance analysis)和预测系统未来的load(如能力部署capacity planning)等

    下面简单介绍一下:collectd的部署以及与Logstash对接的相关配置实例。

    1. collectd的安装

    collectd的安装同样有两种方式,使用官方软件仓库安装或源代码安装。

    使用官方软件仓库安装(推荐)

    collectd官方有一个隐藏的软件仓库:https://pkg.ci.collectd.org,构建有RHEL/CentOS (rpm),Debian/Ubuntu(deb)的软件包,如果你使用的操作系统属于上述的,那么推荐使用软件仓库安装。

    目前collectd官方维护3个版本:5.4、5.5、5.6。根据需要选择合适的版本仓库。下面示例安装5.5版本的方法。

    Debian/Ubuntu仓库安装:

    echo "deb http://pkg.ci.collectd.org/deb $(lsb_release -sc) collectd-5.5" | sudo tee /etc/apt/sources.list.d/collectd.list

    curl -s https://pkg.ci.collectd.org/pubkey.asc | sudo apt-key add -

    sudo apt-get update && sudo apt-get install -y collectd

    注意,Debian/Ubuntu软件仓库自带有collectd软件包,如果软件仓库自带的版本足够你使用,那么可以不用添加仓库,直接通过apt-get install collectd即可。

    RHEL/CentOS仓库安装:

    cat > /etc/yum.repos.d/collectd.repo <<EOF

    [collectd-5.5]

    name=collectd-5.5

    baseurl=http://pkg.ci.collectd.org/rpm/collectd-5.5/epel-\$releasever-\$basearch/

    gpgcheck=1

    gpgkey=http://pkg.ci.collectd.org/pubkey.asc

    EOF

    yum install -y collectd

    你如果需要使用其他collectd插件,此时也可一并安装对应的collectd-xxxx软件包。

    源码安装collectd

    collectd目前维护3个版本:5.4、5.5、5.6。源代码编译安装时同样可以根据自己需要选择对应版本的源码下载:

    wget http://collectd.org/files/collectd-5.4.1.tar.gz

    tar zxvf collectd-5.4.1.tar.gz

    cd collectd-5.4.1

    ./configure --prefix=/usr --sysconfdir=/etc --localstatedir=/var --libdir=/usr/lib --mandir=/usr/share/man --enable-all-plugins

    make && make install

    源代码编译安装需要预先解决好各种环境依赖,在RedHat平台上要提前安装如下软件包:

    rpm -ivh "http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm"

    yum -y install libcurl libcurl-devel rrdtool rrdtool-devel perl-rrdtool rrdtool-prel libgcrypt-devel gcc make gcc-c++ liboping liboping-devel perl-CPAN net-snmp net-snmp-devel

    安装启动脚本如下:

    cp contrib/redhat/init.d-collectd /etc/init.d/collectd

    chmod +x /etc/init.d/collectd

    启动collectd如下:

    service collectd start

    2. collectd的配置

    以下配置可以实现对服务器基本的CPU、内存、网卡流量、磁盘I/O以及磁盘空间占用情况的监控:

    Hostname "host.example.com"

    LoadPlugin interface

    LoadPlugin cpu

    LoadPlugin memory

    LoadPlugin network

    LoadPlugin df

    LoadPlugin disk

    <Plugin interface>

        Interface "eth0"

        IgnoreSelected false

    </Plugin>

    <Plugin network>

    <Server "10.0.0.1""25826"> ## Logstash 的 IP 地址和 collectd 的数据接收端口号

    </Server>

    </Plugin>

    3. Logstash的配置

    以下配置实现通过Logstash监听25826端口,接收从collectd发送过来的各项检测数据。

    注意,logstash-filter-collectd插件本身需要单独安装,logstash插件安装说明之前已经讲过:

    bin/logstash-plugin install logstash-filter-collectd

    Logstash默认自带有collectd的codec编码插件。

    推荐配置示例如下:

    udp {

        port => 25826

        buffer_size => 1452

        workers => 3          # Default is 2

        queue_size => 30000   # Default is 2000

        codec => collectd { }

        type =>"collectd"

    }

    下面是简单的一个输出结果:

    {

    "_index": "logstash-2014.12.11",

    "_type": "collectd",

    "_id": "dS6vVz4aRtK5xS86kwjZnw",

    "_score": null,

    "_source": {

    "host": "host.example.com",

    "@timestamp": "2014-12-11T06:28:52.118Z",

    "plugin": "interface",

    "plugin_instance": "eth0",

    "collectd_type": "if_packets",

    "rx": 19147144,

    "tx": 3608629,

    "@version": "1",

    "type": "collectd",

    "tags": [

    "_grokparsefailure"

            ]

        },

    "sort": [

            1418279332118

        ]

    }

    参考资料

    collectd支持收集的数据类型:http://git.verplant.org/?p=collectd.git;a=blob;hb=master; f=README

    collectd收集各数据类型的配置参考资料:http://collectd.org/documentation/manpages/collectd.conf.5.shtml

    collectd简单配置文件示例:https://gist.github.com/untergeek/ab85cb86a9bf39f1fc6d

    2.3 过滤器配置

    有丰富的过滤器插件,是Logstash威力如此强大的重要因素。名为过滤器,其实提供的不单单是过滤的功能。下面我们就会重点介绍几个插件,它们扩展了进入过滤器的原始数据,进行复杂的逻辑处理,甚至可以无中生有地添加新的Logstash事件到后续的流程中去!

    2.3.1 date时间处理

    之前章节已经提过,logstash-filter-date插件可以用来转换你的日志记录中的时间字符串,变成LogStash::Timestamp对象,然后转存到@timestamp字段里。

    因为在稍后的logstash-output-elasticsearch中常用的%{+YYYY.MM.dd}这种写法必须读取@timestamp数据,所以一定不要直接删掉这个字段保留自己的字段,而是应该用logstash-filter-date转换后删除自己的字段!

    这在导入旧数据的时候固然非常有用,而在实时数据处理的时候同样有效,因为一般情况下数据流程中我们都会有缓冲区,导致最终的实际处理时间跟事件产生时间略有偏差。

    强烈建议打开Nginx的access_log配置项的buffer参数,对极限响应性能有极大提升!

    1.配置示例

    logstash-filter-date插件支持五种时间格式:

    ISO8601:类似“2011-04-19T03:44:01.103Z”这样的格式。具体Z后面可以有“08:00”也可以没有,“.103”这个也可以没有。常用场景里来说,Nginx的log_format配置里就可以使用$time_iso8601变量来记录请求时间成这种格式。

    UNIX:UNIX时间戳格式,记录的是从1970年起始至今的总秒数。Squid默认日志格式中就使用了这种格式。

    UNIX_MS:这个时间戳则是从1970年起始至今的总毫秒数。据我所知,JavaScript里经常使用这个时间格式。

    TAI64N:TAI64N格式比较少见,是这个样子的:@4000000052f88ea32489532c。我目前只知道常见应用中,qmail会用这个格式。

    Joda-Time库:Logstash内部使用了Java的Joda时间库来作时间处理。所以我们可以使用Joda库所支持的时间格式来作具体定义。Joda时间格式定义见表2-1。

    表2-1 Joda时间库格式

    格式符  含  义    描  述    示  例

    G   era text    AD

    C   century of era (>=0)    number  20

    Y   year of era (>=0)   year    1996

    x   weekyear    year    1996

    w   week of weekyear    number  27

    e   day of week number  2

    E   day of week text    Tuesday; Tue

    y   year    year    1996

    D   day of year number  189

    M   month of year   month   July; Jul; 07

    d   day of month    number  10

    a   halfday of day  text    PM

    K   hour of halfday (0~11)  number  0

    h   clockhour of halfday (1~12) number  12

    H   hour of day (0~23)  number  0

    k   clockhour of day (1~24) number  24

    m   minute of hour  number  30

    s   second of minute    number  55

    S   fraction of second  number  978

    z   time zone   text    Pacific Standard Time; PST

    Z   time zone offset/id zone    -0800; -08:00; America/Los_Angeles

    '   escape for text delimiter  

    ''  single quote    literal '

     

    下面我们写一个Joda时间格式的配置作为示例:

    filter {

        grok {

            match => ["message", "%{HTTPDATE:logdate}"]

        }

        date {

            match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]

        }

    }

    注意,时区偏移量只需要用一个字母Z即可。

    2.时区问题的解释

    很多中国用户经常提一个问题:为什么@timestamp比我们晚了8个小时?怎么修改成北京时间?

    其实,Elasticsearch内部,对时间类型字段,是统一采用UTC时间,存成long长整形数据的!对日志统一采用UTC时间存储,是国际安全/运维界的一个通识—欧美公司的服务器普遍广泛分布在多个时区里—不像中国,地域横跨五个时区却只用北京时间。

    对于页面查看,ELK的解决方案是在Kibana上,读取浏览器的当前时区,然后在页面上转换时间内容的显示。

    所以,建议大家接受这种设定。否则,即便你用.getLocalTime修改,也还要面临在Kibana过去修改,以及Elasticsearch原有的["now-1h" TO "now"]这种方便的搜索语句无法正常使用的尴尬。

    以上,请读者自行斟酌。

    2.3.2 grok正则捕获

    grok是Logstash最重要的插件。你可以在grok里预定义好命名正则表达式,在稍后(grok参数或者其他正则表达式里)引用它。

    1.正则表达式语法

    运维工程师多多少少都会一点正则。你可以在grok里写标准的正则,像下面这样:

    \s+(?<request_time>\d+(?:\.\d+)?)\s+

    这个正则表达式写法对于Perl或者Ruby程序员应该很熟悉了,Python程序员可能更习惯写(?P<name>pattern),没办法,适应一下吧。

    现在给我们的配置文件添加第一个过滤器区段配置。配置要添加在输入和输出区段之间(Logstash执行区段的时候并不依赖于次序,不过为了自己看得方便,还是按次序书写吧):

    input {stdin{}}

    filter {

        grok {

            match => {

            "message" =>"\s+(?<request_time>\d+(?:\.\d+)?)\s+"

            }

        }

    }

    output {stdout{codec=>rubydebug}}

    运行Logstash进程然后输入“begin 123.456 end”,你会看到类似下面这样的输出:

    {

    "message" =>"begin 123.456 end",

    "@version" =>"1",

    "@timestamp" =>"2014-08-09T11:55:38.186Z",

    "host" =>"raochenlindeMacBook-Air.local",

    "request_time" =>"123.456"

    }

    漂亮!不过数据类型好像不太满意……request_time应该是数值而不是字符串。

    我们已经提过稍后会学习用LogStash::Filters::Mutate来转换字段值类型,不过在grok 里,其实有自己的魔法来实现这个功能!

    2. grok表达式语法

    grok支持把预定义的grok表达式写入到文件中,官方提供的预定义grok表达式见:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns。

    下面是从官方文件中摘抄的最简单但是足够说明用法的示例:

    USERNAME [a-zA-Z0-9._-]+

    USER %{USERNAME}

    第一行,用普通的正则表达式来定义一个grok表达式;第二行,通过打印赋值格式(sprintf format),用前面定义好的grok表达式来定义另一个grok表达式。

    grok表达式的打印复制格式的完整语法见下行示例。其中data_type目前只支持两个值:int和float。

    %{PATTERN_NAME:capture_name:data_type}

    所以我们可以改进我们的配置成下面这样:

    filter {

        grok {

            match => {

            "message" =>"%{WORD} %{NUMBER:request_time:float} %{WORD}"

            }

        }

    }

    重新运行进程然后可以得到如下结果:

    {

    "message" =>"begin 123.456 end",

    "@version" =>"1",

    "@timestamp" =>"2014-08-09T12:23:36.634Z",

    "host" =>"raochenlindeMacBook-Air.local",

    "request_time" => 123.456

    }

    这次request_time变成数值类型了。

    3.最佳实践

    实际运用中,我们需要处理各种各样的日志文件,如果你都是在配置文件里各自写一行自己的表达式,就完全不可管理了。所以,我们建议是把所有的grok表达式统一写入到一个地方。然后用filter/grok的patterns_dir选项来指明。

    如果你把“message”里所有的信息都grok到不同的字段了,数据实质上就相当于是重复存储了两份。所以你可以用remove_field参数来删除掉message字段,或者用overwrite参数来重写默认的message字段,只保留最重要的部分。

    重写参数的示例如下:

    filter {

        grok {

            patterns_dir =>["/path/to/your/own/patterns"]

            match => {

            "message" =>"%{SYSLOGBASE} %{DATA:message}"

            }

            overwrite => ["message"]

        }

    }

    更多有关grok正则性能的最佳实践(比如timeout_millis等配置参数),请参考:https://www.elastic.co/blog/do-you-grok-grok。

    4.高级用法

    多行匹配 在和codec/multiline搭配使用的时候,需要注意一个问题,grok正则和普通正则一样,默认是不支持匹配回车换行的。就像你需要=~ // m一样也需要单独指定,具体写法是在表达式开始位置加(?m)标记。如下所示:

    match => {

    "message" =>"(?m)\s+(?<request_time>\d+(?:\.\d+)?)\s+"

    }

    多项选择 有时候我们会碰上一个日志有多种可能格式的情况。这时候要写成单一正则就比较困难,或者全用|隔开又比较丑陋。这时候,Logstash的语法提供给我们一个有趣的解决方式。

    文档中,都说明logstash-filters-grok插件的match参数应该接受的是一个Hash值。但是因为早期的Logstash语法中Hash值也是用[]这种方式书写的,所以其实现在传递Array值给match参数也完全没问题。所以,我们这里其实可以传递多个正则来匹配同一个字段:

    match => [

    "message", "(?<request_time>\d+(?:\.\d+)?)",

    "message", "%{SYSLOGBASE} %{DATA:message}",

    "message", "(?m)%{WORD}"

    ]

    Logstash会按照这个定义次序依次尝试匹配,到匹配成功为止。虽说效果跟用|分割写个大大的正则是一样的,但是可阅读性好了很多。

    我强烈建议每个人都要使用Grok Debugger (http://grokdebug.herokuapp.com/)来调试自己的grok表达式。

    2.3.3 dissect解析

    grok作为Logstash最广为人知的插件,在性能和资源损耗方面同样也广为诟病。为了应对这个情况,同时也考虑到大多数情况下日志格式并没有那么复杂,Logstash开发团队在5.0版新添加了另一个解析字段的插件:dissect。当日志格式有比较简明的分隔标志位且重复性较大的时候,可以使用dissect插件更快地完成解析工作。下面是解析syslog的示例。

    filter {

        dissect {

            mapping => {

                "message" => "%{ts} %{+ts} %{+ts} %{src} %{} %{prog}[%{pid}]: %{msg}"

            }

            convert_datatype => {

                pid => "int"

            }

        }

    }

    我们看到上面使用了和Grok很类似的%{}语法来表示字段,这显然是基于习惯延续的考虑。不过示例中%{+ts}的加号就不一般了。dissect除了字段外面的字符串定位功能以外,还通过几个特殊符号来处理字段提取的规则:

    %{+key} 这个+表示前面已经捕获到一个key字段了,而这次捕获的内容自动添补到之前key字段内容的后面。

    %{+key/2} 这个/2表示在有多次捕获内容都填到key字段里的时候,拼接字符串的顺序谁前谁后。/2表示排第2位。

    %{?string} 这个?表示这块只是一个占位,并不会实际生成捕获字段存到Event里面。

    %{?string} %{&string} 当同样捕获名称都是string,但是一个?和一个&在一起的时候,表示这是一个键值对。

    比如对http://rizhiyi.com/index.do?id=123写这么一段配置:

    http://%{domain}/%{?url}?%{?arg1}=%{&arg1}

    则最终生成的 Event 内容是这样的:

    {

        domain => "rizhiyi.com",

        id => "123"

    }

    2.3.4 GeoIP地址查询

    GeoIP是最常见的免费IP地址归类查询库,同时也有收费版可以采购。GeoIP库可以根据IP地址提供对应的地域信息,包括国别、省市、经纬度等,对于可视化地图和区域统计非常有用。

    配置示例如下:

    filter {

        geoip {

            source =>"message"

        }

    }

    运行结果如下:

    {

    "message" =>"183.60.92.253",

    "@version" =>"1",

    "@timestamp" =>"2014-08-07T10:32:55.610Z",

    "host" =>"raochenlindeMacBook-Air.local",

    "geoip" => {

    "ip" =>"183.60.92.253",

    "country_code2" =>"CN",

    "country_code3" =>"CHN",

    "country_name" =>"China",

    "continent_code" =>"AS",

    "region_name" =>"30",

    "city_name" =>"Guangzhou",

    "latitude" =>23.11670000000001,

    "longitude" =>113.25,

    "timezone" =>"Asia/Chongqing",

    "real_region_name" =>"Guangdong",

    "location" => [

                [0] 113.25,

                [1] 23.11670000000001

            ]

        }

    }

    GeoIP库数据较多,如果你不需要这么多内容,可以通过fields选项指定自己所需要的。下例为全部可选内容:

    filter {

        geoip {

            fields => ["city_name", "continent_code", "country_code2",

                "country_code3", "country_name", "dma_code", "ip", "latitude",

                "longitude", "postal_code", "region_name", "timezone"]

        }

    }

    需要注意的是:geoip.location是Logstash通过latitude和longitude额外生成的数据。所以,如果你是想要经纬度又不想重复数据的话,应该像下面这样做:

    filter {

        geoip {

            fields => ["city_name", "country_code2", "country_name", "latitude",

                "longitude", "region_name"]

            remove_field => ["[geoip][latitude]", "[geoip][longitude]"]

        }

    }

    还要注意:geoip插件的“source”字段可以是任一处理后的字段,比如“client_ip”,但是字段内容却需要小心!GeoIp库内只存有公共网络上的IP信息,查询不到结果的,会直接返回null,而Logstash的GeoIp插件对null结果的处理是:“不生成对应的geoip.字段”。所以读者在测试时,如果使用了诸如127.0.0.1、172.16.0.1、182.168.0.1、10.0.0.1等内网地址,会发现没有对应输出!

    2.3.5 JSON编解码

    在上一章,已经讲过在Codec中使用JSON编码。但是,有些日志可能是一种复合的数据结构,其中只有一部分记录是JSON格式的。这时候,我们依然需要在filter阶段,单独启用JSON解码插件。

    配置示例如下:

    filter {

        json {

            source =>"message"

            target =>"jsoncontent"

        }

    }

    运行结果如下:

    {

    "@version": "1",

    "@timestamp": "2014-11-18T08:11:33.000Z",

    "host": "web121.mweibo.tc.sinanode.com",

    "message": "{\"uid\":3081609001,\"type\":\"signal\"}",

    "jsoncontent": {

    "uid": 3081609001,

    "type": "signal"

    }

    }

    如果不打算使用多层结构的话,删掉target配置即可。单层结构新的结果如下:

    {

    "@version": "1",

    "@timestamp": "2014-11-18T08:11:33.000Z",

    "host": "web121.mweibo.tc.sinanode.com",

    "message": "{\"uid\":3081609001,\"type\":\"signal\"}",

    "uid": 3081609001,

    "type": "signal"

    }

    2.3.6 key-value切分

    在很多情况下,日志内容本身都是一个类似于key-value的格式,但是格式具体的样式却是多种多样的。Logstash提供logstash-filter-kv插件,帮助处理不同样式的key-value日志,变成实际的LogStash::Event数据。

    配置示例如下:

    filter {

        ruby {

            init =>"@kname = ['method','uri','verb']"

        ruby {

            init => "@kname = ['method','uri','verb']"

            code => "

                new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('|'))])

                new_event.remove('@timestamp')

                event.append(new_event)""

            "

        }

        if [uri] {

            ruby {

                init => "@kname = ['url_path','url_args']"

                code => "

                    new_event = LogStash::Event.new(Hash[@kname.zip(event.get('uri').split('?'))])

                    new_event.remove('@timestamp')

                    event.append(new_event)""

                "

            }

            kv {

                prefix =>"url_"

                source =>"url_args"

                field_split =>"&"

                remove_field => [ "url_args", "uri", "request" ]

            }

        }

    }

    Nginx访问日志中的$request,通过这段配置,可以详细切分成method、url_path、verb、url_a、url_b...

    进一步,如果url_args中有过多字段,可能导致Elasticsearch集群因为频繁update mapping或者消耗太多内存在cluster state上而宕机。所以,更优的选择是只保留明确有用的url_args内容,其他部分舍去,如下所示:

    kv {

        prefix =>"url_"

        source =>"url_args"

        field_split =>"&"

        include_keys => [ "uid", "cip" ]

        remove_field => [ "url_args", "uri", "request" ]

    }

    上例即表示,除了url_uid和url_cip两个字段以外,其他的url_*都不保留。

    2.3.7 metrics数值统计

    logstash-filter-metrics插件是使用Ruby的Metriks模块来实现在内存里实时地计数和采样分析。该模块支持两个类型的数值分析:meter和timer。下面分别举例说明。

    1. Meter示例(速率阈值检测)

    Web访问日志的异常状态码频率是运维人员会非常关心的一个数据。通常我们的做法是通过Logstash或者其他日志分析脚本,把计数发送到rrdtool或者graphite里面,然后再通过check_graphite脚本之类的东西来检查异常并报警。

    事实上这个事情可以直接在Logstash内部就完成。比如如果最近一分钟504请求的个数超过100个就报警,如下所示:

    filter {

        metrics {

            meter => "error_%{status}"

                add_tag => "metric"

                ignore_older_than => 10

        }

        if "metric" in [tags] {

            ruby {

                code => "event.cancel if event.get('[error_504][rate_1m]') * 60 > 100"

            }

        }

    }

    output {

        if "metric" in [tags] {

            exec {

                command => "echo \"Out of threshold: %{[error_504][rate_1m]}\""

            }

    }

    这里需要注意*60的含义。metrics模块生成的rate_1m/5m/15m意思是:最近1、5、15分钟的每秒速率!

    2. Timer示例(box and whisker异常检测)

    官版的logstash-filter-metrics插件只适用于metric事件的检查。由插件生成的新事件内部不存有来自input区段的实际数据信息。所以,要完成我们的百分比分布箱体检测,需要首先对代码稍微做几行变动,即在metric的timer事件里加一个属性,存储最近一个实际事件的数值:

    def register

        …

        @last = {}

    end

    def filter(event)

        …

        @last[event.sprintf(name)] = event.sprintf(value).to_f

    end

    def flush(options = {})

        …

        @metric_timer.each_pair do |name, metric|

    event.set(“[#{name}][last]”, @last[name])

    metric.clear if should_clear?

        end

        …

    end

    有了这个Last值,然后我们就可以用如下配置来探测异常数据了:

    filter {

        metrics {

            timer => {"rt" =>"%{request_time}"}

            percentiles => [25, 75]

            add_tag =>"percentile"

        }

            ruby {

                code => "l=event.get('[rt][p75]')-event.get('[rt][p25]');event.set('[rt][low]', event.get('[rt][p25]')-l);event.set('[rt][high]',event.get('[rt][p75]')+l)"

            }

        }

    }

    output {

        if "percentile" in [tags] and ([rt][last] > [rt][high] or [rt][last] < [rt][low]) {

           exec {

               command => "echo \"Anomaly: %{[rt][last]}\""

           }

    }

    有关box and shisker plot内容和重要性,参见《数据之魅》一书。

    2.3.8 mutate数据修改

    logstash-filter-mutate插件是Logstash另一个重要插件,它提供了丰富的基础类型数据处理能力,包括类型转换、字符串处理和字段处理等。

    1.类型转换

    类型转换是logstash-filter-mutate插件最初诞生时的唯一功能,其应用场景在之前的2.3.5节“JSON编解码”中已经提到。

    可以设置的转换类型包括:“integer”、“float”和“string”。示例如下:

    filter {

        mutate {

            convert => ["request_time", "float"]

        }

    }

    mutate除了转换简单的字符值,还支持对数组类型的字段进行转换,即将[“1”,“2”]转换成[1,2]。但不支持对哈希类型的字段做类似处理。有这方面需求的可以采用稍后讲述的logstash-filter-ruby插件完成。

    2. 字符串处理

    有如下字符串处理的插件:

    gsub:仅对字符串类型字段有效。

    gsub => ["urlparams", "[\\?#]", "_"]

    split:分割字符串。

    filter {

        mutate {

            split => ["message", "|"]

        }

    }

    随意输入一串以|分割的字符,比如“123|321|adfd|dfjld*=123”,可以看到如下输出:

    {

    "message" => [

        [0] "123",

        [1] "321",

        [2] "adfd",

        [3] "dfjld*=123"

    ],

    "@version" =>"1",

    "@timestamp" =>"2014-08-20T15:58:23.120Z",

    "host" =>"raochenlindeMacBook-Air.local"

    }

    join:仅对数组类型字段有效。

    我们在之前已经用split割切的基础上再join回去。配置改成:

    filter {

        mutate {

            split => ["message", "|"]

        }

        mutate {

            join => ["message", ","]

        }

    }

    filter区段之内,是顺序执行的。所以我们最后看到的输出结果是:

    {

    "message" =>"123,321,adfd,dfjld*=123",

    "@version" =>"1",

    "@timestamp" =>"2014-08-20T16:01:33.972Z",

    "host" =>"raochenlindeMacBook-Air.local"

    }

    merge:合并两个数组或者哈希字段。依然在之前split的基础上继续:

    filter {

        mutate {

            split => ["message", "|"]

        }

        mutate {

            merge => ["message", "message"]

        }

    }

    我们会看到输出:

    {

    "message" => [

        [0] "123",

        [1] "321",

        [2] "adfd",

        [3] "dfjld*=123",

        [4] "123",

        [5] "321",

        [6] "adfd",

        [7] "dfjld*=123"

    ],

    "@version" =>"1",

    "@timestamp" =>"2014-08-20T16:05:53.711Z",

    "host" =>"raochenlindeMacBook-Air.local"

    }

    如果src字段是字符串,会自动先转换成一个单元素的数组再合并。把上一示例中的来源字段改成“host”:

    filter {

        mutate {

            split => ["message", "|"]

        }

        mutate {

            merge => ["message", "host"]

        }

    }

    结果变成:

    {

    "message" => [

        [0] "123",

        [1] "321",

        [2] "adfd",

        [3] "dfjld*=123",

        [4] "raochenlindeMacBook-Air.local"

    ],

    "@version" =>"1",

    "@timestamp" =>"2014-08-20T16:07:53.533Z",

    "host" => [

        [0] "raochenlindeMacBook-Air.local"

        ]

    }

    看,目的字段“message”确实多了一个元素,但是来源字段“host”本身也由字符串类型变成数组类型了!

    同样,如果目的字段不是数组,也会被强制转换。即使来源字段并不存在:

    filter {

        mutate {

            merge => ["message", "not_exist_field"]

        }

    }

    结果会变成:

    {

    "message" => [

            [0] "123|321|adfd|dfjld*=123"

        ],

    "@version" =>"1",

    "@timestamp" =>"2014-08-20T15:58:23.120Z",

    "host" =>"raochenlindeMacBook-Air.local"

    }

    strip:去除字段内容前后的空格。可以接受数组参数:

    filter {

        mutate {

            strip => ["syslog_message", "syslog_datetime"]

        }

    }

    lowercase:将字段内容全部转换成小写字母。同样可以接受数组。在ELK stack场景中,将内容转换成小写会是一个比较常见的需求。因为Elasticsearch默认是统一按照小写字母来搜索的。为了确保检索准确率,在不影响使用的情况下,建议对常用检索字段启用lowercase配置。

    uppercase:将字段内容全部转换成大写字母。同样可以接受数组。

    3.字段处理

    字段处理的插件有:

    rename:重命名某个字段,如果目的字段已经存在,会被覆盖掉,如下所示:

    filter {

        mutate {

            rename => ["syslog_host", "host"]

        }

    }

    update:更新某个字段的内容。如果字段不存在,不会新建。

    replace:作用和update类似,但是当字段不存在的时候,它会起到add_field参数一样的效果,自动添加新的字段。

    4.执行次序

    需要注意的是,filter/mutate内部是有执行次序的。其次序如下:

    rename(event) if @rename

    update(event) if @update

    replace(event) if @replace

    convert(event) if @convert

    gsub(event) if @gsub

    uppercase(event) if @uppercase

    lowercase(event) if @lowercase

    strip(event) if @strip

    remove(event) if @remove

    split(event) if @split

    join(event) if @join

    merge(event) if @merge

    filter_matched(event)

    而filter_matched这个filters/base.rb里继承的方法也是有次序的:

    @add_field.each do |field, value|

    end

    @remove_field.each do |field|

    end

    @add_tag.each do |tag|

    end

    @remove_tag.each do |tag|

    end

    2.3.9 随心所欲的Ruby处理

    如果你稍微懂那么一点点Ruby语法的话,logstash-filter-ruby插件将会是一个非常有用的工具。比如你需要稍微修改一下LogStash::Event对象,但是又不打算为此写一个完整的插件,用logstash-filter-ruby插件绝对感觉良好。

    配置示例如下:

    filter {

        ruby {

            init =>"@kname = ['client','servername','url','status','time','size','upstream',

                'upstreamstatus','upstreamtime','referer','xff','useragent']"

            code => "

                ew_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('|'))])

                new_event.remove('@timestamp')

            event.append(new_event)"

        }

    }

    官网示例是一个比较有趣但是没啥大用的做法—随机取消90%的事件。

    所以上面我们给出了一个有用而且强大的实例。

    通常我们都是用logstash-filter-grok插件来捕获字段的,但是正则耗费大量的CPU资源,很容易成为Logstash进程的瓶颈。

    而实际上,很多流经Logstash的数据都是有自己预定义的特殊分隔符的,我们可以很简单的直接切割成多个字段。

    从Logstash2.3开始,LogStash::event.append不再直接接受Hash对象,而必须是LogStash:: Event对象。所以示例变成要先初始化一个新的event,再把无用的@timestamp移除,再append进去。否则会把@timestamp变成有两个时间的数组了!

      从Logstash 5.0开始,LogStash::Event改为Java实现,直接使用event["parent"]["child"]形式获取的,不是原事件的引用而是复制品,需要改用event.get('[parent][child]')和event.set('[parent][child]', 'value')的方法。

    logstash-filter-mutate插件里的“split”选项只能切成数组,后续很不方便使用和识别。而在logstash-filter-ruby里,我们可以通过“init”参数预定义好由每个新字段的名字组成的数组,然后在“code”参数指定的Ruby语句里通过两个数组的zip操作生成一个哈希并添加进数组里。短短一行Ruby代码,可以减少50%以上的CPU使用率。

    logstash-filter-ruby插件用途远不止这一点,下一节你还会继续见到它的身影。

    更多实例如下:

    filter{

        date {

            match => ["datetime" , "UNIX"]

        }

        ruby {

            code =>"event.cancel if 5 * 24 * 3600 < (event.get( '@timestamp')-::Time.now).abs"

        }

    }

    在实际运用中,我们几乎肯定会碰到出乎意料的输入数据。这都有可能导致Elasticsearch集群出现问题。

    当数据格式发生变化,比如UNIX时间格式变成UNIX_MS时间格式,会导致Logstash疯狂创建新索引,集群崩溃。

    或者误输入过老的数据时,因为一般我们会close几天之前的索引以节省内存,必要时再打开。而直接尝试把数据写入被关闭的索引会导致内存问题。

    这时候我们就需要提前校验数据的合法性。上面配置,就是用于过滤掉时间范围与当前时间差距太大的非法数据的。

    2.3.10 split拆分事件

    上一章我们通过multiline插件将多行数据合并进一个事件里,那么反过来,也可以把一行数据,拆分成多个事件。这就是split插件。

    配置示例如下:

    filter {

        split {

            field =>"message"

            terminator =>"#"

        }

    }

    这个测试中,我们在intputs/stdin的终端中输入一行数据:“test1#test2”,结果看到输出两个事件:

    {

    "@version": "1",

    "@timestamp": "2014-11-18T08:11:33.000Z",

    "host": "web121.mweibo.tc.sinanode.com",

    "message": "test1"

    }

    {

    "@version": "1",

    "@timestamp": "2014-11-18T08:11:33.000Z",

    "host": "web121.mweibo.tc.sinanode.com",

    "message": "test2"

    }

    split插件中使用的是yield功能,其结果是split出来的新事件,会直接结束其在filter阶段的历程,也就是说写在split后面的其他filter插件都不起作用,进入到output阶段。所以,一定要保证split配置写在全部filter配置的最后。

      使用了类似功能的还有clone插件。从logstash-1.5.0beta1版本以后修复该问题。

    2.3.11 交叉日志合并

    Splunk有一项非常有用的功能,叫做transaction。可以在错乱的多行日志中,根据connected字段、maxspan窗口、startswith/endwith标签等信息计算出事件的duration和count结果。其文档见:http://docs.splunk.com/Documentation/Splunk/latest/SearchReference/Transaction

    ELK中,承载计算功能的Elasticsearch并不支持这种跨行计算,所以,变通的处理方式是:在Logstash中,提前做好事件的归并,直接计算出来transaction的duration数据。

    比如一个transaction task_id startswith=START endwith=END的查询,可以在Logstash中这样计算:

    filter {

        grok {

            match => ["message", "%{TIMESTAMP_ISO8601} START id: (?<task_id>.*)"]

            add_tag => [ "taskStarted" ]

        }

        grok {

            match => ["message", "%{TIMESTAMP_ISO8601} END id: (?<task_id>.*)"]

            add_tag => [ "taskTerminated"]

        }

        elapsed {

            start_tag =>"taskStarted"

            end_tag =>"taskTerminated"

            unique_id_field =>"task_id"

        }

    }

    如果你的需求是合并多行日志,不单是计算时差。则可以使用另一个第三方插件:logstash-filter-aggregate。配置示例如下:

    filter {

        grok {

            match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ]

        }

     

        if [logger] == "TASK_START" {

            aggregate {

                task_id => "%{taskid}"

                code => "map['sql_duration'] = 0"

                map_action => "create"

            }

        }

        if [logger] == "SQL" {

            aggregate {

                task_id => "%{taskid}"

                code => "map['sql_duration'] += event.get('duration')"

                map_action => "update"

            }

        }

        if [logger] == "TASK_END" {

            aggregate {

                task_id => "%{taskid}"

                code => "event.set('sql_duration', map['sql_duration'])"

                map_action => "update"

                end_of_task => true

                timeout => 120

            }

        }

    }

    该配置可以将如下这几行交叉打印的日志:

    INFO - 12345 - TASK_START - start

        INFO - 12345 - SQL - sqlQuery1 - 12

        INFO - 12345 - SQL - sqlQuery2 - 34

    INFO - 12345 - TASK_END - end

    以一个统一的task_id,以及超时时间、结束标识等条件,最终合并成为如下事件:

    {

        "message" => "INFO - 12345 - TASK_END - end",

        "sql_duration" => 46

    }

    2.4 输出插件

    2.4.1 输出到Elasticsearch

    Logstash早期有三个不同的Elasticsearch插件。到1.4.0版本的时候,开发者彻底重写了LogStash::Outputs::Elasticsearch插件。从此,我们只需要用这一个插件,就能任意切换使用Elasticsearch集群支持的各种不同协议了。

    1.配置示例

    output {

        elasticsearch {

            hosts => ["192.168.0.2:9200"]

            index => "logstash-%{type}-%{+YYYY.MM.dd}"

            document_type => "%{type}"

            flush_size => 20000

            idle_flush_time => 10

            sniffing => true

            template_overwrite => true

        }

    }

    2.解释

    批量发送 在过去的版本中,主要由本插件的'flush_size'和'idle_flush_time'两个参数共同控制Logstash向Elasticsearch发送批量数据的行为。以上面示例来说:Logstash会努力攒到20 000条数据一次性发送出去,但是如果10秒钟内也没攒够20 000条,Logstash还是会以当前攒到的数据量发一次。

    默认情况下,'flush_size'是500条,'idle_flush_time'是1秒。这也是很多人改大了'flush_size'也没能提高写入ES性能的原因—Logstash还是1秒钟发送一次。

    从5.0开始,这个行为有了另一个前提:'flush_size'的大小不能超过Logstash运行时的命令行参数设置的'batch_size',否则将以'batch_size'为批量发送的大小。

    索引名 写入的Elasticsearch索引的名称,这里可以使用变量。为了更贴合日志场景,Logstash提供了%{+YYYY.MM.dd}这种写法。在语法解析的时候,看到以+号开头的,就会自动认为后面是时间格式,尝试用时间格式来解析后续字符串。所以,之前处理过程中不要给自定义字段取个加号开头的名字……

    此外,注意索引名中不能有大写字母,否则Elasticsearch在日志中会报InvalidIndex-NameException,但是Logstash不会报错,这个错误比较隐晦,也容易掉进这个坑中。

    轮询 gstash 1.4.2在transport和HTTP协议的情况下是固定连接指定host发送数据。从1.5.0开始,host可以设置数组,它会从节点列表中选取不同的节点发送数据,达到Round-Robin负载均衡的效果。

    3.不同版本的协议沿革

    1.4.0版本之前,有logstash-output-elasticsearch、 logstash-output-elasticsearch_http、logstash- output-elasticsearch_river三个插件。

    1.4.0到2.0版本之间,配合Elasticsearch废弃river方法,只剩下logstash-output-elasticsearch一个插件,同时实现了node、transport、http三种协议。

    2.0版本开始,为了兼容性和调试方便,logstash-output-elasticsearch改为只支持HTTP协议。想继续使用node或者transport协议的用户,需要单独安装logstash-output-elasticsearch_ java插件。

    一个小集群里,继续使用logstash-output-elasticsearch_java的node协议是最方便了。

    Logstash以Elasticsearch的client节点身份(即不存数据不参加选举)运行。如果你运行下面这行命令,你就可以看到自己的Logstash进程名,对应的node.role值是c:

    # curl 127.0.0.1:9200/_cat/nodes?v

    host       ip      heap.percent ram.percent load node.role master name

    local 192.168.0.102  7      c         -      logstash-local-1036-2012

    local 192.168.0.2    7      d         *      Sunstreak

    Logstash 1.5以后,也不再分发一个内嵌的Elasticsearch服务器。如果你想变更node协议下的这些配置,在$PWD/elasticsearch.yml文件里写自定义配置即可,Logstash会尝试自动加载这个文件。

    对于拥有很多索引的大集群,你可以用transport协议。Logstash进程会转发所有数据到你指定的某台主机上。这种协议跟上面的node协议是不同的。node协议下的进程是可以接收到整个Elasticsearch集群状态信息的,当进程收到一个事件时,它就知道这个事件应该存在集群内哪个机器的分片里,所以它就会直接连接该机器发送这条数据。而transport协议下的进程不会保存这个信息,在集群状态更新(节点变化,索引变化都会发送全量更新)时,就不会对所有的Logstash进程也发送这种信息。更多Elasticsearch集群状态的细节,参阅http://www.elasticsearch.org/guide。

    如果你已经有现成的Elasticsearch集群,但是版本跟Logstash自带的又不太一样,建议你使用http协议。Logstash会使用POST方式发送数据。

    4.数据重复问题

    经常有读者问,为什么 Logstash 在有多个 conf 文件的情况下,进入ES(Elasticsecrrch)的数据会重复,几个 conf 数据就会重复几次。其实问题原因在之前章节提到过,output 段顺序执行,没有对日志 type 进行判断的各插件配置都会全部执行一次。在 output 段对 type 进行判断的语法如下所示:

    output {

        if [type] == "nginxaccess" {

            elasticsearch { }

        }

    }

    5.模板

    Elasticsearch支持给索引预定义设置和mapping(前提是你用的Elasticsearch版本支持这个API,不过估计应该都支持)。Logstash自带有一个优化好的模板,内容如下:

    {

        "template" : "logstash-*",

        "version" : 50001,

        "settings" : {

            "index.refresh_interval" : "5s"

        },

        "mappings" : {

            "_default_" : {

                "_all" : {"enabled" : true, "norms" : false},

                "dynamic_templates" : [ {

                    "message_field" : {

                        "path_match" : "message",

                        "match_mapping_type" : "string",

                        "mapping" : {

                            "type" : "text",

                            "norms" : false

                        }

                    }

                }, {

                    "string_fields" : {

                        "match" : "*",

                        "match_mapping_type" : "string",

                        "mapping" : {

                            "type" : "text", "norms" : false,

                            "fields" : {

                                "keyword" : { "type": "keyword"  }

                            }

                        }

                    }

                }  ],

                "properties" : {

                    "@timestamp": { "type": "date", "include_in_all": false  },

                    "@version": { "type": "keyword", "include_in_all": false  },

                    "geoip"  : {

                        "dynamic": true,

                        "properties" : {

                            "ip": { "type": "ip"  },

                            "location" : { "type" : "geo_point"  },

                            "latitude" : { "type" : "half_float"  },

                            "longitude" : { "type" : "half_float"  }

                        }

                    }

                }

            }

        }

    }

    这其中的关键设置包括:

    template for index-pattern:只有匹配logstash-*的索引才会应用这个模板。有时候我们会变更Logstash的默认索引名称,记住你也得通过PUT方法上传可以匹配你自定义索引名的模板。当然,我更建议的做法是,把你自定义的名字放在“logstash-”后面,变成index =>"logstash-custom-%{+yyyy.MM.dd}"这样。

    refresh_interval for indexing:Elasticsearch是一个近实时搜索引擎。它实际上是每1秒钟刷新一次数据。对于日志分析应用,我们用不着这么实时,所以Logstash自带的模板修改成了5秒钟。你还可以根据需要继续放大这个刷新间隔以提高数据写入性能。

    multi-field with keyword:Elasticsearch 会自动使用自己的默认分词器(空格、点、斜线等分割)来分析字段。分词器对于搜索和评分是非常重要的,但是大大降低了索引写入和聚合请求的性能。所以Logstash模板定义了一种叫“多字段”(multi-field)类型的字段。这种类型会自动添加一个以“.keyword”结尾的字段,并给这个字段设置为不启用分词器。简单说,你想获取URL字段的聚合结果的时候,不要直接用url,而是用url.keyword作为字段名。当你还对分词字段发起聚合和排序请求的时候,直接提示无法构建fielddata了!

    在Logstash 5.0中,同时还保留携带了针对Elasticsearch 2.x的template文件,在那里,通过旧版本的mapping配置,达到和新版本相同的行为效果:对应统计字段明确设置"index":"not_analyzed","doc_values":true,以及对分词字段加上对fielddata 的{"format":"disabled"}。

    half_float:Elasticsearch 5.0新引入了half_float类型。比标准的float类型占用更少的资源,提供更好的性能。在明确自己数值范围较小的时候可用。刚巧,经纬度就是一个数值范围很小的数据。

    geo_point:Elasticsearch支持geo_point类型,geo distance聚合等等。比如说,你可以请求某个geo_point点方圆10千米内数据点的总数。在Kibana的tilemap类型面板里,就会用到这个类型的数据。

    6.其他模板配置建议

    order:如果你有自己单独定制template的想法,很好。这时候有几种选择:

    在logstash-output-elasticsearch配置中开启manage_template => false选项,然后一切自己动手;

    在logstash-output-elasticsearch配置中开启template =>"/path/to/your/tmpl.json"选项,让logstash来发送你自己写的template文件;

    避免变更Logstash里的配置,而是另外发送一个template,利用Elasticsearch的templates order功能。

    这个order功能,就是Elasticsearch在创建一个索引的时候,如果发现这个索引同时匹配上了多个template,那么就会先应用order数值小的template设置,然后再应用一遍order数值高的作为覆盖,最终达到一个merge的效果。比如,对上面这个模板已经很满意,只想修改一下refresh_interval,那么只需要新写一个:

    {

    "order" : 1,

    "template" : "logstash-*",

    "settings" : {

    "index.refresh_interval" : "20s"

        }

    }

    然后运行curl -XPUT http:// localhost:9200/_template/template_newid -d '@/path/to/your/tmpl.json' 即可。Logstash默认的模板,order是0,id是logstash,通过logstash-output-elasticsearch的配置选项template_name修改。你的新模板就不要跟这个名字冲突了。

    _fields_name:日志场景最重要的瓶颈首先在于入库速度。所以Logstash默认模板中,通过对一些字段的“norms”:false和“include_in_all”:false设定来减少计算量,提高入库速度。其实还有一个内置的字段,在条件许可的情况下,是可以关闭的,就是_fields_name。

    这个字段和_all有些类似,_all里面记录的是所有字段的值,而_fields_name里记录的是所有字段的名字。这个作用是可以大大加速诸如exists、missing类查询的速度。但是会导致30%左右的写入速度损耗。在日志场景中,也是可以关闭的,代码如下:

    "_all" : {"enabled" : true, "norms" : false},

    “_fields_name” : { “enabled” : false },

    2.4.2 发送email

    配置示例如下:

    output {

        email {

            port           =>    "25"

            address        =>    "smtp.126.com"

            username       =>    "test@126.com"

            password       =>    ""

            authentication =>    "plain"

            use_tls        =>    true

            from           =>    "test@126.com"

            subject        =>    "Warning: %{title}"

            to             =>    "test@qq.com"

            via            =>    "smtp"

            body           =>    "%{message}"

        }

    }

    logstash-output-email插件支持SMTP协议和sendmail两种方式,通过via参数设置。SMTP方式有较多的options参数可配置。sendmail只能利用本机上的sendmail服务来完成。文档上描述了Mail库支持的sendmail配置参数,但实际代码中没有相关处理,不要被迷惑了。

    2.4.3 调用系统命令执行

    logstash-output-exec插件的运用也非常简单,如下所示,将Logstash切割成的内容作为参数传递给命令。这样,在每个事件到达该插件的时候,都会触发这个命令的执行。

    output {

        exec {

            command =>"sendsms.pl \"%{message}\" -t %{user}"

        }

    }

    需要注意的是。这种方式是每次都重新开始执行一次命令并退出。本身是比较慢速的处理方式(程序加载,网络建联等都有一定的时间消耗)。最好只用于少量的信息处理场景,比如不适用nagios的其他报警方式。示例就是通过短信发送消息。

    2.4.4 保存成文件

    通过日志收集系统将分散在数百台服务器上的数据集中存储在某中心服务器上,这是运维最原始的需求。早年的scribed,甚至直接就把输出的语法命名为<store>。Logstash当然也能做到这点。

    和LogStash::Inputs::File不同,LogStash::Outputs::File里可以使用sprintf format格式来自动定义输出到带日期命名的路径。

    配置示例如下:

    output {

        file {

            path =>"/path/to/%{+yyyy}/%{+mm}/%{+dd}/%{+HH}/%{host}.log.gz"

            message_format =>"%{message}"

            gzip => true

        }

    }

    使用output/file插件首先需要注意的就是message_format参数。插件默认是输出整个event的JSON形式数据的。这可能跟大多数情况下使用者的期望不符。大家可能只是希望按照日志的原始格式保存就好了。所以需要定义为%{message},当然,前提是在之前的filter插件中,你没有使用remove_field或者update等参数删除或修改%{message}字段的内容。

    另一个非常有用的参数是gzip。gzip格式是一个非常奇特而友好的格式。其格式包括有:

    10字节的头,包含幻数、版本号以及时间戳。

    可选的扩展头,如原文件名。

    文件体,包括DEFLATE压缩的数据。

    8字节的尾注,包括CRC-32校验和以及未压缩的原始数据长度。

    这样gzip就可以一段一段的识别出来数据—反过来说,也就是可以一段一段压缩了,添加在后面!

    这对于我们流式添加数据简直太棒了!

    你或许见过网络流传的parallel命令行工具并发处理数据的神奇文档,但在自己用的时候总见不到效果。实际上就是因为:文档中处理的gzip文件,可以分开处理然后再合并的,而你的用法却不一定可以。

    这里需要注意两点:

    1)按照Logstash标准,其实应该可以把数据格式的定义改在codec插件中完成,但是 logstash-output-file插件内部实现中跳过了@codec.decode这步,所以codec设置无法生效!

    2)按照Logstash标准,配置参数的值可以使用event sprintf格式。但是logstash-output-file插件对event.sprintf(@path)的结果,还附加了一步inside_file_root?校验(个人猜测是为了防止越权到其他路径),这个file_root是通过直接对path参数分割/符号得到的。如果在sprintf格式中带有/符号,那么被切分后的结果就无法正确解析了。所以,如下所示配置,虽然看起来是正确的,实际效果却不对,正确写法应该是本节之前的配置示例那样。

    output {

        file {

            path => "/path/to/%{+yyyy/MM/dd}/%{host}.log.gz"

            codec => line {

                format => "%{message}"

            }

        }

    }

    2.4.5 报警发送到Nagios

    Logstash中有两个output插件是与Nagios有关的。logstash-output-nagios插件发送数据给本机的nagios.cmd管道命令文件,logstash-output-nagios_nsca插件则是调用send_nsca命令以NSCA协议格式把数据发送给Nagios服务器(远端或者本地皆可)。

    1. nagios.cmd

    nagios.cmd是Nagios服务器的核心组件。Nagios事件处理和内外交互都是通过这个管道文件来完成的。

    使用CMD方式,需要自己保证发送的Logstash事件符合Nagios事件的格式。即必须在filter阶段预先准备好nagios_host和nagios_service字段;此外,如果在filter阶段也准备好nagios_annotation和nagios_level字段,这里也会自动转换成nagios事件信息。

    filter {

        if [message] =~ /err/ {

            mutate {

                add_tag =>"nagios"

                rename => ["host", "nagios_host"]

                replace => ["nagios_service", "logstash_check_%{type}"]

            }

        }

    }

    output {

        if "nagios" in [tags] {

            nagios { }

        }

    }

    如果不打算在filter阶段提供nagios_level,那么也可以在该插件中通过参数配置。

    所谓nagios_level,即我们通过nagiosplugin检查数据时的返回值。其取值范围和含义如下:

    “0”,代表“OK”,服务正常。

    “1”,代表“WARNNING”,服务警告,一般nagios plugin命令中使用-w参数设置该阈值。

    “2”,代表“CRITICAL”,服务危急,一般nagios plugin命令中使用-c参数设置该阈值。

    “3”,代表“UNKNOWN”,未知状态,一般会在timeout等情况下出现。

    默认情况下,该插件会以“CRITICAL”等级发送报警给Nagios服务器。

    nagios.cmd文件的具体位置,可以使用command_file参数设置。默认位置是“/var/lib/nagios3/rw/nagios.cmd”。

    关于和nagios.cmd交互的具体协议说明,有兴趣的读者请阅读Using external commands in Nagios一文(http://archive09.linux.com/feature/153285),这是《Learning Nagios 3.0》书中内容节选。

    2. NSCA

    NSCA是一种标准的Nagios分布式扩展协议。分布在各机器上的send_nsca进程主动将监控数据推送给远端Nagios服务器的NSCA进程。

    当Logstash跟Nagios服务器没有在同一个主机上运行的时候,就只能通过NSCA方式来发送报警了—当然也必须在Logstash服务器上安装send_nsca命令。

    Nagios事件所需要的几个属性在上一段中已经有过描述。不过在使用这个插件的时候,不要求提前准备好,而是可以在该插件内部定义参数:

    output {

        nagios_nsca {

            nagios_host =>"%{host}"

            nagios_service =>"logstash_check_%{type}"

            nagios_status =>"2"

            message_format =>"%{@timestamp}: %{message}"

            host =>"nagiosserver.domain.com"

        }

    }

    这里请注意,host和nagios_host两个参数,分别是用来设置Nagios服务器的地址,和报警信息中有问题的服务器地址。

    关于NSCA原理,架构和配置说明,还不了解的读者请阅读官方网站Using NSClient++ from nagios with NSCA一节(http://nsclient.org/nscp/wiki/doc/usage/nagios/nsca)。

    3.其他类似插件

    除了Nagios以外,Logstash同样可以发送信息给其他常见监控系统,方式和Nagios大同小异:

    logstash-output-ganglia插件通过UDP协议,发送gmetric型数据给本机/远端的gmond或者gmetad。

    logstash-output-zabbix插件调用本机的zabbix_sender命令发送。

    2.4.6 statsd

    statsd最早是2008年Flickr公司用Perl写的针对graphite、datadog等监控数据后端存储开发的前端网络应用,2011年Etsy公司用Nodejs重构。用于接收、写入、读取和聚合时间序列数据,包括即时值和累积值等。

    graphite是用Python模仿RRDtools写的时间序列数据库套件,包括三个部分:

    carbon:一个Twisted守护进程,监听处理数据。

    whisper:存储时间序列的数据库。

    webapp:一个用Django框架实现的网页应用。

    statsd项目涉及多个模块同时部署,步骤比较多,这里单独给读者介绍一下部署方法。首先通过如下几步安装graphite:

    1)安装cairo和pycairo库:

    # yum -y install cairo pycairo

    2)pip安装:

    # yum install python-devel python-pip

    # pip install django django-tagging carbon whisper graphite-web uwsgi

    3)配置Graphite:

    # cd /opt/graphite/webapp/graphite

    # cp local_settings.py.example local_settings.py

    # python manage.py syncdb

    修改local_settings.py中的DATABASE为设置的db信息。

    4)启动cabon:

    # cd /opt/graphite/conf/

    # cp carbon.conf.example carbon.conf

    # cp storage-schemas.conf.example storage-schemas.conf

    # cd /opt/graphite/

    # ./bin/carbon-cache.py start

    然后再通过如下几步安装statsd:

    1)Graphite地址设置:

    # cd /opt/

    # git clone git://github.com/etsy/statsd.git

    # cd /opt/statsd

    # cp exampleConfig.js Config.js

    根据Graphite服务器地址,修改Config.js中的配置如下:

    {

        graphitePort: 2003,

        graphiteHost: "10.10.10.124",

        port: 8125,

        backends: [ "./backends/graphite" ]

    }

    2)uwsgi配置:

    cd /opt/graphite/webapp/graphite

    cat > wsgi_graphite.xml <<EOF

    <uwsgi>

        <socket>0.0.0.0:8630</socket>

        <workers>2</workers>

        <processes>2</processes>

        <listen>100</listen>

        <chdir>/opt/graphite/webapp/graphite</chdir>

        <pythonpath>..</pythonpath>

        <module>wsgi</module>

        <pidfile>graphite.pid</pidfile>

        <master>true</master>

        <enable-threads>true</enable-threads>

        <logdate>true</logdate>

        <daemonize>/var/log/uwsgi_graphite.log</daemonize>

    </uwsgi>

    EOF

    cp /opt/graphite/conf/graphite.wsgi /opt/graphite/webapp/graphite/wsgi.py

    3)Nginx 的 uwsgi 配置:

    cat > /usr/local/nginx/conf/conf.d/graphite.conf <<EOF

    server {

        listen 8081;

        server_name graphite;

     

        access_log /opt/graphite/storage/log/webapp/access.log ;

        error_log /opt/graphite/storage/log/webapp/error.log ;

     

        location / {

            uwsgi_pass 0.0.0.0:8630;

            include uwsgi_params;

            proxy_connect_timeout 300;

            proxy_send_timeout 300;

            proxy_read_timeout 300;

        }

    }

    EOF

    4)启动:

    # uwsgi -x /opt/graphite/webapp/graphite/wsgi_graphite.xml

    # systemctl nginx reload

    5)数据测试:

    echo "test.logstash.num:100|c" | nc -w 1 -u $IP $port

    如果安装配置是正常的,在graphite的左侧metrics->stats->test->logstash->num的表,statsd里面多了numStats等数据。

    配置示例如下:

    output {

        statsd {

            host =>"statsdserver.domain.com"

            namespace =>"logstash"

            sender =>"%{host}"

            increment => ["httpd.response.%{status}"]

        }

    }

    Graphite以树状结构存储监控数据,所以statsd也是如此。所以发送给statsd的数据的key也一定得是“first.second.tree.four”这样的形式。而在logstash-output-statsd插件中,就会以三个配置参数来拼接成这种形式:

    namespace.sender.metric

    其中namespace和sender都是直接设置的,而metric又分为好几个不同的参数可以分别设置。statsd支持的metric类型如下:

    increment

    示例语法:increment => ["nginx.status.%{status}"]。该配置即可在statsd中生成对应的nginx.status.200, nginx.status.206, nginx.status.304, nginx.status.404, nginx.status.502, nginx.status.503, nginx.status.504等一系列监控项。同时,在statsd内配置的一个时间周期内,各状态码的次数,会自动累加到各自监控项里。

    decrement

    语法同increment。不过是递减而不是递增。

    count

    示例语法:count => {"nginx.bytes" =>"%{bytes}"}。该配置可以在statsd中生成一个nginx.bytes监控项。而每条Nginx访问记录的响应字节数,累加成带宽。

    gauge

    语法同count。gauge和count(在rrdtool中叫counter)两种计数器的区别从早年的rrdtool时代就被反复强调,即gauge直接存储原始数值,不累加。

    set

    示例语法:set => {"online"=>"%{user_id}"}。是新版statsd才支持的功能,可以用来做去重计算,比如在线人数统计。

    timing

    示例语法:timing => ["nginx.requesttime"=>"%{request_time}"]。该配置可以在statsd中生成一个nginx.requesttime监控项,记录时间周期内,Nginx访问记录的响应时间的数值统计情况,包括平均值,最大值,最小值,标准差,百分比值等。

    关于这些metric类型的详细说明,请阅读statsd文档:https://github.com/etsy/statsd/blob/master/docs/metric_types.md。

    推荐阅读

    Etsy发布nodejs版本statsd的博客:Measure Anything, Measure Everything(http://codeascraft.etsy.com/2011/02/15/measure-anything-measure-everything/)

    Flickr发布statsd的博客:Counting & Timing(http://code.flickr.net/2008/10/27/counting-timing/)

    Librato有关statsd协作的博客:Using StatsD with Librato(http://support.metrics.librato.com/knowledgebase/articles/77199-using-statsd-with-librato)

    2.4.7 标准输出stdout

    和之前logstash-input-stdin插件一样,logstash-output-stdout插件也是最基础和简单的输出插件。同样在这里简单介绍一下,作为输出插件的一个共性了解。

    配置示例如下:

    output {

        stdout {

            codec => rubydebug

            workers => 2

        }

    }

    输出插件统一具有一个参数是workers。Logstash为输出做了多线程的准备。

    其次是codec设置。codec的作用在之前已经讲过。可能除了logstash-codec-multiline,其他codec插件本身并没有太多的设置项。所以一般省略掉后面的配置区段。换句话说。上面配置示例的完全写法应该是:

    output {

        stdout {

            codec => rubydebug {

            }

            workers => 2

        }

    }

    单就logstash-output-stdout插件来说,其最重要和常见的用途就是调试。所以在不太有效的时候,加上命令行参数-vv运行,查看更多详细调试信息。

    2.4.8 TCP发送数据

    虽然之前我们已经提到过不建议直接使用LogStash::Inputs::TCP和LogStash::Outputs::TCP 做转发工作,不过在实际交流中,发现确实有不少朋友觉得这种简单配置足够使用,因而不愿意多加一层消息队列的。所以,还是把Logstash如何直接发送TCP数据也稍微提点一下。

    配置示例如下:

    output {

        tcp {

            host  =>"192.168.0.2"

            port  => 8888

            codec => json_lines

        }

    }

    在收集端采用TCP方式发送给远端的TCP端口。这里需要注意的是,默认的Codec选项是json。而远端的LogStash::Inputs::TCP的默认Codec选项却是plain!所以不指定各自的Codec,对接肯定是失败的。

    另外,由于IO BUFFER的原因,即使是两端共同约定为json依然无法正常运行,接收端会认为一行数据没结束,一直等待直至自己OutOfMemory!

    所以,正确的做法是,发送端指定Codec为json_lines,这样每条数据后面会加上一个回车,接收端指定Codec为json_lines或者json均可,这样才能正常处理。包括在收集端已经切割好的字段,也可以直接带入收集端使用了。

    2.4.9 输出到HDFS

    数据写入HDFS是很多日志收集系统的最终目的。不过Logstash偏巧不是其中之一。到目前为止,Logstash还没有官方支持的直接写入HDFS的插件。而在社区,则有两种不同的解决方案可供选择。下面分别介绍。

    1.通过HTTP接口

    插件源码地址见:https://github.com/dstore-dbap/logstash-webhdfs

    该插件使用Hadoop的WebHDFS接口,其本质就是发送POST数据,可以说实现起来比较简单。未来logstash-plugins官方可能也会收这个插件。

    配置示例如下:

    output {

        hadoop_webhdfs {

            workers => 2

            server =>"your.nameno.de:14000"

            user =>"flume"

            path =>"/user/flume/logstash/dt=%{+Y}-%{+M}-%{+d}/logstash-%{+H}.log"

            flush_size => 500

            compress =>"snappy"

            idle_flush_time => 10

            retry_interval => 0.5

        }

    }

    插件使用方式,和其他自定义插件一样,通过—pluginpath或者打包gem均可。

    2.通过Java接口

    插件源码地址见:https://github.com/avishai-ish-shalom/logstash-hdfs

    该插件使用Hadoop的HDFS接口,利用JRuby可以直接导入Java类的特性,直接使用了org.apache.hadoop.fs.FileSystem等类来实现。

    配置示例如下:

    output {

        hdfs {

            path =>"/path/to/output_file.log"

            enable_append => true

        }

    }

    因为需要导入各种Hadoop的jar包,所以这个运行比较麻烦。Logstash-1.4.2上的运行命令示例如下:

    # LD_LIBRARY_PATH="/usr/lib/hadoop/lib/native" GEM_HOME=./logstash-1.4.2/vendor/bundle/jruby/1.9 CLASSPATH=$(find ./logstash-1.4.2/vendor/jar -type f -name '*.jar'|tr '\n' ':'):$(find /usr/lib/hadoop-hdfs -type f -name '*.jar' | tr '\n' ':'):$(find /usr/lib/hadoop -type f -name '*.jar' | tr '\n' ':'):/etc/hadoop/conf java org.jruby.Main -I./logstash-1.4.2/lib ./logstash-

    如果使用Logstash-1.5版本,可以通过rubygems.org直接安装打包好的插件:

    # bin/plugin install logstash-output-hdfs

    然后这样运行:

    # LD_LIBRARY_PATH="/usr/lib/hadoop/lib/native" CLASSPATH=$(find /usr/lib/hadoop-hdfs -type f -name '*.jar' | tr '\n' ':'):$(find /usr/lib/hadoop -type f -name '*.jar' | grep -v sources | tr '\n' ':'):/etc/hadoop/conf $LOGSTASH_DIR/bin/logstash agent -f logstash.conf

     

    相关资源:ELK Stack权威指南 第2版
    最新回复(0)