데이터 공부/Kafka

Kafka Consumer 애플리케이션 개발 - Python (Local Server 기반)

한소희DE 2022. 3. 31. 22:21
해당 프로젝트는 로컬 서버를 기반으로 카프카 프로듀서 애플리케이션을 개발하는 프로젝트에 대한 포스팅이다.
컨플루언트 기반 클러스터에서 카프카를 실행하기 전, 카프카의 동작 원리를 이해하기 위해 로컬에 직접 설치 및 실행해보았다.

 


목차

Python Consumer 개발


 

01. Python Consumer 개발

 

1-1. 개요

앞선 포스팅에서, 카프카를 로컬에 설치하는 방법 그리고 해당 로컬에서 카프카, 주키퍼 서버를 띄우는 방법에 대해 알아보았다.

그리고 kafka-python을 이용해 카프카 프로듀서를 띄워보았다.

 

 

이전 포스팅 링크

https://eng-sohee.tistory.com/136

 

Local Server 기반, Kafka Producer 애플리케이션 개발 - Python

해당 프로젝트는 로컬 서버를 기반으로 카프카 프로듀서 애플리케이션을 개발하는 프로젝트에 대한 포스팅이다. 컨플루언트 기반 클러스터에서 카프카를 실행하기 전, 카프카의 동작 원리를

eng-sohee.tistory.com

 

 

이제, 카프카 컨슈머를 띄워 볼 차례다.

 

 

1-2. Python Consumer 개발

 

우선, 카프카 컨슈머가 동작하기 위해서는 주키퍼 서버와 카프카 서버가 실행되고 있어야 한다.

그리고, 카프카 컨슈머가 바라 볼 토픽도 생성되어 있어야 한다. 해당 토픽에 데이터도 들어가 있어야 컨슈머가 작동하므로, 

앞선 포스팅을 참고하여

1. 주키퍼 서버 실행 2. 카프카 서버 실행 3. 토픽 생성 4. 토픽에 데이터 추가

작업이 순차적으로 선작업 되어있어야 한다.

 

 

이게 완료되면 kafka_consumer.py 파일을 생성해준다. 

pwd, ls 명령어로 확인해보면 아래와 같다.

hansohee@hansohuiui-MacBookPro kafka_python % pwd
/Users/hansohee/dev/kafka_python

hansohee@hansohuiui-MacBookPro kafka_python % ls
kafka_consumer.py kafka_producer.py

 

 

 

kafka_consumer.py 의 구성은 아래와 같다.

 

from kafka import KafkaConsumer
from json import loads


consumer = KafkaConsumer(
    'test',
     bootstrap_servers=['localhost:9092'],
     auto_offset_reset='earliest',
     enable_auto_commit=True,
     group_id='test-group',
     value_deserializer=lambda x: loads(x.decode('utf-8')),
     consumer_timeout_ms=1000
)


for message in consumer:
    print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % (
        message.topic, message.partition, message.offset, message.key, message.value))

 

KafkaConsumer 내부의 파라미터 설명을 하자면 아래와 같다.

  • bootstrap_servers: 부트스트랩 리스트
  • auto_offset_reset: 토픽에 붙은 consumer의 offset 정보가 없다면, 어떠한 규칙을 따를 지 설정하는 것 (참고문서 링크)
    • latest: 가장 마지막 offset 부터 consume
    • earliest: 가장 처음 offset 부터 consume
    • none: 만약 해당 consumer의 offset 정보가 없으면 예외 처리 
  • enable_auto_commit: 특정 주기마다 자동으로 commit 을 할 것인지 설정
  • group_id: 해당 컨슈머의 group id
  • value_deserializer: 데이터를 다시 사용할 수 있는 형태로 변환하는데, 이때 어떻게 디코드할지 설정
  • consumer_timeout_ms: 타임아웃 지정. 지정 시간이 지날 동안 데이터가 들어오지 않으면 커넥팅이 끊김. (단위: 밀리 초)

 

 

 

이제 해당 파일을 저장 및 실행한다.

그후, 앞선 포스팅에서 개발한 카프카 컨슈머를 띄워서 데이터를 토픽에 넣어본다.

 

이때, 해당 파일에서 토픽에 저장된 데이터를 잘 출력한다면 컨슈머가 정상적으로 실행하는 것이다.

 

 

 

 

 

이로써, 카프카 파이썬을 이용해 카프카의 Pub/Sub 원리를 직접 간단히 구현해보고, 동작 원리를 이해해보았다.

이제, java를 이용해 컨플루언트 클러스터 서버 기반 컨슈머를 개발하는 과정 or 카프카 개념 등을 포스팅해보겠다.

 

 

 

 

공부를 통해 배운 내용을 작성하고 있습니다. 혹여 해당 포스팅에서 잘못된 부분이 있을 경우, 알려주시면 빠르게 수정 조치 하도록 하겠습니다. 오늘도 읽어주셔서 감사합니다! - 소희 -