2019/11/29 - [BIG-DATA/FLUME] - [FLUME] 무작정 시작하기 (1) - 설치 및 실행
2019/12/02 - [BIG-DATA/FLUME] - [FLUME] 무작정 시작하기 (2) - Channel & Sink 의 분산처리
지난 포스트까지해서 Flume의 Channel과 Sink를 통한 분산처리를 해보았다. 이번 포스트에서는 [KAFKA 무작정 시작하기 ]에서 다루었던 Kafka를 Source로 사용할 계획이다. 로그가 적재되고 있는 Kafka만 준비되어 있다면 쉽게 따라할 수 있을 것이다.
이번 포스트에서는 Flume만 다룰것이기 때문에 로그 수집 및 Kafka에 대한 설정은 다른 포스트를 참고하기 바란다.
#[KAFKA] 무작정 시작하기
2019/11/19 - [BIG-DATA/KAFKA] - [KAFKA] 무작정 시작하기 (1) - 설치 & 실행
2019/11/20 - [BIG-DATA/KAFKA] - [KAFKA] 무작정 시작하기 (2) - Zookeeper 설정
2019/11/20 - [BIG-DATA/KAFKA] - [KAFKA] 무작정 시작하기 (3) - Kafka 설정
2019/11/21 - [BIG-DATA/KAFKA] - [KAFKA] 무작정 시작하기 (4) - Topic & Producer & Consumer 실행
2019/11/22 - [BIG-DATA/KAFKA] - [KAFKA] 무작정 시작하기 (5) - SASL 인증 설정
1. Flume 설정.
- 작업폴더: C:\work\C:\work\apache-flume-1.9.0-bin
1-1. [ ./conf/agent_dochi_kafka.conf ] 작성.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
# agent_dochi_kafka.conf
# Component 이름 지정
agent_dochi.sources = kafka_src
agent_dochi.channels = mem_chnl
agent_dochi.sinks = file_sink
# Kafka Source 설정
agent_dochi.sources.kafka_src.channels = mem_chnl
agent_dochi.sources.kafka_src.type = org.apache.flume.source.kafka.KafkaSource
agent_dochi.sources.kafka_src.kafka.topics = local-topic
agent_dochi.sources.kafka_src.kafka.bootstrap.servers = localhost:9093
agent_dochi.sources.kafka_src.kafka.consumer.client.id = dochi
agent_dochi.sources.kafka_src.kafka.consumer.group.id = local-group
agent_dochi.sources.kafka_src.kafka.consumer.auto.offset.reset = latest
agent_dochi.sources.kafka_src.kafka.consumer.security.protocol = SASL_PLAINTEXT
agent_dochi.sources.kafka_src.kafka.consumer.sasl.mechanism = PLAIN
agent_dochi.sources.kafka_src.kafka.consumer.sasl.jaas.config = \
org.apache.kafka.common.security.plain.PlainLoginModule required \
username="dochi" \
password="dochi";
# Memory Channel 설정
agent_dochi.channels.mem_chnl.type = memory
agent_dochi.channels.mem_chnl.capacity = 10000
agent_dochi.channels.mem_chnl.transactionCapacity = 1000
# File Roll Sink 설정
agent_dochi.sinks.file_sink.type = file_roll
agent_dochi.sinks.file_sink.channel = mem_chnl
agent_dochi.sinks.file_sink.sink.rollInterval = 0
agent_dochi.sinks.file_sink.sink.directory = C:/work/apache-flume-1.9.0-bin/logs
agent_dochi.sinks.file_sink.sink.pathManager.prefix = kafka_
agent_dochi.sinks.file_sink.sink.pathManager.extension = log
|
cs |
- 10 ln: KafkaSource 모듈 지정.
- 12 ln: 수집할 Kafka의 Topic명, 복수 입력 가능.
- 13 ln: Kafka 접속 정보, 복수 입력 가능.
- 15 ln: Logging에 사용할 Client명.
- 16 ln: Consumer의 Group명.
- 17 ln: *처음 접속시 접근할 Offset의 위치.
- 18~23 ln: Kafka SASL인증 설정.
- 28~29 ln: *Channel의 처리량.
- 33~38 ln: File Roll Sink 설정.
1-2. *처음 접속시 접근할 Offset의 위치.
- earliest, current, latest 총 3가지가 있음.
- earliest, current는 종료된 시점의 Offset부터 데이터를 수집 ,데이터 유실율이 적음.
- latest는 가장 최신의 Offset부터 데이터를 수집, 데이터 유실이 많음.
1-3. *Channel의 처리량
- capacity는 Channel에 최대로 저장할 수 있는 event의 수, 이 값을 초과하면 Source에서 데이터를 않음, Sink에 처리속도가 늦을 경우 쌓이게 되므로 밸런스를 잘 맞추어야함.
- transactionCapacity는 Channel이 Source에서 가지고오는 이벤트의 수 또는 Sink에서 한 트랜잭션당 처리할 수 있는 이벤트의 수를 의미.
- transactionCapacity는 Source의 [ batch-size ] 보다 작아야 함.
- transactionCapacity는 capacity를 초과할 수 없음.
1-4. 공식 문서.
- Kafka Source: https://flume.apache.org/FlumeUserGuide.html#kafka-source
- Memory Channel: https://flume.apache.org/FlumeUserGuide.html#memory-channel
2. Flume 실행.
- 아래와 같이 실행 파일을 생성해서 실행하는 것을 추천함.
- 작업폴더: C:\work\C:\work\apache-flume-1.9.0-bin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
rem flume_start.bat
@echo off
rem 현재 디렉토리 설정
pushd %~dp0
set FLUME_HOME=%cd%
popd
set JAVA_HOME=%FLUME_HOME%\java\jdk-11.0.5
set AGENT_NAME=agent_dochi
set FLUME_PROPS=
set FLUME_PROPS=%FLUME_PROPS%;flume.root.logger=INFO,LOGFILE,console
set FLUME_PROPS=%FLUME_PROPS%;flume.log.dir=%FLUME_HOME%\logs
set FLUME_PROPS=%FLUME_PROPS%;flume.log.file=agent_dochi.log
set FLUME_CONF=%FLUME_HOME%\conf
set FLUME_CONF_FILE=%FLUME_CONF%\agent_dochi_kafka.conf
%FLUME_HOME%\bin\flume-ng agent -name %AGENT_NAME% -conf %FLUME_CONF% -conf-file %FLUME_CONF_FILE% -property %FLUME_PROPS%
|
cs |
2-1. [ ./flume_start.bat ] 실행


3. 실행결과.

3-1. message에 Kafka에서 수집한 로그가 정상적으로 파일로 떨어지는 것이 확인됨.
4. 마치며.
- 이번 포스트에서는 간단한 설정으로 Kafka Source를 연결하여 보았다. Filebeat, Logstash, Kafka처럼 매뉴얼대로 설정파일만 잘 작성해주면 끝이다. 다만, 각자의 상황에 맞게 사용하려면 옵션들을 잘 숙지하고 있어야하므로 반드시 공식 문서를 읽어보길 바란다.
- 다음 포스트에서는 Kafka Source에서 수집한 데이터를 Elasticsearch에 적재해보도록 하겠다.
'BIG-DATA > FLUME' 카테고리의 다른 글
[FLUME] 무작정 시작하기 (2) - Channel & Sink 의 분산처리 (0) | 2019.12.02 |
---|---|
[FLUME] 무작정 시작하기 (1) - 설치 및 실행 (0) | 2019.11.29 |
댓글