개인(팀) 프로젝트/기타 프로젝트 & 활동

06. 플럼과 카프카 기능 구현 방법

한소희DE 2021. 6. 17. 15:12

안녕하세요 한소희입니다. 공부를 통해 배운 내용을 작성하고 있습니다. 혹여 해당 포스팅에서 잘못된 부분이 있을 경우, 알려주시면 빠르게 수정 조치하도록 하겠습니다. 감사합니다.

 

 

 

목차

플럼 에이전트 생성하기

카프카 기능 구현

 


 

 

 

01. 플럼 에이전트 생성하기

 

우선 클라우데라 매니저에서 플럼 - 구성을 클릭한 뒤 맨 아래쪽으로 쭉 내려보자.

플럼의 Agent 이름 그리고 구성파일 영역에 에이전트를 생성해볼 것이다.

 

에이전트의 이름은 각각 SmartCar Agent와, DriveCarInfo Agent이다.

에이전트가 두 개밖에 없으므로, 한 개의 conf 파일에 두 에이전트를 정의할 것이다.

 

 

 

1-1. SamartCar (배치 로그파일) 에이전트 생성

 

왜 spooldir 를 사용하는지 등은 수집 기능 요구사항 정의를 했던 이전 포스팅에서 작성해두었다.

 

⬇ 이전 포스팅 링크

https://eng-sohee.tistory.com/54

 

04. 수집 요구사항 정의 + HDFS, 주키퍼 설치 및 실행

오늘은 우리의 프로젝트 수집 요구사항을 구체적으로 정의해보고, 수집에 필요한 프로그램 중 HDFS와 주키퍼를 설치해보는 실습을 해볼 것이다. 그리고, 간단한 HDFS와 주키퍼 실행을 공부해볼 것

eng-sohee.tistory.com

 

 

그리고, 인터셉터를 걸어서, 들어온 데이터들을 전달하기 전에 필터링을 해주면 된다. 정규식을 사용했는데, 숫자 14자리인 데이터만 정상적으로 넘어가게 한 것이다. 그리고 조건에 해당하지 않는 데이터들은 immediate 즉, 모두 다 제외를 시키겠다고 명령을 준 것이다.

 

SmartCar_Agent.sources  = SmartCarInfo_SpoolSource
SmartCar_Agent.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks    = SmartCarInfo_LoggerSink 

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.type = spooldir
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.spoolDir = /home/pilot-pjt/working/car-batch-log
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.deletePolicy = immediate
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.batchSize = 1000

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors = filterInterceptor

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.type = regex_filter
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.regex = ^\\d{14}
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.excludeEvents = false


SmartCar_Agent.channels.SmartCarInfo_Channel.type = memory
SmartCar_Agent.channels.SmartCarInfo_Channel.capacity  = 100000
SmartCar_Agent.channels.SmartCarInfo_Channel.transactionCapacity  = 10000


SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.type = logger

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.channel = SmartCarInfo_Channel

 

 

1-2. DriveCarInfo (리얼타임 로그파일) 에이전트 생성

 

소스와 채널, 카프카를 정의했다. 리얼타임 로그파일은 싱크 하는 목적지가 카프카이므로 싱크명에 카프카를 적어주었다. 

 

SmartCar_Agent.sources  = SmartCarInfo_SpoolSource DriverCarInfo_TailSource
SmartCar_Agent.channels = SmartCarInfo_Channel DriverCarInfo_Channel
SmartCar_Agent.sinks    = SmartCarInfo_LoggerSink DriverCarInfo_KafkaSink

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.type = spooldir
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.spoolDir = /home/pilot-pjt/working /car-batch-log
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.deletePolicy = immediate
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.batchSize = 1000

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors = filterInterceptor

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.type = regex_filter
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.regex = ^\\d{14}
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.excludeEvents = false


SmartCar_Agent.channels.SmartCarInfo_Channel.type = memory
SmartCar_Agent.channels.SmartCarInfo_Channel.capacity  = 100000
SmartCar_Agent.channels.SmartCarInfo_Channel.transactionCapacity  = 10000


SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.type = logger

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.channel = SmartCarInfo_Channel

 

 

1-3. 최종 Conf 파일

 

최종 완성은 아래와 같다.

 

