ELK 套件分析Rails日志

当系统访问很频繁,服务器日志增多到一定量时,仅靠 tail -f 已经很难分析出有用信息了。
可能就只能进行一些简单的排除工作,比如根据日志内的时间信息,查找该时间附近有什么特殊访问之类。

即使这样用有时候也会比较麻烦,因为当你的应用服务器增多时,每个服务器都是单独写到本机日志文件内,造成了分析日志的难度。
这个时候就很需要一个工具来收集各个服务器日志来统一处理。

今天说的就是有关于这个问题的解法。

日志收集的工具也有不少 Flumestorm。 今天说的是 ELK stack 这个套件。这个套件由 3 个部分组成:

简单地说: Logstash 用来收集数据;ElasticSearch 存储数据;Kibana 是表现层,Web 接口用来对接用户UI。

ElasticSearch

相信 ElasticSearch 大家可能听说的,是做全文搜索的,跟 Solr 差不多是基于 Lucene 来做的。他有几个特点:

  • RESTful Web 接口
  • API 数据基于 JSON
  • 分布式架构

当然 Solr 现在也可以放到多台服务器上组成集群了,但是 ElasticSearch 从一开始就是这么设计的,这种与生俱来的特性与后天修补出来的体验是否一样,你懂得。

ElasticSearchELK 套件中处于数据存储的身份,无疑是最核心的。

Logstash

Logstash 可以单独使用,比如把各个服务器的日志收集起来集成到一个日志文件内,这样只要看一个日志文件,就不需要费力到每个服务器查看日志了。
当然结合今天介绍的套件使用无疑是最强大的。

Logstash 作为输入接口,他有很多插件可用,对接文件,对接网络接口,甚至对接 ElasticSearch 作为输入。
比如在我们的例子中,要收集多台日志文件,Logstash 可以开 TCP 端口,各个日志服务器使用 filebeat 代理来检查日志输出,一旦有新的日志输出到文件,
filebeat 就会把该内容发送到 Logstash 配置的端口内,这样就完成了日志的收集工作。

Logstash 在收集了日志后,还可以对数据进行 分析,拆解成多个字段,再输出到 ElasticSearch 中,这样就能使用 ElasticSearch 强大的功能对这些字段进行搜索了。

Kibana

Kibana 作为最后的用户 UI 接口,支持了很多分析的功能。比如分析每天的PV情况,一天内访问的高峰与低谷,异常攻击情况,瓶颈的处理等。

Kibana截图

实际应用

下面是配置的 Rails4 日志的分析结构

logstash-pattern-rails4.confview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Rails4 log prefix
LOGLEVELSINGLE %{WORD}
PROCESSNUM %{NUMBER}
RAILS4PREFIX %{LOGLEVELSINGLE}, \[%{TIMESTAMP_ISO8601} #%{PROCESSNUM}] %{LOGLEVEL} -- :

# This will often be the only line:
URIPATHPARAM2 %{URIPATH:uripath}(?:%{URIPARAM})?
RAILS4HEAD (?m)%{RAILS4PREFIX}Started %{WORD:verb} "%{URIPATHPARAM2:request}" for %{IPORHOST:clientip} at (?<timestamp>%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND} %{ISO8601_TIMEZONE})

# Output schema migration info right after started
RAILS4SCHEMAMIGRATION \W*%{RAILS4PREFIX}ActiveRecord::SchemaMigration Load \(%{NUMBER:schemams}ms\) SELECT "schema_migrations".* FROM "schema_migrations"
# For some strange reason, params are stripped of {} - not sure that's a good idea.
RAILS4PARAMETERS \W*%{RAILS4PREFIX} Parameters: {%{DATA:params}}
RAILS4RPROCESSING (?:%{RAILS4SCHEMAMIGRATION})?\W*%{RAILS4PREFIX}Processing by %{RCONTROLLER} as (?<format>\S+)(?:%{RAILS4PARAMETERS})?

RAILS4PROFILE (?:\(Views: %{NUMBER:viewms:float}ms \| ActiveRecord: %{NUMBER:activerecordms:float}ms|\(ActiveRecord: %{NUMBER:activerecordms:float}ms)?
RAILS4FOOT %{RAILS4PREFIX}Completed %{NUMBER:response} %{DATA} in %{NUMBER:totalms:int}ms %{RAILS4PROFILE}%{GREEDYDATA}

# Put it all together
RAILS4 %{RAILS4HEAD}(?:%{RAILS4RPROCESSING})?(?<context>(?:%{DATA}\n)*)(?:%{RAILS4FOOT})?

补充要说的是,时间戳默认是以 Logstash 收到的日志数据的系统时间为每条日志时间戳的,这与时间日志发生的时间并不一致,
需要抽出日志内容里的时间为日志时间戳,这个需要在以下的处理:

1
2
3
4
data {
match => ["timestamp", 'yyyy-MM-dd HH:mm:ss Z']
remove_field => ['timestamp'] # 删除原来的时间戳
}

完整的Logstash 配置文件例子

logstash.confview raw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
input {
#beats {
# port => '5044'
#}

file {
path => '/path/to/log/production.log'
codec => multiline {
pattern => "-- : Started "
negate => true
what => 'previous'
}
}
}

filter {
grok {
patterns_dir => '../grok-patterns'
match => { "message" => "%{RAILS4}" }
}

date {
match => ['timestamp', 'yyyy-MM-dd HH:mm:ss Z']
remove_field => ['timestamp']
}

geoip {
source => 'clientip'
#fields => ["city_name", "country_code2", "country_name", "latitude", "longitude", "region_name"]
}
}

output {
stdout {
codec => rubydebug
}

elasticsearch {
hosts => ['127.0.0.1:9200']
}
}

延伸阅读

参考 ELKstack 中文指南