Apache Airflow 상세 내용
Apache Airflow 상세 내용
Airflow 상세 내용
231120~231121, 231215~231222 학습한 내용 정리
Apache Airflow 개요
정의
- Apache Airflow: 워크플로우 오케스트레이션 플랫폼
- DAG: Directed Acyclic Graph (방향성 비순환 그래프)
- 스케줄링: 복잡한 데이터 파이프라인 스케줄링 및 모니터링
- 오케스트레이션: 작업 간 의존성 관리 및 실행 순서 제어
특징
- 시각화: 웹 UI를 통한 워크플로우 시각화
- 스케줄링: Cron 기반 스케줄링
- 모니터링: 실시간 작업 모니터링
- 확장성: 다양한 연산자와 플러그인 지원
장점
- 유연성: 다양한 작업 유형 지원
- 모니터링: 상세한 실행 로그 및 모니터링
- 재사용성: 재사용 가능한 컴포넌트
- 확장성: 커스텀 연산자 및 플러그인 개발
Airflow 설치 및 설정
1. Airflow 설치
1
2
3
4
5
6
7
8
# pip 설치
pip install apache-airflow
# 특정 버전 설치
pip install apache-airflow==2.5.0
# 추가 패키지 설치
pip install apache-airflow[postgres,celery,redis]
2. Airflow 초기화
1
2
3
4
5
6
7
8
9
10
11
12
13
# Airflow 홈 디렉토리 설정
export AIRFLOW_HOME=~/airflow
# 데이터베이스 초기화
airflow db init
# 관리자 사용자 생성
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com
3. Airflow 시작
1
2
3
4
5
# 웹 서버 시작
airflow webserver --port 8080
# 스케줄러 시작
airflow scheduler
Airflow 기본 사용법
1. DAG 생성
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
# 기본 인수 설정
default_args = {
'owner': 'data_team',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# DAG 정의
dag = DAG(
'example_dag',
default_args=default_args,
description='예제 DAG',
schedule_interval=timedelta(days=1),
catchup=False,
tags=['example', 'tutorial'],
)
# 작업 정의
start_task = DummyOperator(
task_id='start',
dag=dag,
)
# Bash 작업
bash_task = BashOperator(
task_id='bash_task',
bash_command='echo "Hello Airflow!"',
dag=dag,
)
# Python 작업
def python_function():
print("Python 작업 실행")
return "작업 완료"
python_task = PythonOperator(
task_id='python_task',
python_callable=python_function,
dag=dag,
)
# 작업 의존성 설정
start_task >> bash_task >> python_task
2. Python 작업
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from airflow.operators.python import PythonOperator
from airflow.operators.python import BranchPythonOperator
from airflow.operators.python import ShortCircuitOperator
# 기본 Python 작업
def data_processing():
"""데이터 처리 함수"""
import pandas as pd
import numpy as np
# 샘플 데이터 생성
data = {
'id': range(1, 101),
'value': np.random.randn(100),
'category': np.random.choice(['A', 'B', 'C'], 100)
}
df = pd.DataFrame(data)
# 데이터 처리
processed_df = df.groupby('category').agg({
'value': ['mean', 'std', 'count']
}).round(2)
print("데이터 처리 완료:")
print(processed_df)
return processed_df.to_dict()
python_task = PythonOperator(
task_id='data_processing',
python_callable=data_processing,
dag=dag,
)
# 분기 작업
def decide_branch():
"""분기 결정 함수"""
import random
if random.random() > 0.5:
return 'branch_a'
else:
return 'branch_b'
branch_task = BranchPythonOperator(
task_id='branch_task',
python_callable=decide_branch,
dag=dag,
)
# 분기 작업들
branch_a_task = DummyOperator(
task_id='branch_a',
dag=dag,
)
branch_b_task = DummyOperator(
task_id='branch_b',
dag=dag,
)
# 단축 회로 작업
def should_continue():
"""계속 진행할지 결정"""
import random
return random.random() > 0.3
short_circuit_task = ShortCircuitOperator(
task_id='short_circuit_task',
python_callable=should_continue,
dag=dag,
)
# 작업 의존성
python_task >> branch_task
branch_task >> [branch_a_task, branch_b_task]
branch_a_task >> short_circuit_task
branch_b_task >> short_circuit_task
3. 데이터베이스 작업
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
# PostgreSQL 작업
postgres_task = PostgresOperator(
task_id='postgres_task',
postgres_conn_id='postgres_default',
sql='''
CREATE TABLE IF NOT EXISTS test_table (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
''',
dag=dag,
)
# PostgreSQL Hook 사용
def postgres_hook_example():
"""PostgreSQL Hook 사용 예시"""
hook = PostgresHook(postgres_conn_id='postgres_default')
# 데이터 삽입
insert_sql = """
INSERT INTO test_table (name) VALUES (%s)
"""
hook.run(insert_sql, parameters=['test_name'])
# 데이터 조회
select_sql = "SELECT * FROM test_table LIMIT 10"
records = hook.get_records(select_sql)
print("조회된 데이터:")
for record in records:
print(record)
postgres_hook_task = PythonOperator(
task_id='postgres_hook_task',
python_callable=postgres_hook_example,
dag=dag,
)
Airflow 고급 기능
1. XCom (Cross-Communication)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from airflow.operators.python import PythonOperator
# XCom을 사용한 데이터 전달
def push_data():
"""데이터를 XCom에 푸시"""
return {
'processed_data': [1, 2, 3, 4, 5],
'metadata': {
'processed_at': datetime.now().isoformat(),
'record_count': 5
}
}
def pull_data(**context):
"""XCom에서 데이터를 풀"""
# 이전 작업의 결과 가져오기
data = context['task_instance'].xcom_pull(task_ids='push_data_task')
print("받은 데이터:")
print(f"처리된 데이터: {data['processed_data']}")
print(f"메타데이터: {data['metadata']}")
# 데이터 처리
processed_data = [x * 2 for x in data['processed_data']]
return processed_data
def final_processing(**context):
"""최종 처리"""
data = context['task_instance'].xcom_pull(task_ids='pull_data_task')
print(f"최종 처리된 데이터: {data}")
return sum(data)
# XCom 작업들
push_data_task = PythonOperator(
task_id='push_data_task',
python_callable=push_data,
dag=dag,
)
pull_data_task = PythonOperator(
task_id='pull_data_task',
python_callable=pull_data,
dag=dag,
)
final_processing_task = PythonOperator(
task_id='final_processing_task',
python_callable=final_processing,
dag=dag,
)
# 작업 의존성
push_data_task >> pull_data_task >> final_processing_task
2. 동적 DAG 생성
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from airflow import DAG
from airflow.operators.bash import BashOperator
# 동적 DAG 생성 함수
def create_dynamic_dag(dag_id, schedule_interval, tasks):
"""동적 DAG 생성"""
default_args = {
'owner': 'data_team',
'start_date': datetime(2023, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
dag_id,
default_args=default_args,
description=f'동적 생성된 DAG: {dag_id}',
schedule_interval=schedule_interval,
catchup=False,
)
# 동적으로 작업 생성
previous_task = None
for i, task_config in enumerate(tasks):
task = BashOperator(
task_id=f'task_{i}',
bash_command=task_config['command'],
dag=dag,
)
if previous_task:
previous_task >> task
previous_task = task
return dag
# 동적 DAG 생성 예시
tasks_config = [
{'command': 'echo "작업 1 실행"'},
{'command': 'echo "작업 2 실행"'},
{'command': 'echo "작업 3 실행"'},
]
dynamic_dag = create_dynamic_dag(
'dynamic_example_dag',
timedelta(hours=1),
tasks_config
)
3. 커스텀 연산자
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class CustomOperator(BaseOperator):
"""커스텀 연산자"""
@apply_defaults
def __init__(self, custom_param, *args, **kwargs):
super(CustomOperator, self).__init__(*args, **kwargs)
self.custom_param = custom_param
def execute(self, context):
"""연산자 실행"""
print(f"커스텀 연산자 실행: {self.custom_param}")
# 커스텀 로직 구현
result = self.process_data()
return result
def process_data(self):
"""데이터 처리 로직"""
# 실제 데이터 처리 로직
processed_data = {
'input': self.custom_param,
'output': f"processed_{self.custom_param}",
'timestamp': datetime.now().isoformat()
}
return processed_data
# 커스텀 연산자 사용
custom_task = CustomOperator(
task_id='custom_task',
custom_param='test_value',
dag=dag,
)
Airflow 실무 적용 예시
1. ETL 파이프라인
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
# ETL 파이프라인 DAG
etl_dag = DAG(
'etl_pipeline',
default_args=default_args,
description='ETL 파이프라인',
schedule_interval=timedelta(hours=1),
catchup=False,
)
# Extract 작업
def extract_data():
"""데이터 추출"""
import requests
import json
# API에서 데이터 추출
response = requests.get('https://api.example.com/data')
data = response.json()
# 데이터를 파일로 저장
with open('/tmp/extracted_data.json', 'w') as f:
json.dump(data, f)
print(f"추출된 데이터 수: {len(data)}")
return len(data)
extract_task = PythonOperator(
task_id='extract_data',
python_callable=extract_data,
dag=etl_dag,
)
# Transform 작업
def transform_data():
"""데이터 변환"""
import json
import pandas as pd
# 데이터 로드
with open('/tmp/extracted_data.json', 'r') as f:
data = json.load(f)
# DataFrame으로 변환
df = pd.DataFrame(data)
# 데이터 변환
df['processed_at'] = datetime.now()
df['value_squared'] = df['value'] ** 2
# 변환된 데이터 저장
df.to_csv('/tmp/transformed_data.csv', index=False)
print(f"변환된 데이터 수: {len(df)}")
return len(df)
transform_task = PythonOperator(
task_id='transform_data',
python_callable=transform_data,
dag=etl_dag,
)
# Load 작업
def load_data():
"""데이터 로드"""
import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook
# 데이터 로드
df = pd.read_csv('/tmp/transformed_data.csv')
# PostgreSQL에 데이터 삽입
hook = PostgresHook(postgres_conn_id='postgres_default')
# 테이블 생성
create_table_sql = """
CREATE TABLE IF NOT EXISTS processed_data (
id SERIAL PRIMARY KEY,
value FLOAT,
processed_at TIMESTAMP,
value_squared FLOAT
);
"""
hook.run(create_table_sql)
# 데이터 삽입
for _, row in df.iterrows():
insert_sql = """
INSERT INTO processed_data (value, processed_at, value_squared)
VALUES (%s, %s, %s)
"""
hook.run(insert_sql, parameters=[
row['value'],
row['processed_at'],
row['value_squared']
])
print(f"로드된 데이터 수: {len(df)}")
return len(df)
load_task = PythonOperator(
task_id='load_data',
python_callable=load_data,
dag=etl_dag,
)
# ETL 파이프라인 의존성
extract_task >> transform_task >> load_task
2. 데이터 품질 검사
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
# 데이터 품질 검사 DAG
quality_dag = DAG(
'data_quality_check',
default_args=default_args,
description='데이터 품질 검사',
schedule_interval=timedelta(days=1),
catchup=False,
)
# 데이터 품질 검사
def data_quality_check():
"""데이터 품질 검사"""
from airflow.providers.postgres.hooks.postgres import PostgresHook
import pandas as pd
hook = PostgresHook(postgres_conn_id='postgres_default')
# 데이터 조회
query = "SELECT * FROM processed_data"
df = pd.read_sql(query, hook.get_conn())
# 품질 검사
quality_report = {
'total_records': len(df),
'null_values': df.isnull().sum().to_dict(),
'duplicate_records': df.duplicated().sum(),
'data_types': df.dtypes.to_dict(),
'statistics': df.describe().to_dict()
}
# 품질 검사 결과 저장
with open('/tmp/quality_report.json', 'w') as f:
json.dump(quality_report, f, indent=2, default=str)
print("데이터 품질 검사 완료:")
print(f"총 레코드 수: {quality_report['total_records']}")
print(f"중복 레코드 수: {quality_report['duplicate_records']}")
return quality_report
quality_check_task = PythonOperator(
task_id='data_quality_check',
python_callable=data_quality_check,
dag=quality_dag,
)
# 품질 검사 결과 알림
def send_quality_report(**context):
"""품질 검사 결과 알림"""
import json
# 품질 검사 결과 가져오기
quality_report = context['task_instance'].xcom_pull(task_ids='data_quality_check')
# 알림 메시지 생성
message = f"""
데이터 품질 검사 결과:
총 레코드 수: {quality_report['total_records']}
중복 레코드 수: {quality_report['duplicate_records']}
상세 결과는 첨부 파일을 참조하세요.
"""
# 이메일 전송
email_task = EmailOperator(
task_id='send_quality_report',
to=['data_team@example.com'],
subject='데이터 품질 검사 결과',
html_content=message,
files=['/tmp/quality_report.json'],
dag=quality_dag,
)
return email_task
# 품질 검사 파이프라인 의존성
quality_check_task >> send_quality_report()
3. 머신러닝 파이프라인
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
# 머신러닝 파이프라인 DAG
ml_dag = DAG(
'ml_pipeline',
default_args=default_args,
description='머신러닝 파이프라인',
schedule_interval=timedelta(days=1),
catchup=False,
)
# 데이터 전처리
def data_preprocessing():
"""데이터 전처리"""
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
# 데이터 로드
df = pd.read_csv('/tmp/raw_data.csv')
# 데이터 전처리
X = df.drop('target', axis=1)
y = df['target']
# 스케일링
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 훈련/테스트 분할
X_train, X_test, y_train, y_test = train_test_split(
X_scaled, y, test_size=0.2, random_state=42
)
# 전처리된 데이터 저장
pd.DataFrame(X_train).to_csv('/tmp/X_train.csv', index=False)
pd.DataFrame(X_test).to_csv('/tmp/X_test.csv', index=False)
pd.DataFrame(y_train).to_csv('/tmp/y_train.csv', index=False)
pd.DataFrame(y_test).to_csv('/tmp/y_test.csv', index=False)
print("데이터 전처리 완료")
return "preprocessing_complete"
preprocessing_task = PythonOperator(
task_id='data_preprocessing',
python_callable=data_preprocessing,
dag=ml_dag,
)
# 모델 훈련
def model_training():
"""모델 훈련"""
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import joblib
# 데이터 로드
X_train = pd.read_csv('/tmp/X_train.csv')
y_train = pd.read_csv('/tmp/y_train.csv')
X_test = pd.read_csv('/tmp/X_test.csv')
y_test = pd.read_csv('/tmp/y_test.csv')
# 모델 훈련
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# 모델 평가
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
# 모델 저장
joblib.dump(model, '/tmp/trained_model.pkl')
print(f"모델 훈련 완료, 정확도: {accuracy:.4f}")
return accuracy
training_task = PythonOperator(
task_id='model_training',
python_callable=model_training,
dag=ml_dag,
)
# 모델 배포
def model_deployment():
"""모델 배포"""
import shutil
import os
# 모델 파일 복사
source = '/tmp/trained_model.pkl'
destination = '/opt/models/production_model.pkl'
# 디렉토리 생성
os.makedirs(os.path.dirname(destination), exist_ok=True)
# 모델 파일 복사
shutil.copy2(source, destination)
print("모델 배포 완료")
return "deployment_complete"
deployment_task = PythonOperator(
task_id='model_deployment',
python_callable=model_deployment,
dag=ml_dag,
)
# 머신러닝 파이프라인 의존성
preprocessing_task >> training_task >> deployment_task
Airflow 모니터링 및 관리
1. 알림 설정
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from airflow.operators.email import EmailOperator
from airflow.operators.slack import SlackAPIPostOperator
# 실패 시 알림
def failure_callback(context):
"""실패 시 콜백"""
task_instance = context['task_instance']
# 이메일 알림
email_task = EmailOperator(
task_id='failure_email',
to=['admin@example.com'],
subject=f'Airflow 작업 실패: {task_instance.task_id}',
html_content=f"""
작업 실패 정보:
- DAG: {context['dag'].dag_id}
- 작업: {task_instance.task_id}
- 실행 날짜: {context['ds']}
- 오류: {context['exception']}
""",
)
# Slack 알림
slack_task = SlackAPIPostOperator(
task_id='failure_slack',
channel='#alerts',
text=f'🚨 Airflow 작업 실패: {task_instance.task_id}',
username='airflow-bot',
)
return [email_task, slack_task]
# DAG에 실패 콜백 설정
dag = DAG(
'monitored_dag',
default_args=default_args,
description='모니터링되는 DAG',
schedule_interval=timedelta(hours=1),
catchup=False,
on_failure_callback=failure_callback,
)
2. 성능 모니터링
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from airflow.operators.python import PythonOperator
from airflow.models import TaskInstance
# 성능 모니터링
def performance_monitoring(**context):
"""성능 모니터링"""
task_instance = context['task_instance']
# 실행 시간 측정
start_time = task_instance.start_date
end_time = task_instance.end_date
if start_time and end_time:
execution_time = (end_time - start_time).total_seconds()
# 성능 메트릭 저장
metrics = {
'task_id': task_instance.task_id,
'execution_time': execution_time,
'start_time': start_time.isoformat(),
'end_time': end_time.isoformat(),
'status': task_instance.state
}
# 메트릭을 파일로 저장
with open(f'/tmp/metrics_{task_instance.task_id}.json', 'w') as f:
json.dump(metrics, f, indent=2)
print(f"성능 메트릭: {metrics}")
return metrics
performance_task = PythonOperator(
task_id='performance_monitoring',
python_callable=performance_monitoring,
dag=dag,
)
3. 리소스 모니터링
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import psutil
import os
# 리소스 모니터링
def resource_monitoring():
"""리소스 모니터링"""
# 시스템 리소스 정보
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
# Airflow 프로세스 정보
airflow_processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
if 'airflow' in proc.info['name'].lower():
airflow_processes.append(proc.info)
# 리소스 정보 저장
resource_info = {
'timestamp': datetime.now().isoformat(),
'cpu_percent': cpu_percent,
'memory': {
'total': memory.total,
'available': memory.available,
'percent': memory.percent
},
'disk': {
'total': disk.total,
'used': disk.used,
'free': disk.free,
'percent': (disk.used / disk.total) * 100
},
'airflow_processes': airflow_processes
}
# 리소스 정보를 파일로 저장
with open('/tmp/resource_info.json', 'w') as f:
json.dump(resource_info, f, indent=2)
print("리소스 모니터링 완료:")
print(f"CPU 사용률: {cpu_percent}%")
print(f"메모리 사용률: {memory.percent}%")
print(f"디스크 사용률: {(disk.used / disk.total) * 100:.2f}%")
return resource_info
resource_monitoring_task = PythonOperator(
task_id='resource_monitoring',
python_callable=resource_monitoring,
dag=dag,
)
주의사항 및 모범 사례
1. DAG 설계
- 단순성: DAG를 단순하고 이해하기 쉽게 설계
- 재사용성: 재사용 가능한 컴포넌트 사용
- 의존성: 명확한 작업 의존성 정의
- 에러 처리: 적절한 에러 처리 및 재시도
2. 성능 최적화
- 리소스 관리: 적절한 리소스 할당
- 병렬 처리: 가능한 경우 병렬 처리 활용
- 캐싱: 중복 작업 방지를 위한 캐싱
- 모니터링: 지속적인 성능 모니터링
3. 보안
- 접근 제어: 적절한 접근 권한 관리
- 비밀 관리: 민감한 정보 보호
- 감사: 작업 실행 로그 및 감사
- 백업: 정기적인 백업 수행
마무리
Apache Airflow는 복잡한 데이터 파이프라인을 오케스트레이션하기 위한 강력한 플랫폼입니다. DAG를 통한 워크플로우 정의, 스케줄링, 모니터링 등의 기능을 통해 안정적이고 효율적인 데이터 파이프라인을 구축할 수 있습니다. 적절한 설계와 모니터링을 통해 실무에서 요구되는 높은 품질의 데이터 파이프라인을 운영할 수 있습니다.
This post is licensed under CC BY 4.0 by the author.