본문 바로가기
BIG-DATA/LOGSTASH

[LOGSTASH] 무작정 시작하기 (3) - Filebeat To Kafka

by 허도치 2019. 11. 28.

2019/11/26 - [BIG-DATA/LOGSTASH] - [LOGSTASH] 무작정 시작하기 (1) - 설치 & 실행

2019/11/26 - [BIG-DATA/LOGSTASH] - [LOGSTASH] 무작정 시작하기 (2) - filter

 

 

  지난 포스트까지해서 Logstash의 기본적인 사용법을 알아보았다. 이번 포스트에서는 Chrome Debug 로그를 Filebeat으로 수집하고 Logstash로 집계하여 변환하고 Kafka에 적재하는 프로세스를 구현해보도록 하겠다. Filebeat과 Kafka만 잘 셋팅되어 있다면 쉽게 구현할 수 있을 것이다.

 

 

이번 포스트에서 Filebeat과 Kafka에 대한 셋팅은 다른 포스트를 참고하길 바란다.

#Filebeat 셋팅

2019/11/25 - [BIG-DATA/FILEBEAT] - [FILEBEAT] 무작정 시작하기 (1) - 설치 및 실행

 

#Kafka 셋팅

2019/11/19 - [BIG-DATA/KAFKA] - [KAFKA] 무작정 시작하기 (1) - 설치 & 실행

 

 

1. Chrome Deubg Log 수집.

   1-1. 실행.

1
"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe" --enable-logging --v=1 --user-data-dir=C:\work\chrome
cs

 

 

2. Filebeat 설정 및 실행.

   2-1. [ filebeat-7.3.1/conf.d/chrome_logstash.yml ] 작성.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - C:\work\chrome\chrome_debug.log
  fields:
    type: "CHROME_BROWSER"
    topic_name: "local-topic"
 
  multiline.pattern: '^\[\d+:\d+:\d+[/]\d+[.]\d+[:]'
  multiline.negate: true
  multiline.match: after
  
  encoding: utf-8
 
output.logstash:
  enabled: true
  hosts: [ 'localhost:5044' ]
  
logging.level: debug
path.data: C:\work\filebeat-7.3.1\data
path.logs: C:\work\filebeat-7.3.1\logs
cs

         - 16~18ln: Beat를 Logstash로 내보내기, 5044는 Logstash의 기본 포트.

 

   2-2. 실행.

1
2
3
set FILEBEAT_HOME=C:\work\filebeat-7.3.1
 
%FILEBEAT_HOME%\filebeat run -c %FILEBEAT_HOME%\conf.d\chrome_logstash.yml --d publish
cs

         - 아직 Logstash가 실행되지 않아서 Back Off가 발생함.

 

 

3. Logstash [ Beats Input Plugin ] 설정 및 테스트 실행. 

   3-1. [ logstash-7.3.1/conf.d/filebeat_to_kafka.conf ] 작성.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
input {
  beats {
    port => 5044
  }
}
 
filter {
  grok {
    match => { "message" => "^(?<log_time>\[\d+:\d+:\d+[/]\d+[.]\d+)[:](?<log_level>\w+)[:].+" }
  }
  if [log_level] == "INFO" {
    ruby {
      code => "event.set('message', event.get('message') + 10.chr)"
    }
  }
  mutate {
    lowercase => [ "log_level" ]
  }
}
 
output {
  if [log_level] {
    file {
      path => "C:/work/logstash-7.3.1/data/plugins/outputs/file/%{log_level}.log"
      codec => plain {
        format => "%{message}"
      }
    }
  }
  else {
    file {
      path => "C:/work/logstash-7.3.1/data/plugins/outputs/file/etc.log"
    }
  }
}
cs

         - 이전 포스트에서 작성했던 [ logstash.conf ]를 활용.

         - 1~5 ln: Beat에서 5044 포트로 보내오는 데이터를 수집.

 

   3-2. 테스트 실행.

1
2
3
4
set LOGSTASH_HOME=C:\work\logstash-7.3.1
set JAVA_HOME=%LOGSTASH_HOME%\java\jdk-11.0.5
 
%LOGSTASH_HOME%\bin\logstash.bat --path.config %LOGSTASH_HOME%\conf.d\filebeat_to_kafka.conf --config.reload.automatic
cs

         - Logstash가 실행되면 Filebeat에서 Beat를 전송해줌.

Filebeat 실행 로그

         - Beat를 수집하여 File로 내보내는 것이 확인됨.

Logstash 실행 로그

 

 

4. Logstash 설정 및 실행.

   4-1. [ logstash-7.3.1/conf.d/filebeat_to_kafka.conf ] 수정.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
input {
  beats {
    port => 5044
  }
}
 
filter {
  # 현재시간 계산.
  ruby {
    code => "event.set('local_timestamp', event.timestamp.time.localtime.strftime('%Y-%m-%d %H:%M:%S.%s'))"
  }
  
  # log_level 추출.
  grok {
    match => { "message" => "^(?<log_time>\[\d+:\d+:\d+[/]\d+[.]\d+)[:](?<log_level>\w+)[:].+" }
  }
  
  # log_level별로 topic 부여.
  if [log_level] == "INFO" {
    ruby {
      code => "event.set('message', event.get('message') + 10.chr)"
      add_field => { "topic_test" => "info-log-topic" }
    }
  } else if [log_level] == "WARNING" {
    mutate {
      add_field => { "topic_test" => "warning-log-topic" }
    }
  } else if [log_level] == "VERBOSE1" {
    mutate {
      add_field => { "topic_test" => "verbose-log-topic" }
    }
  }
  
  mutate {
    # 소문자로 변경.
    lowercase => [ "log_level" ]
    
    # 새로운 Field 추가.
    add_field => {
      "topic" => "%{[fields][topic_name]}"
      "browser" => "%{[fields][browser_name]}"
    }
    
    # 불필요한 Field 삭제.
    remove_field => [ "input""flags""host""fields""tags" ]
    
    # Field 수정.
    update => {
      "topic_test" => "분기처리 테스트용"
    }
  }
}
 
