데이터 공부/Kafka

Kafka Producer 애플리케이션 개발 - Python (Local Server 기반)

한소희DE 2022. 3. 30. 08:55

 

해당 프로젝트는 로컬 서버를 기반으로 카프카 프로듀서 애플리케이션을 개발하는 프로젝트에 대한 포스팅이다.
컨플루언트 기반 클러스터에서 카프카를 실행하기 전, 카프카의 동작 원리를 이해하기 위해 로컬에 직접 설치 및 실행해보았다.




목차

카프카 로컬 설치

서버 실행

Python Producer 개발

 


01. 카프카 로컬 설치

 


카프카 설치를 위해서는 공식 사이트를 이용해 파일을 다운로드 받아야 한다.

 

아래 공식다운로드 홈페이지 링크를 통해 접속하여, 파일을 다운받는다.

난 여기서 kafka_2.13-3.0.0 버전을 다운받았고, 해당 압축을 풀어주었다.

https://kafka.apache.org/downloads

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

 

 

파일을 다운로드 받았으면, 파일 위치에 맞게 시스템 환경변수를 설정해주면 된다.

참고로 나는 mac 을 사용한다.

 

환경변수 설정을 위해 .zshrc 로 이동한다.

vim ~/.zshrc

 

 

그후, PATH를 설정한다. 나는 dev 파일 내에 kafka_2.13-3.0.0 폴더를 다운로드 & 압축해제 했으므로 

아래와 같은 PATH를 추가해주었다.

export PATH="$PATH:/Users/hansohee/dev/kafka_2.13-3.0.0/bin"

 

 

 

 


02. 서버 실행

이제 로컬에서 서버를 띄워보자.

카프카를 정상적으로 실행하기 위해서는, 기본적으로 주키퍼 & 카프카 서버를 띄워야 하는데, 여기서 주키퍼 서버가 우선적으로 띄워줘야 한다.


따라서, 터미널에서 다운로드한 폴더에 접속하여 주키퍼 서버를 실행해준다.
주키퍼 서버 실행

 kafka_2.13-3.0.0 % zookeeper-server-start.sh config/zookeeper.properties

 

이후, 카프카 서버를 실행해준다.
카프카 서버 실행

 kafka_2.13-3.0.0 % kafka-server-start.sh config/server.properties


이렇게 되면 기본 설정대로, localhost:9092에 서버가 실행된다.

 

 

03. 파이썬  프로듀서 (Python Producer) 개발

 

3-1. 토픽 생성

 

이제 프로듀서를 생성해 볼 건데, 프로듀서가 바라 볼 토픽을 우선 생성해주도록 하겠다.

나는 파티션이 3개고 이름이 'test'인 토픽을 생성해보겠다.

kafka_2.13-3.0.0 % kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test --partitions 3 --replication-factor 1

 

포스팅이 오래된 블로그에서는 보통, --zookeeper:2181 을 직접 작성하여 토픽을 생성한다고 하는데,
요즘(2.2 버전 이후)은 주키퍼와 직접 통신하지 않고, 카프카 서버를 통해 토픽과 관련된 명령어를 실행할 수 있으며, 이로써 아키텍처의 복잡도를 단순화했다고 하므로 참고하자.

 

 

 

3-2. 카프카 파이썬 설치


이제 생성한 토픽으로 python-kafka를 이용해 프로듀서를 본격적으로 개발해보자.

kafka-python 를 pip 를 이용해 설치한다.

pip install kafka-python


(참고) kafka-python의 공식문서는 아래와 같다.
https://pypi.org/project/kafka-python/

 

kafka-python

Pure Python client for Apache Kafka

pypi.org

 

 

 

 

3-3. 프로듀서(Producer) 개발



이제 python 파일을 만들 디렉토리를 생성한다.
나는 kafka_python 라는 폴더를 새로 생성했고, 그 안에 각각 kafka_consumer.py, kafka_producer.py 파일을 만들었다.

pwd, ls 명령어로 확인해보면 아래와 같다.

hansohee@hansohuiui-MacBookPro kafka_python % pwd
/Users/hansohee/dev/kafka_python

hansohee@hansohuiui-MacBookPro kafka_python % ls
kafka_consumer.py kafka_producer.py

 



kafka_producer.py 는 아래와 같다.

from kafka import KafkaProducer
from json import dumps


producer = KafkaProducer(acks=0, bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))


for i in range(100):
    data = { str(i) : 'value'+str(i)}
    producer.send('test', value=data)
    producer.flush()



KafkaProducer 파라미터 내부 설명을 하자면 아래와 같다.

  • acks: 메시지를 보낸 후 요청 완료 전 승인 수
  • bootstrap_server: 부트스트랩 리스트
  • value_serializer : Lambda를 이용해 메시지 전처리
    • json 라이브러리의 dumps를 활용: 문자열을 json타입으로 변환 & utf-8 형식으로 인코딩

아래 for문 코드대로, 반복적으로 데이터가 생성되어 프로듀서에 전달될 것이다.

 

 

 

이제 잘 들어왔는지, 파이썬으로 개발한 컨슈머가 잘 동작하는지 확인해보자.

kafka-console-consumer.sh 를 실행시켜서 확인해볼 것이다.

 

따라서 다시 터미널로 돌아와서, kafka_2.13-3.0.0 폴더 내부에서 컨슈머를 실행해준다.

설명을 하자면, 이름이 test인 topic을 바라본 뒤 컨슈머를 실행하는데, 이때 --from-beginning 이라는 옵션을 주어, 토픽의 처음부터 데이터를 확인해보도록 하겠다. (해당 옵션이 없으면, 실행된 이후로 들어오는 데이터만 확인이 가능하다.)

kafka_2.13-3.0.0 % kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning



잘 출력되면 문제없이 동작함을 알 수 있다.

이제 다음 포스팅에서는 kafka-python을 이용해 컨슈머도 생성해보도록 하겠다.