Apache Kafka 상세 내용
Apache Kafka 상세 내용
Kafka 상세 내용
231122 학습한 내용 정리
Apache Kafka 개요
정의
- Apache Kafka: 분산 스트리밍 플랫폼
- 메시지 큐: 대용량 실시간 데이터 스트리밍
- 이벤트 스트리밍: 이벤트 기반 아키텍처 지원
- 로그 기반: 분산 로그 시스템
특징
- 고성능: 초당 수백만 메시지 처리
- 확장성: 수평적 확장 가능
- 내구성: 데이터 지속성 보장
- 실시간: 낮은 지연시간
장점
- 처리량: 높은 처리량과 낮은 지연시간
- 확장성: 클러스터로 확장 가능
- 내구성: 데이터 손실 방지
- 통합: 다양한 시스템과 통합
Kafka 설치 및 설정
1. Kafka 설치
1
2
3
4
5
6
7
8
9
# Kafka 다운로드
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.13-2.8.1.tgz
# 압축 해제
tar -xzf kafka_2.13-2.8.1.tgz
# 환경변수 설정
export KAFKA_HOME=/path/to/kafka_2.13-2.8.1
export PATH=$PATH:$KAFKA_HOME/bin
2. Kafka 시작
1
2
3
4
5
# Zookeeper 시작
bin/zookeeper-server-start.sh config/zookeeper.properties
# Kafka 서버 시작
bin/kafka-server-start.sh config/server.properties
3. Python Kafka 클라이언트
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# kafka-python 설치
pip install kafka-python
# 기본 사용법
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import json
import time
# 프로듀서 설정
producer_config = {
'bootstrap_servers': ['localhost:9092'],
'value_serializer': lambda x: json.dumps(x).encode('utf-8'),
'key_serializer': lambda x: x.encode('utf-8') if x else None,
'acks': 'all',
'retries': 3,
'batch_size': 16384,
'linger_ms': 10,
'buffer_memory': 33554432
}
# 컨슈머 설정
consumer_config = {
'bootstrap_servers': ['localhost:9092'],
'group_id': 'my_consumer_group',
'auto_offset_reset': 'earliest',
'enable_auto_commit': True,
'value_deserializer': lambda x: json.loads(x.decode('utf-8')),
'key_deserializer': lambda x: x.decode('utf-8') if x else None
}
Kafka 기본 사용법
1. 프로듀서 (Producer)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def create_producer():
"""Kafka 프로듀서 생성"""
try:
producer = KafkaProducer(**producer_config)
return producer
except Exception as e:
print(f"프로듀서 생성 오류: {e}")
return None
def send_message(producer, topic, message, key=None):
"""메시지 전송"""
try:
future = producer.send(topic, value=message, key=key)
record_metadata = future.get(timeout=10)
print(f"메시지 전송 성공:")
print(f" 토픽: {record_metadata.topic}")
print(f" 파티션: {record_metadata.partition}")
print(f" 오프셋: {record_metadata.offset}")
return True
except KafkaError as e:
print(f"메시지 전송 오류: {e}")
return False
# 프로듀서 사용 예시
def producer_example():
"""프로듀서 사용 예시"""
producer = create_producer()
if producer:
# 단일 메시지 전송
message = {"user_id": 1, "action": "login", "timestamp": time.time()}
send_message(producer, "user_events", message, key="user_1")
# 여러 메시지 전송
for i in range(10):
message = {
"user_id": i,
"action": "page_view",
"page": f"/page_{i}",
"timestamp": time.time()
}
send_message(producer, "user_events", message, key=f"user_{i}")
# 프로듀서 종료
producer.close()
# 프로듀서 예시 실행
# producer_example()
2. 컨슈머 (Consumer)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def create_consumer():
"""Kafka 컨슈머 생성"""
try:
consumer = KafkaConsumer(**consumer_config)
return consumer
except Exception as e:
print(f"컨슈머 생성 오류: {e}")
return None
def consume_messages(consumer, topics, timeout_ms=1000):
"""메시지 소비"""
try:
consumer.subscribe(topics)
while True:
message_batch = consumer.poll(timeout_ms=timeout_ms)
if not message_batch:
continue
for topic_partition, messages in message_batch.items():
for message in messages:
print(f"메시지 수신:")
print(f" 토픽: {message.topic}")
print(f" 파티션: {message.partition}")
print(f" 오프셋: {message.offset}")
print(f" 키: {message.key}")
print(f" 값: {message.value}")
print(f" 타임스탬프: {message.timestamp}")
print("-" * 50)
# 오프셋 커밋
consumer.commit()
except KeyboardInterrupt:
print("컨슈머 중지")
except Exception as e:
print(f"메시지 소비 오류: {e}")
finally:
consumer.close()
# 컨슈머 사용 예시
def consumer_example():
"""컨슈머 사용 예시"""
consumer = create_consumer()
if consumer:
topics = ["user_events"]
consume_messages(consumer, topics)
# 컨슈머 예시 실행
# consumer_example()
3. 토픽 관리
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType
from kafka.admin.config_resource import ConfigResource
from kafka.errors import TopicAlreadyExistsError
def create_admin_client():
"""Kafka 관리자 클라이언트 생성"""
try:
admin_client = KafkaAdminClient(
bootstrap_servers=['localhost:9092'],
client_id='admin_client'
)
return admin_client
except Exception as e:
print(f"관리자 클라이언트 생성 오류: {e}")
return None
def create_topic(admin_client, topic_name, num_partitions=1, replication_factor=1):
"""토픽 생성"""
from kafka.admin import NewTopic
try:
topic = NewTopic(
name=topic_name,
num_partitions=num_partitions,
replication_factor=replication_factor
)
admin_client.create_topics([topic])
print(f"토픽 '{topic_name}' 생성 완료")
return True
except TopicAlreadyExistsError:
print(f"토픽 '{topic_name}'이 이미 존재합니다")
return False
except Exception as e:
print(f"토픽 생성 오류: {e}")
return False
def list_topics(admin_client):
"""토픽 목록 조회"""
try:
metadata = admin_client.describe_topics()
topics = list(metadata.keys())
print(f"토픽 목록: {topics}")
return topics
except Exception as e:
print(f"토픽 목록 조회 오류: {e}")
return []
def delete_topic(admin_client, topic_name):
"""토픽 삭제"""
try:
admin_client.delete_topics([topic_name])
print(f"토픽 '{topic_name}' 삭제 완료")
return True
except Exception as e:
print(f"토픽 삭제 오류: {e}")
return False
# 토픽 관리 예시
def topic_management_example():
"""토픽 관리 예시"""
admin_client = create_admin_client()
if admin_client:
# 토픽 생성
create_topic(admin_client, "test_topic", num_partitions=3, replication_factor=1)
# 토픽 목록 조회
list_topics(admin_client)
# 토픽 삭제
delete_topic(admin_client, "test_topic")
# 토픽 관리 예시 실행
# topic_management_example()
Kafka 고급 기능
1. 파티셔닝
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def custom_partitioner(key, all_partitions, available_partitions):
"""커스텀 파티셔너"""
if key is None:
return available_partitions[0]
# 키의 해시값을 사용하여 파티션 선택
partition = hash(key) % len(available_partitions)
return available_partitions[partition]
def create_partitioned_producer():
"""파티셔닝이 적용된 프로듀서 생성"""
config = producer_config.copy()
config['partitioner'] = custom_partitioner
try:
producer = KafkaProducer(**config)
return producer
except Exception as e:
print(f"파티셔닝 프로듀서 생성 오류: {e}")
return None
def send_partitioned_messages():
"""파티셔닝된 메시지 전송"""
producer = create_partitioned_producer()
if producer:
# 키를 사용하여 파티션 지정
messages = [
("user_1", {"user_id": 1, "action": "login"}),
("user_2", {"user_id": 2, "action": "logout"}),
("user_1", {"user_id": 1, "action": "page_view"}),
("user_3", {"user_id": 3, "action": "purchase"})
]
for key, message in messages:
send_message(producer, "partitioned_topic", message, key=key)
producer.close()
# 파티셔닝 예시 실행
# send_partitioned_messages()
2. 컨슈머 그룹
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def create_consumer_group(group_id, topics):
"""컨슈머 그룹 생성"""
config = consumer_config.copy()
config['group_id'] = group_id
try:
consumer = KafkaConsumer(**config)
consumer.subscribe(topics)
return consumer
except Exception as e:
print(f"컨슈머 그룹 생성 오류: {e}")
return None
def consumer_group_example():
"""컨슈머 그룹 예시"""
# 여러 컨슈머가 같은 그룹에 속하면 메시지를 분할하여 처리
topics = ["user_events"]
# 컨슈머 1
consumer1 = create_consumer_group("my_group", topics)
# 컨슈머 2
consumer2 = create_consumer_group("my_group", topics)
# 각 컨슈머는 다른 파티션의 메시지를 처리
if consumer1 and consumer2:
print("컨슈머 그룹이 생성되었습니다")
print("각 컨슈머는 다른 파티션의 메시지를 처리합니다")
# 컨슈머 종료
consumer1.close()
consumer2.close()
# 컨슈머 그룹 예시 실행
# consumer_group_example()
3. 오프셋 관리
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def manual_offset_commit():
"""수동 오프셋 커밋"""
config = consumer_config.copy()
config['enable_auto_commit'] = False
try:
consumer = KafkaConsumer(**config)
consumer.subscribe(["user_events"])
message_count = 0
for message in consumer:
print(f"메시지 처리: {message.value}")
message_count += 1
# 10개 메시지마다 오프셋 커밋
if message_count % 10 == 0:
consumer.commit()
print(f"{message_count}개 메시지 처리 완료, 오프셋 커밋")
except KeyboardInterrupt:
print("컨슈머 중지")
except Exception as e:
print(f"수동 오프셋 커밋 오류: {e}")
finally:
consumer.close()
# 수동 오프셋 커밋 예시 실행
# manual_offset_commit()
Kafka 실무 적용 예시
1. 이벤트 스트리밍
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def event_streaming_example():
"""이벤트 스트리밍 예시"""
# 이벤트 프로듀서
def event_producer():
producer = create_producer()
if producer:
events = [
{"event_type": "user_registration", "user_id": 1, "timestamp": time.time()},
{"event_type": "user_login", "user_id": 1, "timestamp": time.time()},
{"event_type": "product_view", "user_id": 1, "product_id": 100, "timestamp": time.time()},
{"event_type": "add_to_cart", "user_id": 1, "product_id": 100, "timestamp": time.time()},
{"event_type": "purchase", "user_id": 1, "product_id": 100, "amount": 50.0, "timestamp": time.time()}
]
for event in events:
send_message(producer, "user_events", event, key=str(event["user_id"]))
time.sleep(1) # 1초 간격으로 이벤트 전송
producer.close()
# 이벤트 컨슈머
def event_consumer():
consumer = create_consumer()
if consumer:
consumer.subscribe(["user_events"])
for message in consumer:
event = message.value
print(f"이벤트 처리: {event['event_type']} - 사용자 {event['user_id']}")
# 이벤트 타입별 처리
if event["event_type"] == "purchase":
print(f"구매 완료: 상품 {event['product_id']}, 금액 {event['amount']}")
elif event["event_type"] == "user_registration":
print(f"신규 사용자 등록: {event['user_id']}")
# 이벤트 스트리밍 실행
# event_producer()
# event_consumer()
2. 로그 수집
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def log_collection_example():
"""로그 수집 예시"""
# 로그 프로듀서
def log_producer():
producer = create_producer()
if producer:
import logging
# 로그 메시지 생성
log_messages = [
{"level": "INFO", "message": "사용자 로그인 성공", "user_id": 1},
{"level": "WARNING", "message": "로그인 시도 실패", "user_id": 2},
{"level": "ERROR", "message": "데이터베이스 연결 실패", "service": "db"},
{"level": "INFO", "message": "API 요청 처리 완료", "endpoint": "/api/users"},
{"level": "ERROR", "message": "외부 API 호출 실패", "service": "external_api"}
]
for log_msg in log_messages:
send_message(producer, "application_logs", log_msg, key=log_msg["level"])
time.sleep(0.5)
producer.close()
# 로그 컨슈머
def log_consumer():
consumer = create_consumer()
if consumer:
consumer.subscribe(["application_logs"])
for message in consumer:
log_msg = message.value
# 로그 레벨별 처리
if log_msg["level"] == "ERROR":
print(f"🚨 오류 발생: {log_msg['message']}")
# 오류 알림 시스템에 전송
elif log_msg["level"] == "WARNING":
print(f"⚠️ 경고: {log_msg['message']}")
else:
print(f"ℹ️ 정보: {log_msg['message']}")
# 로그 수집 실행
# log_producer()
# log_consumer()
3. 데이터 파이프라인
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def data_pipeline_example():
"""데이터 파이프라인 예시"""
# 데이터 소스 (프로듀서)
def data_source():
producer = create_producer()
if producer:
# 다양한 데이터 소스에서 데이터 수집
data_sources = [
{"source": "web_analytics", "data": {"page_views": 1000, "unique_visitors": 500}},
{"source": "mobile_app", "data": {"app_opens": 2000, "active_users": 800}},
{"source": "api_usage", "data": {"api_calls": 5000, "response_time": 150}},
{"source": "database", "data": {"queries": 3000, "slow_queries": 50}}
]
for source_data in data_sources:
send_message(producer, "raw_data", source_data, key=source_data["source"])
time.sleep(1)
producer.close()
# 데이터 처리 (컨슈머)
def data_processor():
consumer = create_consumer()
if consumer:
consumer.subscribe(["raw_data"])
for message in consumer:
raw_data = message.value
source = raw_data["source"]
data = raw_data["data"]
# 데이터 전처리
processed_data = {
"source": source,
"timestamp": time.time(),
"processed_at": time.time(),
"data": data
}
print(f"데이터 처리 완료: {source}")
# 처리된 데이터를 다른 토픽으로 전송
producer = create_producer()
if producer:
send_message(producer, "processed_data", processed_data, key=source)
producer.close()
# 데이터 파이프라인 실행
# data_source()
# data_processor()
Kafka 모니터링 및 관리
1. 메트릭 수집
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def collect_kafka_metrics():
"""Kafka 메트릭 수집"""
# 토픽별 메트릭
def get_topic_metrics(admin_client, topic_name):
try:
metadata = admin_client.describe_topics([topic_name])
topic_metadata = metadata[topic_name]
metrics = {
"topic": topic_name,
"partitions": len(topic_metadata.partitions),
"replication_factor": len(topic_metadata.partitions[0].replicas)
}
return metrics
except Exception as e:
print(f"토픽 메트릭 수집 오류: {e}")
return None
# 컨슈머 그룹 메트릭
def get_consumer_group_metrics(admin_client, group_id):
try:
# 컨슈머 그룹 정보 조회
group_info = admin_client.describe_consumer_groups([group_id])
metrics = {
"group_id": group_id,
"state": group_info[group_id].state,
"members": len(group_info[group_id].members)
}
return metrics
except Exception as e:
print(f"컨슈머 그룹 메트릭 수집 오류: {e}")
return None
# 메트릭 수집 실행
admin_client = create_admin_client()
if admin_client:
# 토픽 메트릭
topic_metrics = get_topic_metrics(admin_client, "user_events")
if topic_metrics:
print(f"토픽 메트릭: {topic_metrics}")
# 컨슈머 그룹 메트릭
group_metrics = get_consumer_group_metrics(admin_client, "my_consumer_group")
if group_metrics:
print(f"컨슈머 그룹 메트릭: {group_metrics}")
# 메트릭 수집 실행
# collect_kafka_metrics()
2. 성능 모니터링
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def performance_monitoring():
"""성능 모니터링"""
# 메시지 처리 속도 측정
def measure_throughput():
producer = create_producer()
consumer = create_consumer()
if producer and consumer:
# 메시지 전송 속도 측정
start_time = time.time()
message_count = 1000
for i in range(message_count):
message = {"id": i, "data": f"message_{i}"}
send_message(producer, "performance_test", message)
end_time = time.time()
throughput = message_count / (end_time - start_time)
print(f"전송 처리량: {throughput:.2f} 메시지/초")
producer.close()
consumer.close()
# 지연시간 측정
def measure_latency():
producer = create_producer()
consumer = create_consumer()
if producer and consumer:
# 메시지 전송 시간 기록
send_time = time.time()
message = {"timestamp": send_time, "data": "latency_test"}
send_message(producer, "latency_test", message)
# 메시지 수신 시간 측정
consumer.subscribe(["latency_test"])
for message in consumer:
receive_time = time.time()
latency = receive_time - message.value["timestamp"]
print(f"지연시간: {latency:.4f}초")
break
producer.close()
consumer.close()
# 성능 모니터링 실행
# measure_throughput()
# measure_latency()
주의사항 및 모범 사례
1. 성능 최적화
- 배치 크기: 적절한 배치 크기 설정
- 압축: 메시지 압축 사용
- 파티션 수: 적절한 파티션 수 설정
- 복제 팩터: 적절한 복제 팩터 설정
2. 안정성
- 오프셋 관리: 적절한 오프셋 관리
- 에러 처리: 적절한 에러 처리
- 재시도: 실패 시 재시도 로직
- 모니터링: 지속적인 모니터링
3. 보안
- 인증: 적절한 인증 설정
- 암호화: 데이터 암호화
- 접근 제어: 접근 권한 관리
- 감사: 로그 및 감사
마무리
Apache Kafka는 대용량 실시간 데이터 스트리밍을 위한 강력한 분산 플랫폼입니다. 높은 처리량, 낮은 지연시간, 확장성 등의 특징을 통해 실시간 데이터 처리 시스템을 구축할 수 있습니다. 적절한 설정과 모니터링을 통해 안정적이고 효율적인 스트리밍 시스템을 운영할 수 있습니다.
This post is licensed under CC BY 4.0 by the author.