Logstash output to Elasticsearch index and mapping
I'm trying to get Logstash to output to Elasticsearch, but I don't know how to make it use the mapping I defined in Elasticsearch...
In Kibana, I did this:
Created an index and mapping like this:
PUT /kafkajmx2
{
  "mappings": {
    "kafka_mbeans": {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "@version": {
          "type": "integer"
        },
        "host": {
          "type": "keyword"
        },
        "metric_path": {
          "type": "text"
        },
        "type": {
          "type": "keyword"
        },
        "path": {
          "type": "text"
        },
        "metric_value_string": {
          "type": "keyword"
        },
        "metric_value_number": {
          "type": "float"
        }
      }
    }
  }
}
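As a sanity check (my addition, not part of the original question), the stored mapping can be verified from the Kibana console before wiring up Logstash:

```
GET /kafkajmx2/_mapping
```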
And I can write data into it like this:
POST /kafkajmx2/kafka_mbeans
{
  "metric_value_number": 159.03478490788203,
  "path": "/home/usrxxx/logstash-5.2.0/bin/jmxconf",
  "@timestamp": "2017-02-12T23:08:40.934Z",
  "@version": "1",
  "host": "localhost",
  "metric_path": "node1.kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec.FifteenMinuteRate",
  "type": null
}
Now my Logstash output looks like this:
input {
  kafka {
    kafka details here
  }
}
output {
  elasticsearch {
    hosts => "http://elasticsearch:9050"
    index => "kafkajmx2"
  }
}
It just writes into the kafkajmx2 index but doesn't use the mapping. When I query it in Kibana like this:
get /kafkajmx2/kafka_mbeans/_search?q=*
{
}
I get back:
{
  "_index": "kafkajmx2",
  "_type": "logs",
  "_id": "AVo34xF_j-lM6k7wBavd",
  "_score": 1,
  "_source": {
    "@timestamp": "2017-02-13T14:31:53.337Z",
    "@version": "1",
    "message": """
{"metric_value_number":0,"path":"/home/usrxxx/logstash-5.2.0/bin/jmxconf","@timestamp":"2017-02-13T14:31:52.654Z","@version":"1","host":"localhost","metric_path":"node1.kafka.server:type=SessionExpireListener,name=ZooKeeperAuthFailuresPerSec.Count","type":null}
"""
  }
}
How do I tell the Logstash output to use the kafka_mbeans mapping?
----- EDIT -----
I tried an output like this, but still get the same result:
output {
  elasticsearch {
    hosts => "http://10.204.93.209:9050"
    index => "kafkajmx2"
    template_name => "kafka_mbeans"
    codec => plain {
      format => "%{message}"
    }
  }
}
The data in Elasticsearch should look like this:
{
  "@timestamp": "2017-02-13T14:31:52.654Z",
  "@version": "1",
  "host": "localhost",
  "metric_path": "node1.kafka.server:type=SessionExpireListener,name=ZooKeeperAuthFailuresPerSec.Count",
  "metric_value_number": 0,
  "path": "/home/usrxxx/logstash-5.2.0/bin/jmxconf",
  "type": null
}
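A note on why that attempt didn't work (my addition, not from the original post): template_name on its own only sets the name under which Logstash manages its own default template; it doesn't pull in a custom mapping. To apply the mapping above, either register an index template directly in Elasticsearch, or point the output's template parameter at a JSON file. A direct registration in ES 5.x could look roughly like this (field definitions copied from the question's mapping; the template pattern is illustrative):

```
PUT /_template/kafka_mbeans
{
  "template": "kafkajmx2*",
  "mappings": {
    "kafka_mbeans": {
      "properties": {
        "@timestamp": { "type": "date" },
        "@version": { "type": "integer" },
        "host": { "type": "keyword" },
        "metric_path": { "type": "text" },
        "type": { "type": "keyword" },
        "path": { "type": "text" },
        "metric_value_string": { "type": "keyword" },
        "metric_value_number": { "type": "float" }
      }
    }
  }
}
```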
----- EDIT 2 -----
I at least got the message parsed into JSON by adding a filter like this:
input {
  kafka {
    ...kafka details....
  }
}
filter {
  json {
    source => "message"
    remove_field => ["message"]
  }
}
output {
  elasticsearch {
    hosts => "http://node1:9050"
    index => "kafkajmx2"
    template_name => "kafka_mbeans"
  }
}
It doesn't use the template, but at least it parses the JSON correctly... so now I get this:
{
  "_index": "kafkajmx2",
  "_type": "logs",
  "_id": "AVo4a2Hzj-lM6k7wBcMS",
  "_score": 1,
  "_source": {
    "metric_value_number": 0.9967205071482902,
    "path": "/home/usrxxx/logstash-5.2.0/bin/jmxconf",
    "@timestamp": "2017-02-13T16:54:16.701Z",
    "@version": "1",
    "host": "localhost",
    "metric_path": "kafka1.kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent.Value",
    "type": null
  }
}
What you need to change is simple. First, use the json codec in your kafka input. There's then no need for the json filter, so you can remove it:
kafka {
  ...kafka details....
  codec => "json"
}
Then, in your elasticsearch output, you're missing the mapping type (the document_type parameter below). That's very important because otherwise it defaults to logs (as you've seen), which doesn't match your kafka_mbeans mapping type. Also, since your index already exists, you don't really need to use a template. Make the following changes:
elasticsearch {
  hosts => "http://node1:9050"
  index => "kafkajmx2"
  document_type => "kafka_mbeans"
}
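Putting both changes together (kafka connection details elided, as in the question), the whole pipeline would then look roughly like this:

```
input {
  kafka {
    ...kafka details....
    codec => "json"
  }
}
output {
  elasticsearch {
    hosts => "http://node1:9050"
    index => "kafkajmx2"
    document_type => "kafka_mbeans"
  }
}
```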
This is defined via the template_name parameter on the elasticsearch output:
elasticsearch {
  hosts => "http://elasticsearch:9050"
  index => "kafkajmx2"
  template_name => "kafka_mbeans"
}
One caveat, though. If you want to start creating time-boxed indices, e.g. one per week, you'll have to take a few more steps to make sure your mapping carries over to each new index. You have a couple of options:
- create an index template in Elasticsearch whose pattern matches the index names, e.g. kafkajmx2-*; or
- use the template parameter on the elasticsearch output, which points at a JSON file defining the mapping to be used for all indices created through that output.
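For the second option, the output could be sketched as below (the weekly index pattern, file path, and template name are illustrative, not from the original answer; the referenced JSON file would hold an index-template body with a matching index pattern plus the mappings section from the question):

```
elasticsearch {
  hosts => "http://node1:9050"
  index => "kafkajmx2-%{+xxxx.ww}"
  document_type => "kafka_mbeans"
  template => "/etc/logstash/templates/kafka_mbeans.json"
  template_name => "kafka_mbeans"
  template_overwrite => true
}
```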