해당 프로젝트는 로컬 서버를 기반으로 카프카 프로듀서 애플리케이션을 개발하는 프로젝트에 대한 포스팅이다.
컨플루언트 기반 클러스터에서 카프카를 실행하기 전, 카프카의 동작 원리를 이해하기 위해 로컬에 직접 설치 및 실행해보았다.
목차
카프카 로컬 설치
서버 실행
Python Producer 개발
01. 카프카 로컬 설치
카프카 설치를 위해서는 공식 사이트를 이용해 파일을 다운로드 받아야 한다.
아래 공식다운로드 홈페이지 링크를 통해 접속하여, 파일을 다운받는다.
난 여기서 kafka_2.13-3.0.0 버전을 다운받았고, 해당 압축을 풀어주었다.
https://kafka.apache.org/downloads
파일을 다운로드 받았으면, 파일 위치에 맞게 시스템 환경변수를 설정해주면 된다.
참고로 나는 mac 을 사용한다.
환경변수 설정을 위해 .zshrc 로 이동한다.
vim ~/.zshrc
그후, PATH를 설정한다. 나는 dev 파일 내에 kafka_2.13-3.0.0 폴더를 다운로드 & 압축해제 했으므로
아래와 같은 PATH를 추가해주었다.
export PATH="$PATH:/Users/hansohee/dev/kafka_2.13-3.0.0/bin"
02. 서버 실행
이제 로컬에서 서버를 띄워보자.
카프카를 정상적으로 실행하기 위해서는, 기본적으로 주키퍼 & 카프카 서버를 띄워야 하는데, 여기서 주키퍼 서버가 우선적으로 띄워줘야 한다.
따라서, 터미널에서 다운로드한 폴더에 접속하여 주키퍼 서버를 실행해준다.
주키퍼 서버 실행
kafka_2.13-3.0.0 % zookeeper-server-start.sh config/zookeeper.properties
이후, 카프카 서버를 실행해준다.
카프카 서버 실행
kafka_2.13-3.0.0 % kafka-server-start.sh config/server.properties
이렇게 되면 기본 설정대로, localhost:9092에 서버가 실행된다.
03. 파이썬 프로듀서 (Python Producer) 개발
3-1. 토픽 생성
이제 프로듀서를 생성해 볼 건데, 프로듀서가 바라 볼 토픽을 우선 생성해주도록 하겠다.
나는 파티션이 3개고 이름이 'test'인 토픽을 생성해보겠다.
kafka_2.13-3.0.0 % kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test --partitions 3 --replication-factor 1
포스팅이 오래된 블로그에서는 보통, --zookeeper:2181 을 직접 작성하여 토픽을 생성한다고 하는데,
요즘(2.2 버전 이후)은 주키퍼와 직접 통신하지 않고, 카프카 서버를 통해 토픽과 관련된 명령어를 실행할 수 있으며, 이로써 아키텍처의 복잡도를 단순화했다고 하므로 참고하자.
3-2. 카프카 파이썬 설치
이제 생성한 토픽으로 python-kafka를 이용해 프로듀서를 본격적으로 개발해보자.
kafka-python 를 pip 를 이용해 설치한다.
pip install kafka-python
(참고) kafka-python의 공식문서는 아래와 같다.
https://pypi.org/project/kafka-python/
3-3. 프로듀서(Producer) 개발
이제 python 파일을 만들 디렉토리를 생성한다.
나는 kafka_python 라는 폴더를 새로 생성했고, 그 안에 각각 kafka_consumer.py, kafka_producer.py 파일을 만들었다.
pwd, ls 명령어로 확인해보면 아래와 같다.
hansohee@hansohuiui-MacBookPro kafka_python % pwd
/Users/hansohee/dev/kafka_python
hansohee@hansohuiui-MacBookPro kafka_python % ls
kafka_consumer.py kafka_producer.py
kafka_producer.py 는 아래와 같다.
from kafka import KafkaProducer
from json import dumps
producer = KafkaProducer(acks=0, bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8'))
for i in range(100):
data = { str(i) : 'value'+str(i)}
producer.send('test', value=data)
producer.flush()
KafkaProducer 파라미터 내부 설명을 하자면 아래와 같다.
- acks: 메시지를 보낸 후 요청 완료 전 승인 수
- bootstrap_server: 부트스트랩 리스트
- value_serializer : Lambda를 이용해 메시지 전처리
- json 라이브러리의 dumps를 활용: 문자열을 json타입으로 변환 & utf-8 형식으로 인코딩
아래 for문 코드대로, 반복적으로 데이터가 생성되어 프로듀서에 전달될 것이다.
이제 잘 들어왔는지, 파이썬으로 개발한 컨슈머가 잘 동작하는지 확인해보자.
kafka-console-consumer.sh 를 실행시켜서 확인해볼 것이다.
따라서 다시 터미널로 돌아와서, kafka_2.13-3.0.0 폴더 내부에서 컨슈머를 실행해준다.
설명을 하자면, 이름이 test인 topic을 바라본 뒤 컨슈머를 실행하는데, 이때 --from-beginning 이라는 옵션을 주어, 토픽의 처음부터 데이터를 확인해보도록 하겠다. (해당 옵션이 없으면, 실행된 이후로 들어오는 데이터만 확인이 가능하다.)
kafka_2.13-3.0.0 % kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
잘 출력되면 문제없이 동작함을 알 수 있다.
이제 다음 포스팅에서는 kafka-python을 이용해 컨슈머도 생성해보도록 하겠다.
'데이터 공부 > Kafka' 카테고리의 다른 글
Kafka Consumer 애플리케이션 개발 - Python (Local Server 기반) (0) | 2022.03.31 |
---|