Bringing Up a Local Kafka Broker and Zookeeper for Testing, with Python
The producer and consumer are tested in Python, while the broker and Zookeeper are brought up with Docker. Running a server on EC2 costs money, so I recommend this setup for simple testing.
(The Confluent build is used here because confluent-kafka is actively developed and, in many respects, the most capable of the available Kafka clients.)
1) Install confluent-kafka
$ pip install confluent-kafka
(needed for the consumer and producer applications)
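To confirm the install worked, you can print the client and bundled librdkafka versions; this is a quick check of my own, not part of the original steps:
import confluent_kafka

print(confluent_kafka.version())     # (version string, version int) of the Python client
print(confluent_kafka.libversion())  # same for the bundled librdkafka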
2) Bring up the broker and Zookeeper with docker-compose
# docker-compose.yml
---
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zookeeper
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.0
    container_name: broker
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
Zookeeper is exposed on port 2181 and the broker on port 9092. Note the two advertised listeners: clients on the host reach the broker at localhost:9092, while other containers on the compose network use broker:29092. In production you would normally run a replication factor of 3 or more, but since this is only a test we bring up a single Zookeeper and a single broker.
$ docker compose up -d
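Once the containers are up, a quick sanity check of my own (using the kafka-topics CLI that ships in the image) confirms the broker is reachable:
$ docker compose ps
$ docker compose exec broker kafka-topics --list --bootstrap-server localhost:9092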
3) Set up the configuration file
# getting_started.ini
[default]
bootstrap.servers=localhost:9092
[consumer]
group.id=python_example_group_1
# 'auto.offset.reset=earliest' to start reading from the beginning of
# the topic if no committed offsets exist.
auto.offset.reset=earliest
bootstrap.servers refers to the broker. A consumer always needs a consumer-group setting (group.id), which is why both the producer and the consumer are run against this configuration file.
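For reference, this is roughly how the two sections combine when the scripts below parse the file with ConfigParser; a minimal sketch mirroring their parsing code:
from configparser import ConfigParser

config_parser = ConfigParser()
with open('getting_started.ini') as f:
    config_parser.read_file(f)

producer_config = dict(config_parser['default'])   # just bootstrap.servers
consumer_config = dict(config_parser['default'])
consumer_config.update(config_parser['consumer'])  # adds group.id and auto.offset.reset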
4) Create a topic on the broker
docker compose exec broker \
  kafka-topics --create \
    --topic purchases \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1
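If you prefer to stay in Python, the same topic can be created with the client's AdminClient. This is a sketch of my own, not part of the original walkthrough; it assumes the broker address from getting_started.ini:
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

# Mirror the CLI flags: 1 partition, replication factor 1 (test-only values).
futures = admin.create_topics([NewTopic('purchases', num_partitions=1, replication_factor=1)])

for topic, future in futures.items():
    try:
        future.result()  # None on success; raises if creation failed (e.g. topic already exists)
        print("Created topic {}".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))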
5) Producer code
#!/usr/bin/env python

import sys
from random import choice
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Producer

if __name__ == '__main__':
    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    args = parser.parse_args()

    # Parse the configuration.
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])

    # Create Producer instance
    producer = Producer(config)

    # Optional per-message delivery callback (triggered by poll() or flush())
    # when a message has been successfully delivered or permanently
    # failed delivery (after retries).
    def delivery_callback(err, msg):
        if err:
            print('ERROR: Message failed delivery: {}'.format(err))
        else:
            print("Produced event to topic {topic}: key = {key:12} value = {value:12}".format(
                topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))

    # Produce data by selecting random values from these lists.
    topic = "purchases"
    user_ids = ['eabara', 'jsmith', 'sgarcia', 'jbernard', 'htanaka', 'awalther']
    products = ['book', 'alarm clock', 't-shirts', 'gift card', 'batteries']

    count = 0
    for _ in range(10):
        user_id = choice(user_ids)
        product = choice(products)
        producer.produce(topic, product, user_id, callback=delivery_callback)
        count += 1

    # Block until the messages are sent.
    producer.poll(10000)
    producer.flush()
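Assuming the script is saved as producer.py (the filename is my choice), run it with the config file; each of the ten events prints a confirmation from delivery_callback:
$ python producer.py getting_started.ini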
6) Consumer code
#!/usr/bin/env python

import sys
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Consumer, OFFSET_BEGINNING

if __name__ == '__main__':
    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    parser.add_argument('--reset', action='store_true')
    args = parser.parse_args()

    # Parse the configuration.
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])
    config.update(config_parser['consumer'])

    # Create Consumer instance
    consumer = Consumer(config)

    # Set up a callback to handle the '--reset' flag.
    def reset_offset(consumer, partitions):
        if args.reset:
            for p in partitions:
                p.offset = OFFSET_BEGINNING
            consumer.assign(partitions)

    # Subscribe to topic
    topic = "purchases"
    consumer.subscribe([topic], on_assign=reset_offset)

    # Poll for new messages from Kafka and print them.
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # Initial message consumption may take up to
                # `session.timeout.ms` for the consumer group to
                # rebalance and start consuming
                print("Waiting...")
            elif msg.error():
                print("ERROR: {}".format(msg.error()))
            else:
                # Extract the (optional) key and value, and print.
                print("Consumed event from topic {topic}: key = {key:12} value = {value:12}".format(
                    topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))
    except KeyboardInterrupt:
        pass
    finally:
        # Leave group and commit final offsets
        consumer.close()
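Likewise, saved as consumer.py (again, my name for the file), it runs against the same config file; the optional --reset flag replays the topic from the first offset:
$ python consumer.py getting_started.ini
$ python consumer.py --reset getting_started.ini
Stop it with Ctrl-C; the finally block closes the consumer, which leaves the group and commits the final offsets.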