output {
  if [log_level] {
    kafka {
      bootstrap_servers => "localhost:9093"
      topic_id => "%{topic}"
      codec => json
      acks => "1"
      sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='producer' password='dochi';"
      sasl_mechanism => "PLAIN"
      security_protocol => "SASL_PLAINTEXT"
    }
  }
  else {
    file {
      path => "C:/work/logstash-7.3.1/data/plugins/outputs/file/etc.log"
    }
  }
}
cs

         - 9~11 ln: *현재 시간을 문자열로 계산하여 [ local_timestamp ] Field 생성.

           *현재 시간은 Logstash에서 Date 타입의 TimeZone이 [ GMT+9 ]라서 한국시간이랑 다름.

         - 14~16 ln: [ message ] Field에서 [ log_level ] Field 추출.

         - 19~32 ln: [ log_level ] Field로 분기처리하여 [ *topic_test ] Field 추가. 

            *topic_test는 분기처리를 이용하여 동적으로 [ topic ]을 할당할 수 있음을 보여줌.

         - 34~52 ln: Mutate Filter Plugin 활용.

         - 56~64 ln: Kafka Producer 설정, SASL 인증방식.

 

   4-2. 실행.

1
2
3
4
set LOGSTASH_HOME=C:\work\logstash-7.3.1
set JAVA_HOME=%LOGSTASH_HOME%\java\jdk-11.0.5
 
%LOGSTASH_HOME%\bin\logstash.bat --path.config %LOGSTASH_HOME%\conf.d\filebeat_to_kafka.conf --config.reload.automatic
cs

         - Kafka가 실행되어 있지 않기 때문에 경고가 발생함.

Logstash 실행 로그

 

 

5. Zookeeper & Kafka Server 실행.

   5-1. Zookeeper Server 실행.

1
2
3
4
5
6
7
set KAFKA_HOME=C:\work\kafka_2.12-2.1.0
set JAVA_HOME=%KAFKA_HOME%\java\jdk-11.0.5
set KAFKA_OPTS=
set KAFKA_OPTS=%KAFKA_OPTS% -Dkafka.logs.dir=%KAFKA_HOME%\logs
set KAFKA_OPTS=%KAFKA_OPTS% -Djava.security.auth.login.config=%KAFKA_HOME%\auth\zookeeper_jaas.conf
 
%KAFKA_HOME%\bin\windows\zookeeper-server-start.bat %KAFKA_HOME%\config\zookeeper.properties
cs

 

   5-2. Kafka Server 실행.

1
2
3
4
5
6
7
set KAFKA_HOME=C:\work\kafka_2.12-2.1.0
set JAVA_HOME=%KAFKA_HOME%\java\jdk-11.0.5
set KAFKA_OPTS=
set KAFKA_OPTS=%KAFKA_OPTS% -Dkafka.logs.dir=%KAFKA_HOME%\logs
set KAFKA_OPTS=%KAFKA_OPTS% -Djava.security.auth.login.config=%KAFKA_HOME%\auth\kafka_jaas.conf
 
%KAFKA_HOME%\bin\windows\kafka-server-start.bat %KAFKA_HOME%\config\server.properties
cs

         - Kafka Server가 실행되는 중간에 Logstash에 아래와 같은 로그가 출력됨.

Logstash 실행 로그

         - Kafka Server가 완전히 실행되면 더이상 [ WARN ]로그가 출력되지않음. ( 연결된 상태 )

 

 

6. Consumer Client 실행.

   6-1. 실행.

1
2
3
set KAFKA_HOME=C:\work\kafka_2.12-2.1.0
 
python %KAFKA_HOME%\consumer.py
cs

         - 실행되는 순간 Topic에 적재되어있는 데이터를 출력함. 

Consumer Client 로그

 

 

7. 마치며.

   - 지금까지 Chrome Debug 로그를 Filebeat으로 수집하여 Logstash로 전송하고, Logstash에서 데이터를 가공하여 Kafka에 적재하는 프로세스를 구현하여 보았다. Consumer Client를 열어두고 Debug 모드로 실행한 Chrome에서 이리저리 돌아다니면 Consumer Client에 로그가 실시간으로 출력되는 것을 볼 수 있다. ( 약간의 지연시간은 어쩔 수 없다 )

 

   - 지금은 Kafka로 적재하고 쌓여있는데로 데이터를 출력하기만해서 볼품없지만, Elastsicsearch에 적재하면 API나 모니터링 웹 페이지(Kibana)를 통해 조건에 맞는 로그를 조회해볼 수 있다. 우선 Kafka에 적재된 데이터를 Flume으로 수집하고 Elasticsearch와 Database에 적재하는 프로세스를 구현해보도록 하겠다.

댓글