Shipper

The Shipper's responsibility is to forward incoming data to the Kafka topic immediately; in other words, it acts as the producer.

Indexer

The Indexer's responsibility is to consume the Kafka topic, preprocess the events, and send them to Elasticsearch.
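
Putting the two roles together, the pipeline assembled in this note flows as follows:

  Apache logs → Filebeat → Logstash (Shipper) → Kafka topic "apache-log" → Logstash (Indexer) → Elasticsearch → Kibana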



Installation
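
  A minimal sketch, assuming each component is installed by extracting the official tar.gz archives (file names below are placeholders for whichever versions are used; the Elastic Stack components should be on matching versions):

    > tar -xzf elasticsearch-<version>.tar.gz
    > tar -xzf kibana-<version>.tar.gz
    > tar -xzf logstash-<version>.tar.gz
    > tar -xzf filebeat-<version>.tar.gz
    > tar -xzf kafka_<scala-version>-<kafka-version>.tgz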


Configuration

  1. Filebeat configuration

    # run setup while the Elasticsearch output is still active (it loads templates and dashboards into Elasticsearch)
    > filebeat setup -e
    
    # filebeat.yml: disable the Elasticsearch output and send events to Logstash instead
    #output.elasticsearch:
      #hosts: ["localhost:9200"]
    
    output.logstash:
      hosts: ["localhost:5044"]
    
    # enable the Apache module
    > filebeat modules enable apache
    
    # modules.d/apache.yml: point the module at the Apache log files
    #access_log
    var.paths: ["/private/var/log/apache2/access_log*"]
    
    #error_log
    var.paths: ["/private/var/log/apache2/error_log*"]



  2. Logstash (Shipper)

    # receive events from Filebeat
    input {
      beats {
        port => 5044
      }
    }
    
    # publish every event as JSON to the apache-log Kafka topic (and echo it to stdout)
    output {
      kafka {
        bootstrap_servers => "127.0.0.1:9092"
        topic_id => "apache-log"
        codec => json
      }
      stdout {
        codec => json
      }
    }
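    The shipper pipeline can be syntax-checked before it is started (optional; this is the standard Logstash flag, and the file name matches the one used in the Run section):

    > bin/logstash -f ./config/logstash-shipper.conf --config.test_and_exit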
  3. Logstash (Indexer)

    # consume JSON events from the apache-log Kafka topic
    input {
      kafka {
        bootstrap_servers => "127.0.0.1:9092"
        topics => ["apache-log"]
        group_id => "apache-log-logstash"
        codec => json
      }
    }
    
    # parse Apache access/error logs into structured fields (equivalent to the Filebeat apache2 module's ingest pipeline)
    filter {
      if [fileset][module] == "apache2" {
        if [fileset][name] == "access" {
          grok {
            match => { "message" => ["%{IPORHOST:[apache2][access][remote_ip]} - %{DATA:[apache2][access][user_name]} \[%{HTTPDATE:[apache2][access][time]}\] \"%{WORD:[apache2][access][method]} %{DATA:[apache2][access][url]} HTTP/%{NUMBER:[apache2][access][http_version]}\" %{NUMBER:[apache2][access][response_code]} %{NUMBER:[apache2][access][body_sent][bytes]}( \"%{DATA:[apache2][access][referrer]}\")?( \"%{DATA:[apache2][access][agent]}\")?",
              "%{IPORHOST:[apache2][access][remote_ip]} - %{DATA:[apache2][access][user_name]} \\[%{HTTPDATE:[apache2][access][time]}\\] \"-\" %{NUMBER:[apache2][access][response_code]} -" ] }
            remove_field => "message"
          }
          mutate {
            add_field => { "read_timestamp" => "%{@timestamp}" }
          }
          date {
            match => [ "[apache2][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
            remove_field => "[apache2][access][time]"
          }
          useragent {
            source => "[apache2][access][agent]"
            target => "[apache2][access][user_agent]"
            remove_field => "[apache2][access][agent]"
          }
          geoip {
            source => "[apache2][access][remote_ip]"
            target => "[apache2][access][geoip]"
          }
        }
        else if [fileset][name] == "error" {
          grok {
            match => { "message" => ["\[%{APACHE_TIME:[apache2][error][timestamp]}\] \[%{LOGLEVEL:[apache2][error][level]}\]( \[client %{IPORHOST:[apache2][error][client]}\])? %{GREEDYDATA:[apache2][error][message]}",
              "\[%{APACHE_TIME:[apache2][error][timestamp]}\] \[%{DATA:[apache2][error][module]}:%{LOGLEVEL:[apache2][error][level]}\] \[pid %{NUMBER:[apache2][error][pid]}(:tid %{NUMBER:[apache2][error][tid]})?\]( \[client %{IPORHOST:[apache2][error][client]}\])? %{GREEDYDATA:[apache2][error][message1]}" ] }
            pattern_definitions => {
              "APACHE_TIME" => "%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}"
            }
            remove_field => "message"
          }
          mutate {
            rename => { "[apache2][error][message1]" => "[apache2][error][message]" }
          }
          date {
            match => [ "[apache2][error][timestamp]", "EEE MMM dd H:m:s YYYY", "EEE MMM dd H:m:s.SSSSSS YYYY" ]
            remove_field => "[apache2][error][timestamp]"
          }
        }
      }
    }
    
    # index events into Elasticsearch using Filebeat-style index names (and echo each event to stdout)
    output {
      elasticsearch {
        hosts => localhost
        manage_template => false
        index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
      }
      stdout {
        codec => json
      }
    }
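    Whether the Indexer is actually consuming can later be verified via its consumer group (the group_id above), using a standard Kafka tool run from the Kafka directory:

    > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group apache-log-logstash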


Run

  1. Run Elasticsearch

    > bin/elasticsearch
  2. Run Kibana

    > bin/kibana
  3. Run Kafka

    # start ZooKeeper and the Kafka broker
    > bin/zookeeper-server-start.sh config/zookeeper.properties
    > bin/kafka-server-start.sh config/server.properties
    
    # create the topic the Shipper publishes to
    > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic apache-log
    
    # (optional) watch the raw events arriving on the topic
    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic apache-log --from-beginning
    
    Before starting the broker, add advertised.listeners=PLAINTEXT://localhost:9092 to config/server.properties.
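    To confirm the topic was created, it can be listed (optional check):

    > bin/kafka-topics.sh --list --zookeeper localhost:2181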
  4. Run the Indexer

    > bin/logstash -f ./config/logstash-indexer.conf
  5. Run the Shipper

    > bin/logstash -f ./config/logstash-shipper.conf
  6. Run Filebeat

    > filebeat -e
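    To generate fresh access-log entries for testing, the local Apache server can be hit with a request (assuming it listens on port 80; adjust to the actual setup):

    > curl http://localhost/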

Verification

http://127.0.0.1:5601
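
Whether the Indexer has created the daily Filebeat indices can also be checked directly against Elasticsearch, and a filebeat-* index pattern can then be created in Kibana to browse the data:

  > curl "http://localhost:9200/_cat/indices/filebeat-*?v"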



Appendix

  1. Filebeat → Kafka

    Filebeat can also publish directly to Kafka, replacing the Logstash Shipper, by using the following output in filebeat.yml:

    output.kafka:
      hosts: ["localhost:9092"]
      topic: "apache-log"
      codec.json:
        pretty: false
  2. Using an Ingest Node pipeline

    > filebeat setup --pipelines --modules apache

    When running the command above, the output in filebeat.yml must be set to Elasticsearch. Once the pipelines are loaded, the Indexer can drop the grok filter and let the ingest pipeline do the parsing:



    input {
      kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["apache-log"]
        codec => json
      }
    }
    
    output {
      if [@metadata][pipeline] {
        elasticsearch {
          hosts => localhost
          manage_template => false
          index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
          pipeline => "%{[@metadata][pipeline]}"
        }
    
        stdout {
          codec => json
        }
      } else {
        elasticsearch {
          hosts => localhost
          manage_template => false
          index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
        }
    
        stdout {
          codec => json
        }
      }
    }