최근 저는 로컬 환경(Minikube 클러스터)에서 Helm을 이용해 Airflow를 배포하고, DAG 파일을 등록하는 과정에서 발생하는 문제들을 해결해 보았습니다. 이 글에서는 Hello World와 MySQL 연동 DAG를 예시로, Airflow DAG 등록 문제, MySQL 프로바이더 패키지 관련 이슈, 그리고 로컬과 클러스터 간 볼륨 마운트 설정 방법 등을 단계별로 정리해 보겠습니다.
1. 간단한 Hello World DAG 등록
먼저, 별다른 의존성이 없는 간단한 "Hello World" DAG는 Airflow가 기본적으로 잘 인식하는 것을 확인했습니다. 예시 코드는 다음과 같습니다.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
# 기본 인자 설정
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=1)
}
# DAG 객체 생성: 이름은 'test_dag', 매일 실행
dag = DAG(
'test_dag',
default_args=default_args,
schedule_interval='@daily'
)
def hello_world():
print("Hello World from test DAG!")
# PythonOperator를 사용한 태스크 정의
hello_task = PythonOperator(
task_id='hello_task',
python_callable=hello_world,
dag=dag
)
이 DAG는 /opt/airflow/dags 폴더에 정상적으로 복사되어 Airflow UI에 표시되었습니다.
2. MySQL 연동 DAG 등록 문제 및 해결
두 번째로 작성한 DAG는 외부 API에서 Cat Fact 데이터를 가져와 MySQL에 저장하는 내용입니다. 코드는 다음과 같습니다.
from datetime import datetime, timedelta
import requests
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
# DAG 환경변수 설정
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
# DAG 객체 생성: schedule_interval 대신 schedule 사용 (Airflow 2.x)
dag = DAG('openapi_to_mysql', default_args=default_args, schedule='@daily')
def fetch_cat_fact(**kwargs):
"""
Cat Fact API에서 데이터를 가져오는 함수.
HTTP GET 요청을 통해 데이터를 받아오고 JSON 형태로 파싱하여 반환한다.
반환된 데이터는 XCom을 통해 다음 태스크로 전달.
"""
url = 'https://catfact.ninja/fact'
response = requests.get(url)
data = response.json()
return data
def insert_cat_fact(**kwargs):
"""
fetch_cat_fact 태스크로부터 전달받은 데이터를 MySQL에 저장하는 함수.
MySQL 연결은 Airflow Connection ID 'mysql_default'를 사용한다.
테이블이 없으면 먼저 생성하고, 데이터를 삽입.
"""
ti = kwargs['ti']
data = ti.xcom_pull(task_ids='fetch_cat_fact')
mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
create_table_sql = """
CREATE TABLE IF NOT EXISTS cat_facts(
id INT AUTO_INCREMENT PRIMARY KEY,
fact TEXT,
length INT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
"""
mysql_hook.run(create_table_sql)
insert_sql = """
INSERT INTO cat_facts (fact, length)
VALUES (%s, %s)
"""
record = (data.get('fact'), data.get('length'))
mysql_hook.run(insert_sql, parameters=record)
# 태스크 정의
fetch_cat_fact_task = PythonOperator(
task_id='fetch_cat_fact',
python_callable=fetch_cat_fact,
dag=dag
)
insert_cat_fact_task = PythonOperator(
task_id='insert_cat_fact',
python_callable=insert_cat_fact,
dag=dag
)
fetch_cat_fact_task >> insert_cat_fact_task
이 DAG가 Airflow UI에 나타나지 않는 문제가 있었습니다.
원인:
MySQL 연동 DAG에서는 from airflow.providers.mysql.hooks.mysql import MySqlHook를 사용하고 있습니다. 이 모듈은 Airflow 코어에 포함되어 있지 않고, 별도의 MySQL 프로바이더 패키지에 속합니다. 만약 이 패키지가 Airflow 컨테이너에 설치되어 있지 않다면 ImportError가 발생해 DAG 로딩에 실패하게 됩니다.
해결 방법:
Airflow 컨테이너에 MySQL 프로바이더 패키지를 추가해야 합니다. 단순히 로컬 환경에서 pip install apache-airflow-providers-mysql 명령을 실행하는 것만으로는 충분하지 않습니다.
Minikube 클러스터에 Helm Chart를 통해 배포하는 경우, Helm values.yaml 파일에 extraPipPackages 옵션을 사용해 모든 Airflow 컨테이너(스케줄러, 웹서버, 워커)에 자동으로 해당 패키지를 설치하도록 설정할 수 있습니다.
예를 들어, values.yaml에 아래 내용을 추가합니다:
airflow:
extraPipPackages: "apache-airflow-providers-mysql"
이렇게 하면 Airflow 컨테이너가 시작될 때 필요한 MySQL 관련 라이브러리가 자동으로 설치되어, MySQL 연동 DAG가 정상적으로 로드되고 실행됩니다.
3. 로컬과 클러스터 간 DAG 공유: 볼륨 마운트 설정
Minikube나 KIND 클러스터를 사용할 경우, 로컬의 DAG 파일을 클러스터 내 Airflow 컨테이너와 공유하기 위해 볼륨 마운트를 사용합니다.
예를 들어, Minikube에서는 다음 명령어로 로컬의 dags 폴더를 Minikube 노드의 /opt/airflow/dags 경로에 마운트할 수 있습니다.
nohup minikube mount $(pwd)/dags:/opt/airflow/dags > mount.log 2>&1 &
이렇게 하면 로컬의 DAG 파일 변경사항이 Airflow 컨테이너에 바로 반영되어, 개발 및 테스트가 용이해집니다.
4. Helm을 통한 Airflow Cluster 배포 시 Persistent Volume 마운트
Helm Chart를 사용해 Airflow를 배포할 때, 모든 Airflow 컴포넌트(스케줄러, 웹서버, 워커)에서 동일한 DAG 파일을 사용하도록 extraVolumes와 extraVolumeMounts 설정을 적용할 수 있습니다. 아래는 해당 설정을 포함한 values.yaml 파일 예시입니다.
# Airflow Webserver 설정 및 기본 관리자 계정
webserver:
service:
type: LoadBalancer
extraVolumeMounts:
- name: dags
mountPath: /opt/airflow/dags
extraVolumes:
- name: dags
hostPath:
path: /opt/airflow/dags
auth:
defaultUser:
enabled: true
username: admin
password: admin
# PostgreSQL 설정 (Airflow 메타데이터 DB용)
postgresql:
primary:
securityContext:
runAsUser: 0
fsGroup: 0
volumePermissions:
enabled: true
# Worker 설정
workers:
securityContext:
runAsUser: 0
fsGroup: 0
resources:
requests:
memory: 2048Mi
cpu: 500m
limits:
memory: 3072Mi
cpu: 1000m
extraInitContainers:
- name: fix-worker-logs-permissions
image: busybox:latest
command:
- sh
- -c
- "mkdir -p /opt/airflow/logs/worker && chmod -R 755 /opt/airflow/logs/worker"
volumeMounts:
- name: logs
mountPath: /opt/airflow/logs
extraVolumeMounts:
- name: dags
mountPath: /opt/airflow/dags
extraVolumes:
- name: dags
hostPath:
path: /opt/airflow/dags
# Scheduler 설정
scheduler:
securityContext:
runAsUser: 0
fsGroup: 0
extraInitContainers:
- name: fix-logs-permissions
image: busybox:latest
command:
- sh
- -c
- "mkdir -p /opt/airflow/logs/scheduler && chmod -R 755 /opt/airflow/logs/scheduler"
volumeMounts:
- name: logs
mountPath: /opt/airflow/logs
extraVolumeMounts:
- name: dags
mountPath: /opt/airflow/dags
extraVolumes:
- name: dags
hostPath:
path: /opt/airflow/dags
# MySQL 프로바이더 의존성 해결을 위한 추가 pip 패키지 설치
airflow:
extraPipPackages: "apache-airflow-providers-mysql"
이 values.yaml 파일을 사용하여 Helm 업그레이드를 진행하면, 클러스터 내 모든 Airflow 컴포넌트가 Minikube 노드의 /opt/airflow/dags 경로를 공유하고, 동시에 MySQL 관련 라이브러리가 설치되어 MySQL 연동 DAG가 문제없이 동작합니다.
Helm 업그레이드 명령은 다음과 같이 실행합니다:
helm upgrade --install airflow apache-airflow/airflow --namespace airflow --reuse-values -f values.yaml
5. 결론
이번 작업에서는 다음과 같은 주요 내용을 다루었습니다:
- 간단한 Hello World DAG 등록: 의존성 없는 DAG는 기본적으로 잘 로드됨.
- MySQL 연동 DAG 문제 해결:
- Airflow DAG에서 MySQL 관련 연산자를 사용하기 위해 MySqlHook를 임포트할 때, MySQL 프로바이더 패키지가 필수입니다.
- Helm Chart의 airflow.extraPipPackages 옵션을 통해 apache-airflow-providers-mysql 라이브러리를 컨테이너에 설치하여 이 문제를 해결했습니다.
- 볼륨 마운트를 통한 로컬과 클러스터 간 DAG 공유: Minikube mount 명령어를 사용해 로컬 DAG 폴더를 공유하는 방법을 소개했습니다.
- Helm values.yaml 구성: 모든 Airflow 컴포넌트에 대해 동일한 DAG 파일이 마운트되도록 설정하고, 추가 pip 패키지 설치 옵션을 사용하여 의존성 문제를 해결했습니다.
이와 같은 접근법을 통해, 로컬 개발 환경에서도 Airflow Cluster를 보다 안정적이고 일관되게 운영할 수 있으며, MySQL 연동 DAG 또한 문제없이 동작하게 할 수 있습니다.
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow 가이드 ep.2] 1부 개념과 설정 #2 | DAG란 무엇인가? DAG 구조와 트리거 방식 이해 (0) | 2025.03.25 |
---|---|
[Airflow 가이드 ep.1] 1부 개념과 설정 #1 | 워크플로 자동화란? Airflow의 필요성과 기본 개념 (0) | 2025.03.25 |
🚀 Docker Desktop에서 Apache Airflow 설치 및 DAG 파일 마운트하기 (0) | 2025.02.26 |
Apache Airflow를 Minikube와 Helm으로 설치하기 (Apple M1 환경) (0) | 2025.02.24 |
Apache Airflow 가이드: 역할과 아키텍처 정리 (0) | 2025.02.23 |