본문 바로가기
BIG-DATA/FLUME

[FLUME] 무작정 시작하기 (3) - Kafka Source

by 허도치 2019. 12. 3.

2019/11/29 - [BIG-DATA/FLUME] - [FLUME] 무작정 시작하기 (1) - 설치 및 실행

2019/12/02 - [BIG-DATA/FLUME] - [FLUME] 무작정 시작하기 (2) - Channel & Sink 의 분산처리

 

 

  지난 포스트까지해서 Flume의 Channel과 Sink를 통한 분산처리를 해보았다. 이번 포스트에서는 [KAFKA 무작정 시작하기 ]에서 다루었던 Kafka를 Source로 사용할 계획이다. 로그가 적재되고 있는 Kafka만 준비되어 있다면 쉽게 따라할 수 있을 것이다.

  이번 포스트에서는 Flume만 다룰것이기 때문에 로그 수집 및 Kafka에 대한 설정은 다른 포스트를 참고하기 바란다.

 

 

#[KAFKA] 무작정 시작하기

2019/11/19 - [BIG-DATA/KAFKA] - [KAFKA] 무작정 시작하기 (1) - 설치 & 실행

2019/11/20 - [BIG-DATA/KAFKA] - [KAFKA] 무작정 시작하기 (2) - Zookeeper 설정

2019/11/20 - [BIG-DATA/KAFKA] - [KAFKA] 무작정 시작하기 (3) - Kafka 설정

2019/11/21 - [BIG-DATA/KAFKA] - [KAFKA] 무작정 시작하기 (4) - Topic & Producer & Consumer 실행

2019/11/22 - [BIG-DATA/KAFKA] - [KAFKA] 무작정 시작하기 (5) - SASL 인증 설정

 

 

 

1. Flume 설정.

   - 작업폴더: C:\work\C:\work\apache-flume-1.9.0-bin

 

   1-1. [ ./conf/agent_dochi_kafka.conf ] 작성.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# agent_dochi_kafka.conf
 
# Component 이름 지정
agent_dochi.sources = kafka_src
agent_dochi.channels = mem_chnl
agent_dochi.sinks = file_sink
 
# Kafka Source 설정
agent_dochi.sources.kafka_src.channels = mem_chnl
agent_dochi.sources.kafka_src.type = org.apache.flume.source.kafka.KafkaSource
 
agent_dochi.sources.kafka_src.kafka.topics = local-topic
agent_dochi.sources.kafka_src.kafka.bootstrap.servers = localhost:9093
 
agent_dochi.sources.kafka_src.kafka.consumer.client.id = dochi
agent_dochi.sources.kafka_src.kafka.consumer.group.id = local-group
agent_dochi.sources.kafka_src.kafka.consumer.auto.offset.reset = latest
agent_dochi.sources.kafka_src.kafka.consumer.security.protocol = SASL_PLAINTEXT
agent_dochi.sources.kafka_src.kafka.consumer.sasl.mechanism = PLAIN
agent_dochi.sources.kafka_src.kafka.consumer.sasl.jaas.config = \
  org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="dochi" \
    password="dochi";
 
    
# Memory Channel 설정
agent_dochi.channels.mem_chnl.type = memory
agent_dochi.channels.mem_chnl.capacity = 10000
agent_dochi.channels.mem_chnl.transactionCapacity = 1000
 
 
# File Roll Sink 설정
agent_dochi.sinks.file_sink.type = file_roll
agent_dochi.sinks.file_sink.channel = mem_chnl
agent_dochi.sinks.file_sink.sink.rollInterval = 0
agent_dochi.sinks.file_sink.sink.directory = C:/work/apache-flume-1.9.0-bin/logs
agent_dochi.sinks.file_sink.sink.pathManager.prefix = kafka_
agent_dochi.sinks.file_sink.sink.pathManager.extension = log
cs

         - 10 ln: KafkaSource 모듈 지정.

         - 12 ln: 수집할 Kafka의 Topic명, 복수 입력 가능.

         - 13 ln: Kafka 접속 정보, 복수 입력 가능.

         - 15 ln: Logging에 사용할 Client명.

         - 16 ln: Consumer의 Group명.

         - 17 ln: *처음 접속시 접근할 Offset의 위치

         - 18~23 ln: Kafka SASL인증 설정.

         - 28~29 ln: *Channel의 처리량.

         - 33~38 ln: File Roll Sink 설정.

 

   1-2. *처음 접속시 접근할 Offset의 위치.

         - earliest, current, latest 총 3가지가 있음.

         - earliest, current는 종료된 시점의 Offset부터 데이터를 수집 ,데이터 유실율이 적음.

         - latest는 가장 최신의 Offset부터 데이터를 수집, 데이터 유실이 많음.

 

   1-3. *Channel의 처리량

         - capacity는 Channel에 최대로 저장할 수 있는 event의 수, 이 값을 초과하면 Source에서 데이터를 않음, Sink에 처리속도가 늦을 경우 쌓이게 되므로 밸런스를 잘 맞추어야함.

         - transactionCapacity는 Channel이 Source에서 가지고오는 이벤트의 수 또는 Sink에서 한 트랜잭션당 처리할 수 있는 이벤트의 수를 의미.

         - transactionCapacity는 Source의 [ batch-size ] 보다 작아야 함.

         - transactionCapacity는 capacity를 초과할 수 없음.

 

   1-4. 공식 문서.

         - Kafka Source:  https://flume.apache.org/FlumeUserGuide.html#kafka-source

         - Memory Channel: https://flume.apache.org/FlumeUserGuide.html#memory-channel

 

 

2. Flume 실행.

   - 아래와 같이 실행 파일을 생성해서 실행하는 것을 추천함.

   - 작업폴더: C:\work\C:\work\apache-flume-1.9.0-bin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
rem flume_start.bat
 
@echo off
 
rem 현재 디렉토리 설정
pushd %~dp0
set FLUME_HOME=%cd%
popd
 
set JAVA_HOME=%FLUME_HOME%\java\jdk-11.0.5
 
set AGENT_NAME=agent_dochi
 
set FLUME_PROPS=
set FLUME_PROPS=%FLUME_PROPS%;flume.root.logger=INFO,LOGFILE,console
set FLUME_PROPS=%FLUME_PROPS%;flume.log.dir=%FLUME_HOME%\logs
set FLUME_PROPS=%FLUME_PROPS%;flume.log.file=agent_dochi.log
set FLUME_CONF=%FLUME_HOME%\conf
set FLUME_CONF_FILE=%FLUME_CONF%\agent_dochi_kafka.conf
 
%FLUME_HOME%\bin\flume-ng agent -name %AGENT_NAME% -conf %FLUME_CONF% -conf-file %FLUME_CONF_FILE% -property %FLUME_PROPS%
cs

 

   2-1. [ ./flume_start.bat ] 실행

실행 로그
접속 성공 로그

 

 

3. 실행결과.

   3-1. message에 Kafka에서 수집한 로그가 정상적으로 파일로 떨어지는 것이 확인됨.

 

 

4. 마치며.

   - 이번 포스트에서는 간단한 설정으로 Kafka Source를 연결하여 보았다. Filebeat, Logstash, Kafka처럼 매뉴얼대로 설정파일만 잘 작성해주면 끝이다. 다만, 각자의 상황에 맞게 사용하려면 옵션들을 잘 숙지하고 있어야하므로 반드시 공식 문서를 읽어보길 바란다.

   - 다음 포스트에서는 Kafka Source에서 수집한 데이터를 Elasticsearch에 적재해보도록 하겠다.

댓글