SmartCar_Agent.sources  = SmartCarInfo_SpoolSource DriverCarInfo_TailSource
SmartCar_Agent.channels = SmartCarInfo_Channel DriverCarInfo_Channel
SmartCar_Agent.sinks    = SmartCarInfo_LoggerSink DriverCarInfo_KafkaSink

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.type = spooldir
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.spoolDir = /home/pilot-pjt/working/car-batch-log
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.deletePolicy = immediate
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.batchSize = 1000

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors = filterInterceptor

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.type = regex_filter
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.regex = ^\\d{14}
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.excludeEvents = false


SmartCar_Agent.channels.SmartCarInfo_Channel.type = memory
SmartCar_Agent.channels.SmartCarInfo_Channel.capacity  = 100000
SmartCar_Agent.channels.SmartCarInfo_Channel.transactionCapacity  = 10000


SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.type = logger

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.channel = SmartCarInfo_Channel



SmartCar_Agent.sources.DriverCarInfo_TailSource.type = exec
SmartCar_Agent.sources.DriverCarInfo_TailSource.command = tail -F /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log
SmartCar_Agent.sources.DriverCarInfo_TailSource.restart = true
SmartCar_Agent.sources.DriverCarInfo_TailSource.batchSize = 1000

SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors = filterInterceptor2

SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.type = regex_filter
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.regex = ^\\d{14}
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.excludeEvents = false

SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.topic = SmartCar-Topic
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.brokerList = server02.hadoop.com:9092
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.requiredAcks = 1
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.batchSize = 1000


SmartCar_Agent.channels.DriverCarInfo_Channel.type = memory
SmartCar_Agent.channels.DriverCarInfo_Channel.capacity= 100000
SmartCar_Agent.channels.DriverCarInfo_Channel.transactionCapacity = 10000


SmartCar_Agent.sources.DriverCarInfo_TailSource.channels = DriverCarInfo_Channel
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.channel = DriverCarInfo_Channel

 

 


 

02. 카프카 기능구현

 

2-1. 카프카 토픽 생성

토픽 명은 SmartCar-Topic으로 설정하겠다.

 

 

 

replication-factor을 1로 준 이유?

단일 카프카 브로커로 만들기 위해서다. 만약 다중 브로커를 생성하고 싶으면(=전송한 데이터를 n개만큼 복제하고 싶으면) n으로 할당해도 무방하다.
partitions 파라미터란?
해당 토픽에 partitions의 개수만큼 데이터가 분리 저장하게 된다. 여기서는 1로 설정하였다.

 

🔥 토픽이 정상적으로 생성되지 않을 땐 어떻게 하면 될까? 토픽 생성 오류 해결 방법!

정확히 잘 기입했는데도 생성되지 않아서 원인을 찾아보니 서버, 클라우데라 매니저 로그인이 풀려 있었다. 반드시 VM에서 서버와 클라우데라 매니저 로그인이 풀려있는지 확인해보아야 한다!

 

이번엔 카프카의 토픽을 생성했으니, 이 토픽에 Producer가 데이터를 전달하고, 그것을 토픽이 저장했다가 Consumer가 전달받는 상황을 만들어보며 실습할 것이다.

 

 

 

2-2. 프로듀서를 통해 데이터 전달

 

 

위와 같이 'Hello!'라는 데이터를 프로듀서로 위에 만든 토픽에 전달해볼 것이다. 명령어는 아래와 같다.

kafka-console-producer --broker-list server02.hadoop.com:9092 -topic SmartCar-Topic

 

 

Enter를 눌러주면 전달이 된 것이다. PuTTY창을 닫지 말고 새 PuTTY창(Server02)을 열어, 컨슈머를 생성해보겠다.

스마트카 토픽과 연결된 프로듀서로 Hello! 문장을 넣었으니, 스마트카 토픽과 연결된 컨슈머에서는 Hello! 가 출력될 것이다. 시행한 결과는 아래와 같다.

 

kafka-console-consumer --bootstrap-server server02.hadoop.com:9092 --topic SmartCar-Topic --partition 0 --from-beginning

 

 

이렇게 나오면 잘 된 것이다. 

 

만약 컨슈머를 하나 더 사용하고 싶으면 어떻게 해야 할까?
위 컨슈머 실행 명령어를, 새 PuTTY창으로 하나 더 띄워 실행하면 된다. 그러면 컨슈머가 하나 더 생긴 것이다.

 

 

 

 

좌측은 프로듀서, 우측은 컨슈머다.

프로듀서에 문자를 하나 더 기입하면, 컨슈머에도 바로바로 띄워지는 것을 알 수 있다.