Post

Apache Airflow 기초 개념

Apache Airflow 기초 개념

Apache Airflow 기초 개념

워크플로우 오케스트레이션을 위한 플랫폼

Airflow란?

Apache Airflow는 워크플로우를 프로그래밍 방식으로 작성, 스케줄링, 모니터링할 수 있는 플랫폼입니다. 데이터 파이프라인의 복잡한 의존성과 스케줄링을 관리하는 데 특화되어 있습니다.

주요 특징

  • 프로그래밍 방식: Python 코드로 워크플로우 정의
  • 스케줄링: Cron 표현식 기반 스케줄링
  • 모니터링: 웹 UI를 통한 실시간 모니터링
  • 확장성: 다양한 연산자와 센서 제공
  • 재시도: 실패한 작업의 자동 재시도

핵심 개념

1. DAG (Directed Acyclic Graph)

  • 워크플로우를 정의하는 Python 스크립트
  • 방향성이 있는 비순환 그래프
  • 각 노드는 Task, 각 엣지는 의존성

2. Task

  • DAG 내의 개별 작업 단위
  • Operator를 통해 실행되는 작업

3. Operator

  • 실제 작업을 수행하는 컴포넌트
  • BashOperator, PythonOperator, HttpOperator 등

4. Sensor

  • 특정 조건이 만족될 때까지 대기
  • 파일 존재, API 응답, 데이터베이스 상태 등

기본 DAG 구조

1. 기본 설정

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# 기본 인수 설정
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2023, 11, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# DAG 정의
dag = DAG(
    'my_first_dag',
    default_args=default_args,
    description='My first Airflow DAG',
    schedule_interval=timedelta(days=1),
    catchup=False
)

2. Task 정의

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Bash 작업
bash_task = BashOperator(
    task_id='bash_task',
    bash_command='echo "Hello Airflow!"',
    dag=dag
)

# Python 작업
def python_function():
    print("This is a Python task")
    return "Task completed"

python_task = PythonOperator(
    task_id='python_task',
    python_callable=python_function,
    dag=dag
)

3. 의존성 설정

1
2
3
4
5
# Task 간 의존성 설정
bash_task >> python_task

# 또는
python_task.set_upstream(bash_task)

주요 Operator

1. BashOperator

1
2
3
4
5
6
7
from airflow.operators.bash import BashOperator

bash_task = BashOperator(
    task_id='run_bash_command',
    bash_command='ls -la /tmp',
    dag=dag
)

2. PythonOperator

1
2
3
4
5
6
7
8
9
10
11
12
13
from airflow.operators.python import PythonOperator

def process_data():
    import pandas as pd
    df = pd.read_csv('/tmp/data.csv')
    processed_df = df.dropna()
    processed_df.to_csv('/tmp/processed_data.csv', index=False)

python_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=dag
)

3. HttpOperator

1
2
3
4
5
6
7
8
9
from airflow.providers.http.operators.http import SimpleHttpOperator

http_task = SimpleHttpOperator(
    task_id='call_api',
    http_conn_id='my_api_connection',
    endpoint='api/data',
    method='GET',
    dag=dag
)

4. SqliteOperator

1
2
3
4
5
6
7
8
from airflow.providers.sqlite.operators.sqlite import SqliteOperator

sqlite_task = SqliteOperator(
    task_id='create_table',
    sqlite_conn_id='sqlite_default',
    sql='CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)',
    dag=dag
)

XCom을 통한 데이터 전달

1. 데이터 푸시

1
2
3
4
5
6
7
8
9
10
def extract_data(ti):
    data = {'key': 'value', 'number': 42}
    ti.xcom_push(key='extracted_data', value=data)
    return data

extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag
)

2. 데이터 풀

1
2
3
4
5
6
7
8
9
10
def process_data(ti):
    data = ti.xcom_pull(key='extracted_data', task_ids='extract_data')
    print(f"Received data: {data}")
    return data

process_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
    dag=dag
)

실무 예제: 카카오 API 파이프라인

1. API 데이터 추출

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from airflow.providers.http.operators.http import SimpleHttpOperator

