logstash输出到elasticsearch索引和映射

我试图让logstash输出到elasticsearch,但我不知道如何使用我在elasticsearch中定义的映射...

在Kibana,我做到了这一点:

创建一个像这样的索引和映射:

PUT /kafkajmx2
{
  "mappings": {
    "kafka_mbeans": {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "@version": {
          "type": "integer"
        },
        "host": {
          "type": "keyword"
        },
        "metric_path": {
          "type": "text"
        },
        "type": {
          "type": "keyword"
        },
        "path": {
          "type": "text"
        },
        "metric_value_string": {
          "type": "keyword"
        },
        "metric_value_number": {
          "type": "float"
        }
      }
    }
  }

}

可以像这样写入数据:

POST /kafkajmx2/kafka_mbeans
{
  "metric_value_number":159.03478490788203,
  "path":"/home/usrxxx/logstash-5.2.0/bin/jmxconf",
  "@timestamp":"2017-02-12T23:08:40.934Z",
  "@version":"1","host":"localhost",
  "metric_path":"node1.kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec.FifteenMinuteRate",
  "type":null


}

现在我的logstash输出如下所示:

input {
        kafka {
                kafka details here
        }

}
output {

    elasticsearch {
            hosts => "http://elasticsearch:9050"
            index => "kafkajmx2"

    }

}

它只是将它写入kafkajmx2索引,但不使用地图,当我在kibana中这样查询它时:

get /kafkajmx2/kafka_mbeans/_search?q=*
{


}

我回来了:

      {
        "_index": "kafkajmx2",
        "_type": "logs",
        "_id": "AVo34xF_j-lM6k7wBavd",
        "_score": 1,
        "_source": {
          "@timestamp": "2017-02-13T14:31:53.337Z",
          "@version": "1",
          "message": """
{"metric_value_number":0,"path":"/home/usrxxx/logstash-5.2.0/bin/jmxconf","@timestamp":"2017-02-13T14:31:52.654Z","@version":"1","host":"localhost","metric_path":"node1.kafka.server:type=SessionExpireListener,name=ZooKeeperAuthFailuresPerSec.Count","type":null}

"""
        }
      }

我如何告诉它在logstash输出中使用map kafka_mbeans

- - -编辑 - - -

我尝试了这样的输出,但仍然得到相同的结果:

output {

        elasticsearch {
                hosts => "http://10.204.93.209:9050"
                index => "kafkajmx2"
                template_name => "kafka_mbeans"
                codec => plain {
                        format => "%{message}"
                }

        }

}

弹性搜索中的数据应如下所示:

{
  "@timestamp": "2017-02-13T14:31:52.654Z", 
  "@version": "1", 
  "host": "localhost", 
  "metric_path": "node1.kafka.server:type=SessionExpireListener,name=ZooKeeperAuthFailuresPerSec.Count", 
  "metric_value_number": 0, 
  "path": "/home/usrxxx/logstash-5.2.0/bin/jmxconf", 
  "type": null
}

--------编辑2 --------------

我至少通过添加一个像这样的过滤器将消息解析到json中:

input {
        kafka {
                ...kafka details....
        }

}
filter {
        json {
                source => "message"
                remove_field => ["message"]
        }
}
output {

        elasticsearch {
                hosts => "http://node1:9050"
                index => "kafkajmx2"
                template_name => "kafka_mbeans"
        }

}

它不会使用模板,但至少可以正确解析json ...所以现在我得到了这个:

  {
    "_index": "kafkajmx2",
    "_type": "logs",
    "_id": "AVo4a2Hzj-lM6k7wBcMS",
    "_score": 1,
    "_source": {
      "metric_value_number": 0.9967205071482902,
      "path": "/home/usrxxx/logstash-5.2.0/bin/jmxconf",
      "@timestamp": "2017-02-13T16:54:16.701Z",
      "@version": "1",
      "host": "localhost",
      "metric_path": "kafka1.kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent.Value",
      "type": null
    }
  }

你需要改变的很简单。 首先在你的kafka输入中使用json编解码器。 无需使用json过滤器,您可以将其删除。

    kafka {
            ...kafka details....
            codec => "json"
    }

然后,在您的elasticsearch输出中,您缺少映射类型(参数document_type below),这非常重要,否则它将默认为logs (如您所见),并且与您的kafka_mbeans映射类型不匹配。 此外,由于您的索引已经存在,因此您并不需要使用模板。 进行以下修改:

    elasticsearch {
            hosts => "http://node1:9050"
            index => "kafkajmx2"
            document_type => "kafka_mbeans"
    }

这是通过elasticsearch输出上的template_name参数定义的。

elasticsearch {
        hosts         => "http://elasticsearch:9050"
        index         => "kafkajmx2"
        template_name => "kafka_mbeans"
}

但有一个警告。 如果您想开始创建按时装箱的索引,例如每周一个索引,则必须采取更多步骤来确保您的映射与每个索引保持一致。 你有几个选择:

  • 创建一个elasticsearch模板,并将其定义为使用glob应用于索引。 如kafkajmx2-*
  • 在输出上使用template参数,该参数指定一个JSON文件,该文件定义将用于通过该输出创建的所有索引的映射。
  • 链接地址: http://www.djcxy.com/p/95071.html

    上一篇: logstash output to elasticsearch index and mapping

    下一篇: downloadButton/ downloadHandler does not recognize filename argument