
ELK+KAFKA

With ELK as the log pipeline, a slow Elasticsearch cluster risks a growing backlog of unwritten logs. Putting a Kafka layer in between as a buffer makes the whole setup more reliable. Here is the first version of the Logstash configuration:

input {
    tcp {
        port => 5000
        codec => "json_lines"
        type => "log-logstash"
    }
    tcp {
        port => 5001
        codec => "json_lines"
        type => "log-api"
    }
    tcp {
        port => 5002
        codec => "json_lines"
        type => "log-event"
    }
    kafka {
        bootstrap_servers => "bootstrap_servers:9092"
        sasl_mechanism => "PLAIN"
        security_protocol => "SASL_PLAINTEXT"
        client_id => "mbl.loges"
        decorate_events => true
        group_id => "loges"
        topics => ["log-logstash","log-api","log-event"]
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='mbl'  password='token';"
        type => "log-kafka"
    }
}

## Add your filters / logstash plugins configuration here

filter {
    if [loglevel] == "debug" {
        drop { }
    }

    if [type] == "log-api" {
        if [heartbeat] == "log" {
            drop { }
        }
    }
    if [type] == "log-event" {
        if [heartbeat] == "log" {
            drop { }
        }
    }

}

output {
    if [type] == "log-logstash" {
        kafka {
            bootstrap_servers => "bootstrap_servers:9092"
            sasl_mechanism => "PLAIN"
            security_protocol => "SASL_PLAINTEXT"
            client_id => "mbl"
            topic_id => "log-logstash"
            codec => json
            sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='mbl'  password='token';"
        }
    }
    if [type] == "log-api" {
        kafka {
            bootstrap_servers => "bootstrap_servers:9092"
            sasl_mechanism => "PLAIN"
            security_protocol => "SASL_PLAINTEXT"
            client_id => "mbl"
            topic_id => "log-api"
            codec => json
            sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='mbl'  password='token';"
        }
    }
    if [type] == "log-event" {
        kafka {
            bootstrap_servers => "bootstrap_servers:9092"
            sasl_mechanism => "PLAIN"
            security_protocol => "SASL_PLAINTEXT"
            client_id => "mbl"
            topic_id => "log-event"
            codec => json
            sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='mbl'  password='token';"
        }
    }

    if [type] == "log-kafka" {
        elasticsearch {
            hosts => "elasticsearch:9200"
            index => "%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"
            user => "elastic"
            password => "password"
        }
    }

}
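
As a side note, the tcp inputs above expect newline-delimited JSON. A minimal client sketch (the "logstash" hostname and the log fields here are assumptions for illustration):

import json
import socket

# One JSON object per newline-terminated line, matching the json_lines codec above.
event = {"loglevel": "info", "msg": "hello from the app"}

with socket.create_connection(("logstash", 5000)) as sock:
    sock.sendall((json.dumps(event) + "\n").encode("utf-8"))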

decorate_events => true

With this option set, you can read the Kafka topic name out of the event's metadata.
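
Concretely, per the kafka input plugin docs, decorate_events places the Kafka metadata under [@metadata][kafka], which is what the %{[@metadata][kafka][topic]} reference in the elasticsearch output resolves against. A consumed event then looks roughly like this (field values are illustrative):

{
    "message"   => "...",
    "@metadata" => {
        "kafka" => {
            "topic"          => "log-api",
            "consumer_group" => "loges",
            "partition"      => 0,
            "offset"         => 42,
            "key"            => nil,
            "timestamp"      => 1591234567890
        }
    }
}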

Follow-up: the solution above turned out to be a huge trap. I thought the configuration was fine and sat down to write this post, only to run into a pile of problems that cost me a whole afternoon.

With the config above, data does flow, but when I looked at the logs in ES I found that my data had not been parsed as JSON; everything was crammed into the message field.

What? Something is off here???

After reading a pile of material, I found that I was supposed to add this to the kafka input:

codec => json {}
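
For context, the kafka input defaults to the plain codec, so the raw payload ends up in the message field; codec => json parses it into top-level fields. Roughly, with an illustrative payload:

# default plain codec:
"message" => "{\"loglevel\":\"info\",\"heartbeat\":\"log\"}"

# with codec => json:
"loglevel"  => "info"
"heartbeat" => "log"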

Then, after I restarted Logstash, I found that Elasticsearch was not receiving any data at all!!!

No errors, and no data reaching ES. I was losing my mind! At least throw an error, would you?!

Then I found a guy on the forums in exactly the same situation: https://discuss.elastic.co/t/codec-json-is-not-working/73637/5 — he never got an answer either. I was speechless...

An afternoon of research and experiments solved nothing.

So I started analyzing the problem myself...

I tried pushing a message into Kafka by hand. And my hand-written message actually made it into ES!!

Because my data flow is

data source -> logstash -> kafka -> logstash -> ES

I then compared the message I pushed by hand with what Logstash pushed, and the difference was that the Logstash message carried a few extra fields: @version, @timestamp, and type. Trying them one by one, I confirmed that type was the culprit!
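
For reference, the manual test can be reproduced with something like this sketch (assuming kafka-python; the broker address and SASL credentials are the placeholders from the config above):

import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="bootstrap_servers:9092",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username="mbl",
    sasl_plain_password="token",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# A bare event: no @version/@timestamp/type, unlike what Logstash produces.
producer.send("log-api", {"loglevel": "info", "msg": "manual test"})
producer.flush()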

Damn! It suddenly clicked. In the original configuration, type is the core marker I use inside Logstash to tell where a piece of data came from and where it should be routed. If data that has already been through Logstash comes back in and its type is not updated, you get an infinite loop, and the data flow becomes

data source -> logstash -> kafka -> logstash -> kafka -> logstash -> ...

An endless loop: the kafka input never changes the existing type, so the event keeps its old value, keeps matching the kafka output conditions, and loops forever...

I looked at the historical data in Kafka and, sure enough, the same messages were looping in there over and over.

OK, so what is the fix?

I searched for a way to force-overwrite the type, or to strip it in the output, and found nothing.

Then I discovered that events can also be marked with tags, so I used tags as a second-level marker instead.

Problem solved!!

The new configuration:

input {
    tcp {
        port => 5000
        codec => "json_lines"
        type => "jdlog-app"
    }
    tcp {
        port => 5001
        codec => "json_lines"
        type => "jdlog-jsf"
    }
    tcp {
        port => 5002
        codec => "json_lines"
        type => "jdlog-http"
    }
    kafka {
        bootstrap_servers => "bootstrap_servers:9092"
        sasl_mechanism => "PLAIN"
        security_protocol => "SASL_PLAINTEXT"
        client_id => "mbl.jdloges"
        decorate_events => true
        codec => json {}
        group_id => "jdloges"
        topics => ["jdlog-app","jdlog-jsf","jdlog-http"]
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='mbl'  password='token';"
        tags => ["from_kafka"]
    }
}

## Add your filters / logstash plugins configuration here

filter {
    if [loglevel] == "debug" {
        drop { }
    }

    if [type] == "jdlog-jsf" {
        if [heartbeat] == "jd-log" {
            drop { }
        }
    }
    if [type] == "jdlog-http" {
        if [heartbeat] == "jd-log" {
            drop { }
        }
    }

}

output {
    if "from_kafka" in [tags] {
        elasticsearch {
            hosts => "elasticsearch:9200"
            index => "%{[type]}-%{+YYYY.MM.dd}"
            user => "elastic"
            password => "Spring01ELK"
        }
        stdout {
            codec => rubydebug
        }
    } else {
        if [type] == "jdlog-app" {
            kafka {
                bootstrap_servers => "bootstrap_servers:9092"
                sasl_mechanism => "PLAIN"
                security_protocol => "SASL_PLAINTEXT"
                client_id => "mbl"
                topic_id => "jdlog-app"
                codec => json
                sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='mbl'  password='token';"
            }
        }
        if [type] == "jdlog-jsf" {
            kafka {
                bootstrap_servers => "bootstrap_servers:9092"
                sasl_mechanism => "PLAIN"
                security_protocol => "SASL_PLAINTEXT"
                client_id => "mbl"
                topic_id => "jdlog-jsf"
                codec => json
                sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='mbl'  password='token';"
            }
        }
        if [type] == "jdlog-http" {
            kafka {
                bootstrap_servers => "bootstrap_servers:9092"
                sasl_mechanism => "PLAIN"
                security_protocol => "SASL_PLAINTEXT"
                client_id => "mbl"
                topic_id => "jdlog-http"
                codec => json
                sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='mbl'  password='token';"
            }
        }
    }
 
}
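
To sanity-check the new pipeline, one can count documents in today's per-type index, e.g. with a sketch like this (host and credentials are the placeholders from the output section):

import datetime
import requests

# Indices follow %{[type]}-YYYY.MM.dd per the elasticsearch output above.
index = "jdlog-app-" + datetime.date.today().strftime("%Y.%m.%d")
resp = requests.get(
    "http://elasticsearch:9200/" + index + "/_count",
    auth=("elastic", "Spring01ELK"),  # placeholder credentials from the config
)
print(resp.json())  # expect a growing {"count": ...} and no duplicate flood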

References:

https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html

https://stackoverflow.com/questions/61075630/show-kafka-topic-title-as-a-field-in-kibana-logstash-add-field

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-decorate_events