def extract_kakao(ti):
    import requests
    
    api_key = "your_kakao_api_key"
    url = f"https://dapi.kakao.com/v2/search/web?query=파이썬"
    headers = {"Authorization": f"KakaoAK {api_key}"}
    
    response = requests.get(url, headers=headers)
    data = response.json()
    
    return data

extract_task = PythonOperator(
    task_id='extract_kakao',
    python_callable=extract_kakao,
    dag=dag
)

2. 데이터 전처리

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def preprocessing(ti):
    import pandas as pd
    
    # 이전 태스크에서 데이터 가져오기
    search_result = ti.xcom_pull(task_ids=['extract_kakao'])
    documents = search_result[0]['documents']
    
    # DataFrame으로 변환
    df = pd.DataFrame(documents)
    
    # CSV로 저장
    df.to_csv('/tmp/processed_result.csv', index=False)
    
    return "Preprocessing completed"

preprocess_task = PythonOperator(
    task_id='preprocessing',
    python_callable=preprocessing,
    dag=dag
)

3. 데이터베이스 저장

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def load_to_db(ti):
    import pandas as pd
    import sqlite3
    
    # CSV 파일 읽기
    df = pd.read_csv('/tmp/processed_result.csv')
    
    # 데이터베이스 연결
    conn = sqlite3.connect('/tmp/results.db')
    
    # 테이블에 저장
    df.to_sql('search_results', conn, if_exists='replace', index=False)
    
    conn.close()
    return "Data loaded to database"

load_task = PythonOperator(
    task_id='load_to_db',
    python_callable=load_to_db,
    dag=dag
)

4. 전체 파이프라인

1
2
# 의존성 설정
extract_task >> preprocess_task >> load_task

스케줄링

1. Cron 표현식

1
2
3
4
5
6
dag = DAG(
    'scheduled_dag',
    default_args=default_args,
    schedule_interval='0 9 * * *',  # 매일 오전 9시
    catchup=False
)

2. 상대적 간격

1
2
3
4
5
6
dag = DAG(
    'interval_dag',
    default_args=default_args,
    schedule_interval=timedelta(hours=2),  # 2시간마다
    catchup=False
)

3. 조건부 실행

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from airflow.operators.dummy import DummyOperator
from airflow.sensors.filesystem import FileSensor

# 파일 존재 확인
file_sensor = FileSensor(
    task_id='wait_for_file',
    filepath='/tmp/input_data.csv',
    dag=dag
)

# 조건부 실행
dummy_task = DummyOperator(
    task_id='dummy_task',
    dag=dag
)

file_sensor >> dummy_task

모니터링과 로깅

1. 웹 UI

  • DAGs: 모든 DAG 목록 및 상태
  • Graph: DAG의 그래프 뷰
  • Task Instance: 개별 태스크 상세 정보
  • Logs: 태스크 실행 로그

2. 알림 설정

1
2
3
4
5
6
7
default_args = {
    'email_on_failure': True,
    'email_on_retry': False,
    'email': ['admin@company.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

3. 로깅

1
2
3
4
5
6
7
def logging_task():
    import logging
    
    logger = logging.getLogger(__name__)
    logger.info("This is an info message")
    logger.warning("This is a warning message")
    logger.error("This is an error message")

주요 학습 포인트

1. DAG 설계 원칙

  • 단일 책임: 각 DAG는 하나의 목적
  • 재사용성: 공통 로직은 별도 모듈로 분리
  • 테스트 가능성: 단위 테스트 작성

2. 에러 처리

  • 재시도 전략: 적절한 재시도 횟수 설정
  • 실패 처리: 실패 시 알림 및 복구 로직
  • 의존성 관리: 실패한 태스크의 영향 최소화

3. 성능 최적화

  • 병렬 처리: 독립적인 태스크의 병렬 실행
  • 리소스 관리: 적절한 리소스 할당
  • 캐싱: 중복 계산 방지

4. 보안

  • 연결 관리: 민감한 정보의 안전한 저장
  • 권한 관리: 적절한 접근 권한 설정
  • 암호화: 데이터 전송 및 저장 시 암호화

Airflow는 복잡한 데이터 파이프라인을 체계적으로 관리할 수 있는 강력한 도구입니다.

This post is licensed under CC BY 4.0 by the author.