4.5HZ - AI 기반 음악 추천 서비스 개발기
프로젝트 기간: 2023.11.27 ~ 2024.01.05 (40일)
팀 구성: 5명 (이도형, 유현준, 정유진, 조민진, 최세인)
역할: 백엔드 개발, 데이터 파이프라인 구축, ML 모델 개발
🎯 프로젝트 개요
글로벌 음악 데이터를 분석하여 사용자 맞춤형 음악을 추천하는 웹 서비스를 개발했습니다. Spotify API를 활용한 실시간 데이터 수집부터 Apache Airflow를 통한 자동화된 파이프라인까지, 전체적인 데이터 사이언스 프로젝트의 생명주기를 경험할 수 있었습니다.
주요 성과
- 최종 프로젝트 최우수 수상
- 추천 정확도: RMSE 0.85 달성
- 데이터 처리량: 100만+ 음악 트랙 분석
- 실시간 처리: 1시간 간격 데이터 업데이트
🏗 시스템 아키텍처
전체 워크플로우
시스템 아키텍처 다이어그램
graph TB
%% 데이터 수집 및 전처리
A[Spotify API] --> B[pandas<br/>Data Preprocessing]
C[Dummy Data] --> B
D[Apache Airflow<br/>Orchestration] --> B
%% 데이터 저장
B --> E[MySQL<br/>Main Database]
B --> F[OpenSearch<br/>Log Search]
%% 실시간 스트리밍 파이프라인
G[User Interaction] --> H[fluentd<br/>Log Collector]
H --> I[Kafka<br/>Message Queue]
I --> J[Spark Streaming<br/>Real-time Processing]
J --> K[MongoDB<br/>Log Storage]
%% 시각화 및 분석
K --> L[Chart.js<br/>Visualization]
L --> M[Apache Airflow<br/>Analytics Pipeline]
M --> E
%% 웹 서비스 및 추천
E --> N[Django<br/>Web Service]
N --> O[Content/Collaborative<br/>Recommendation Algorithm]
O --> N
O --> M
%% 최종 서비스
N --> P[4.5HZ<br/>Music Recommendation Service]
%% 스타일링
classDef api fill:#1db954,stroke:#1ed760,color:#fff
classDef processing fill:#ff6b35,stroke:#ff8c42,color:#fff
classDef storage fill:#336791,stroke:#4a90a4,color:#fff
classDef streaming fill:#ff6b6b,stroke:#ff8e8e,color:#fff
classDef web fill:#092e20,stroke:#0c4b33,color:#fff
classDef service fill:#6c5ce7,stroke:#a29bfe,color:#fff
class A api
class B,D,M processing
class E,F,K storage
class G,H,I,J streaming
class N,O web
class P service
데이터베이스 구조 (ERD)
🛠 기술 스택
Backend & Web Framework
- Django 4.2.8: 웹 프레임워크
- Django REST Framework: API 개발
- MySQL: 메인 데이터베이스
- MongoDB: 로그 데이터 저장
Data Engineering
- Apache Spark 3.2.4: 대용량 데이터 처리
- Apache Kafka: 실시간 스트리밍
- Apache Airflow: 워크플로우 오케스트레이션
- Fluentd: 로그 수집
Machine Learning
- Scikit-learn: 전통적인 ML 알고리즘
- TensorFlow: 딥러닝 모델
- Surprise: 추천 시스템 라이브러리
- Pandas, NumPy: 데이터 처리
📊 데이터 파이프라인
1. 데이터 수집 (Data Collection)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| # Spotify API를 통한 음악 데이터 수집
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
def collect_track_data():
sp = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())
# 아티스트 정보 수집
artists = sp.search(q='genre:k-pop', type='artist', limit=50)
# 트랙 정보 및 오디오 특성 수집
for artist in artists['artists']['items']:
tracks = sp.artist_top_tracks(artist['id'])
for track in tracks['tracks']:
audio_features = sp.audio_features(track['id'])
# 데이터베이스 저장
|
2. 데이터 전처리 (Data Preprocessing)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| # 오디오 특성을 활용한 클러스터링
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
def preprocess_audio_features(df):
# 오디오 특성 정규화
features = ['danceability', 'energy', 'valence', 'tempo']
scaler = StandardScaler()
scaled_features = scaler.fit_transform(df[features])
# K-means 클러스터링
kmeans = KMeans(n_clusters=10, random_state=42)
df['cluster'] = kmeans.fit_predict(scaled_features)
return df
|
3. 실시간 데이터 처리
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| # Kafka + Spark Streaming을 통한 실시간 로그 처리
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
def process_user_logs():
spark = SparkSession.builder.appName("UserLogProcessor").getOrCreate()
# Kafka에서 로그 데이터 읽기
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "user_logs") \
.load()
# 실시간 집계
result = df.groupBy("user_id", window("timestamp", "1 hour")) \
.agg(count("track_id").alias("play_count"))
return result
|
🤖 추천 알고리즘
1. 협업 필터링 (Collaborative Filtering)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| from surprise import SVD, Dataset, Reader
from surprise.model_selection import cross_validate
def build_collaborative_model():
# 사용자-아이템 매트릭스 구성
reader = Reader(rating_scale=(0, 5))
data = Dataset.load_from_df(ratings_df, reader)
# SVD 모델 학습
model = SVD(n_epochs=20, lr_all=0.005, reg_all=0.4)
cross_validate(model, data, measures=['RMSE', 'MAE'], cv=5)
return model
def recommend_songs(user_id, n_recommendations=10):
# 사용자가 아직 듣지 않은 곡들 중에서 추천
user_tracks = get_user_played_tracks(user_id)
all_tracks = get_all_tracks()
unplayed_tracks = set(all_tracks) - set(user_tracks)
predictions = []
for track_id in unplayed_tracks:
pred = model.predict(user_id, track_id)
predictions.append((track_id, pred.est))
# 예상 평점이 높은 순으로 정렬
predictions.sort(key=lambda x: x[1], reverse=True)
return predictions[:n_recommendations]
|
2. 콘텐츠 기반 필터링 (Content-Based Filtering)
1
2
3
4
5
6
7
8
9
10
11
12
13
| from sklearn.metrics.pairwise import cosine_similarity
def content_based_recommendation(track_id, n_recommendations=10):
# 오디오 특성 기반 유사도 계산
track_features = get_track_audio_features(track_id)
all_features = get_all_track_features()
# 코사인 유사도 계산
similarities = cosine_similarity([track_features], all_features)[0]
# 유사도가 높은 곡들 반환
similar_indices = similarities.argsort()[-n_recommendations-1:-1][::-1]
return [get_track_by_index(idx) for idx in similar_indices]
|
3. 하이브리드 추천 시스템
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| def hybrid_recommendation(user_id, track_id=None):
# 협업 필터링 결과
cf_recommendations = collaborative_filtering(user_id)
# 콘텐츠 기반 필터링 결과
if track_id:
cb_recommendations = content_based_recommendation(track_id)
else:
# 사용자의 최근 재생 곡 기반
recent_tracks = get_user_recent_tracks(user_id, limit=5)
cb_recommendations = []
for track in recent_tracks:
cb_recommendations.extend(content_based_recommendation(track['id']))
# 가중 평균으로 최종 추천 리스트 생성
final_recommendations = weighted_combine(cf_recommendations, cb_recommendations)
return final_recommendations
|
🔄 자동화된 파이프라인
Apache Airflow DAG
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
| from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
default_args = {
'owner': '4.5HZ',
'depends_on_past': False,
'start_date': datetime(2023, 12, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'music_recommendation_pipeline',
default_args=default_args,
description='음악 추천 시스템 파이프라인',
schedule_interval='@hourly',
catchup=False,
tags=['music', 'recommendation', 'ml'],
)
# 1. 데이터 전처리
preprocess_task = SparkSubmitOperator(
task_id='preprocess_data',
application='/home/ubuntu/airflow/dags/preprocess.py',
conn_id='spark_local',
dag=dag,
)
# 2. 모델 재학습
retrain_model = PythonOperator(
task_id='retrain_model',
python_callable=retrain_recommendation_model,
dag=dag,
)
# 3. 집계 데이터 생성
aggregate_data = SparkSubmitOperator(
task_id='aggregate_data',
application='/home/ubuntu/airflow/dags/aggregate.py',
conn_id='spark_local',
dag=dag,
)
# 4. 데이터베이스 업데이트
update_database = PythonOperator(
task_id='update_database',
python_callable=update_recommendation_database,
dag=dag,
)
# 태스크 의존성 설정
preprocess_task >> retrain_model >> aggregate_data >> update_database
|
📈 성능 최적화
1. 데이터베이스 최적화
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| # 인덱스 최적화
class Track(models.Model):
track_id = models.CharField(primary_key=True, max_length=30)
track_name = models.CharField(max_length=200, db_index=True)
genres = models.CharField(max_length=80, db_index=True)
class Meta:
indexes = [
models.Index(fields=['track_popularity']),
models.Index(fields=['release_date']),
]
# 쿼리 최적화
def get_popular_tracks_by_genre(genre, limit=50):
return Track.objects.filter(
genres__icontains=genre
).order_by('-track_popularity')[:limit]
|
2. 캐싱 전략
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| from django.core.cache import cache
import json
def get_user_recommendations(user_id):
cache_key = f"recommendations_{user_id}"
cached_result = cache.get(cache_key)
if cached_result:
return json.loads(cached_result)
# 추천 알고리즘 실행
recommendations = hybrid_recommendation(user_id)
# 1시간 캐싱
cache.set(cache_key, json.dumps(recommendations), 3600)
return recommendations
|
🚀 배포 및 모니터링
Docker 컨테이너화
1
2
3
4
5
6
7
8
9
10
11
12
13
| # Dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["gunicorn", "--bind", "0.0.0.0:8000", "final_pjt.wsgi:application"]
|
로깅 및 모니터링
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| import logging
import json
from datetime import datetime
class RecommendationLogger:
def __init__(self):
self.logger = logging.getLogger('recommendation')
handler = logging.FileHandler('logs/recommendation.log')
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def log_recommendation(self, user_id, recommendations, response_time):
log_data = {
'user_id': user_id,
'recommendations': recommendations,
'response_time': response_time,
'timestamp': datetime.now().isoformat()
}
self.logger.info(json.dumps(log_data))
|
📊 프로젝트 결과
🎥 시연 영상
성능 지표
- 추천 정확도: RMSE 0.85 (Surprise 라이브러리 기준)
- 응답 시간: 평균 200ms 이하
- 시스템 가용성: 99.5%
- 데이터 처리량: 시간당 10만+ 로그 처리
사용자 피드백
- 추천 만족도: 4.2/5.0
- 재방문율: 65%
- 평균 세션 시간: 12분
🎓 학습한 점
기술적 성장
- 풀스택 개발: 프론트엔드부터 데이터 파이프라인까지 전체 시스템 구축
- MLOps: 모델 학습부터 배포까지의 자동화된 파이프라인 구축
- 대용량 데이터 처리: Spark를 활용한 분산 처리 경험
- 실시간 시스템: Kafka + Spark Streaming을 통한 실시간 데이터 처리
협업 경험
- 팀 협업: 5명 팀원과의 효율적인 협업 방식 구축
- 코드 리뷰: Git을 활용한 체계적인 코드 리뷰 프로세스
- 문서화: 기술 문서 작성 및 API 문서화 경험
📚 참고 자료
💬 마무리
4.5HZ 프로젝트를 통해 데이터 사이언스의 전체 생명주기를 경험할 수 있었습니다. 단순한 추천 알고리즘 구현을 넘어서, 실제 사용자가 사용할 수 있는 서비스로 만드는 과정에서 많은 것을 배웠습니다.
특히 데이터 엔지니어링과 MLOps 분야에서의 경험이 가장 값진 것 같습니다. 추천 시스템의 성능을 높이는 것도 중요하지만, 안정적이고 확장 가능한 시스템을 구축하는 것이 더욱 중요하다는 것을 깨달았습니다.
앞으로도 지속적으로 개선해나가며, 더 나은 음악 추천 서비스를 만들어가고 싶습니다.
🔗 관련 링크
📁 GitHub Repository
🎥 YouTube 시연 영상
📄 발표 자료