이 글에서는 Airflow의 핵심 아키텍처와 주요 구성요소들을 상세히 알아봅니다. Scheduler, Webserver, Worker, Metadata Database 등 Airflow를 구성하는 각 컴포넌트의 역할과 상호작용 방식을 이해하고, 각 구성요소가 어떻게 동작하는지 살펴보겠습니다.
📌 Airflow 아키텍처 개요
✅ Airflow의 기본 아키텍처
Airflow는 다음과 같은 주요 구성요소로 이루어진 분산 시스템입니다:
- Web Server: 사용자 인터페이스(UI) 제공
- Scheduler: DAG와 태스크 스케줄링 담당
- Worker: 실제 태스크 실행 담당
- Metadata Database: Airflow의 상태 정보 저장
- DAG Directory: DAG 파일 저장소
✅ 컴포넌트 간 상호작용
Airflow의 주요 구성요소들은 다음과 같이 상호작용합니다:
- Scheduler는 DAG Directory에서 DAG 파일을 읽어들임
- Scheduler는 Metadata Database에 있는 스케줄 정보를 확인하고 실행할 태스크를 결정
- Scheduler는 결정된 태스크를 Worker에게 전달
- Worker는 태스크를 실행하고 결과를 Metadata Database에 저장
- Web Server는 Metadata Database의 정보를 읽어 사용자 인터페이스에 표시
이러한 구조는 각 컴포넌트가 독립적으로 동작하면서도 전체적으로 조화롭게 워크플로를 관리할 수 있게 합니다.
📌 Scheduler (스케줄러)
✅ Scheduler의 역할
Scheduler는 Airflow의 심장부라고 할 수 있으며, 다음과 같은 핵심 역할을 수행합니다:
- DAG 파싱: DAG 디렉토리에서 Python 파일을 읽어 DAG 객체를 생성
- 스케줄링: 각 DAG의 실행 일정을 확인하고 실행할 시점인 DAG를 트리거
- 태스크 인스턴스 생성: DAG Run 내의 태스크를 Task Instance로 변환
- 의존성 확인: 태스크 간의 의존성을 확인하여 실행 순서를 결정
- 실행 큐 관리: 실행 준비가 된 태스크를 Executor에게 전달
# airflow.cfg 파일에서 Scheduler 관련 주요 설정
[scheduler]
# DAG 디렉토리를 얼마나 자주 스캔할지 설정 (초 단위)
# 값이 작을수록 새 DAG가 빨리 감지되지만 시스템 부하가 증가함
dag_dir_list_interval = 300
# 스케줄러가 한 번의 루프에서 처리할 최대 태스크 수
# 값이 클수록 처리량이 증가하지만 리소스 사용량도 증가함
max_tis_per_query = 512
# 스케줄러가 DAG 파일을 재처리하는 주기 (초 단위)
# 이 값을 통해 DAG 코드 변경이 반영되는 시간을 조절할 수 있음
min_file_process_interval = 30
# 스케줄러 자체의 실행 간격 (초 단위)
# 낮은 값은 더 즉각적인 실행을 제공하지만 DB 부하가 증가함
scheduler_heartbeat_sec = 5
# 동시에 실행할 수 있는 태스크 인스턴스의 최대 수
# 과도한 값은 시스템 리소스를 고갈시킬 수 있음
max_active_tasks_per_dag = 16
✅ Scheduler 동작 원리
Scheduler는 다음과 같은 주기적인 프로세스로 작동합니다:
- DAG 파싱 주기:
- dag_dir_list_interval 설정에 따라 DAG 디렉토리를 스캔
- 변경된 DAG 파일만 다시 파싱하여 메모리 효율성 유지
- 파싱된 DAG 정보는 메타데이터 데이터베이스에 저장
- 스케줄링 주기:
- scheduler_heartbeat_sec 설정에 따라 주기적으로 실행
- 실행 예정 시간이 지난 DAG를 찾아 DAG Run 생성
- catchup 설정에 따라 과거 실행 건너뛰기 여부 결정
- 태스크 스케줄링:
- 생성된 DAG Run에서 실행 가능한 태스크 식별
- 의존성을 확인하여 실행 순서 결정
- 실행 준비가 된 태스크를 Executor에게 전달
- 상태 모니터링:
- 실행 중인 태스크의 상태 주기적 확인
- 실행 시간 초과 태스크 식별 및 처리
- 완료된 태스크의 결과 처리 및 의존 태스크 트리거
# 스케줄러 시작 명령어
airflow scheduler
# 백그라운드로 실행하는 경우
nohup airflow scheduler > scheduler.log 2>&1 &
✅ Scheduler 설정 최적화 방법
- DAG 파싱 성능 최적화:
- min_file_process_interval을 증가시켜 파싱 빈도 감소
- 많은 수의 DAG를 사용하는 경우, 관련 DAG들을 별도의 폴더로 구성
- DAG 파일 내에서 불필요한 임포트 및 계산 최소화
- 스케줄링 성능 최적화:
- max_tis_per_query를 시스템 성능에 맞게 조정
- catchup=False를 사용하여 불필요한 과거 실행 방지
- 데이터베이스 커넥션 풀 크기 최적화
- 리소스 관리:
- max_active_tasks_per_dag 설정으로 병렬 실행 수 제한
- 태스크 우선순위 설정으로 중요 태스크 먼저 실행
- 충분한 CPU 및 메모리 할당
📌 Web Server (웹 서버)
✅ Web Server의 역할
Web Server는 사용자가 Airflow와 상호작용할 수 있는 인터페이스를 제공하며, 다음과 같은 역할을 수행합니다:
- 사용자 인터페이스(UI) 제공: DAG 및 태스크의 상태, 실행 이력, 로그 등을 시각적으로 제공
- 인증 및 권한 관리: 사용자 인증 및 권한 부여 관리
- REST API 제공: 프로그래밍 방식으로 Airflow를 제어할 수 있는 API 제공
- 대시보드 및 모니터링: DAG 및 태스크의 성공/실패 통계, 시각화된 의존성 그래프 제공
# airflow.cfg 파일에서 Web Server 관련 주요 설정
[webserver]
# 웹 서버가 사용할 포트 번호
web_server_port = 8080
# 최대 동시 웹 요청 수
workers = 4
# 웹 서버의 타임아웃 설정 (초 단위)
web_server_timeout = 120
# UI에 표시할 DAG 당 최대 실행 이력 수
dag_run_display_number = 25
# 웹 UI에서 사용할 인증 방식
# - password: 기본 데이터베이스 인증
# - ldap: LDAP 기반 인증
# - oauth: OAuth 기반 인증
authenticate = password
# RBAC(Role-Based Access Control) 사용 여부
rbac = True
✅ Web Server 주요 기능
- DAG 관리 인터페이스:
- DAG 목록 및 상세 정보 조회
- DAG 활성화/비활성화 전환
- DAG 수동 트리거
- 태스크 인스턴스 상태 확인 및 관리
- 모니터링 및 디버깅:
- 태스크 로그 조회
- DAG 그래프 시각화
- 실행 이력 및 타임라인 뷰
- 태스크 간 데이터 전달(XCom) 확인
- 관리 기능:
- 변수(Variables) 관리
- 연결(Connections) 설정
- 풀(Pools) 및 슬롯(Slots) 관리
- 사용자 및 역할 관리(RBAC 사용 시)
[Airflow 웹 UI의 주요 화면을 보여주는 이미지. DAG 목록, 그래프 뷰, 실행 이력, 로그 조회 등의 인터페이스]
✅ Web Server 보안 설정
Airflow Web Server의 보안을 강화하기 위한 주요 설정:
- 인증 설정:
# airflow.cfg의 인증 관련 설정
[webserver]
authenticate = password # 인증 방식 선택
auth_backend = airflow.api.auth.backend.basic_auth # 인증 백엔드 지정
# LDAP 인증 사용 시
[ldap]
uri = ldap://ldap.example.com
user_filter = objectClass=*
user_name_attr = uid
superuser_filter = memberOf=CN=airflow-super-users
data_profiler_filter = memberOf=CN=airflow-data-profilers
bind_user = uid=admin,ou=users,dc=example,dc=com
bind_password = admin_password
- RBAC(Role-Based Access Control) 설정:
# airflow.cfg에서 RBAC 활성화
[webserver]
rbac = True
- HTTPS 설정:
# airflow.cfg에서 SSL 설정
[webserver]
web_server_ssl_cert = /path/to/cert.pem
web_server_ssl_key = /path/to/key.pem
- API 인증 설정:
# RESTful API 보안 설정
[api]
auth_backend = airflow.api.auth.backend.deny_all # 기본적으로 모든 API 접근 거부
# 또는
auth_backend = airflow.api.auth.backend.basic_auth # 기본 인증 사용
📌 Executor & Worker (실행자 & 작업자)
✅ Executor의 역할과 종류
Executor는 Scheduler로부터 받은 태스크를 실제로 실행하는 방식을 결정하는 구성 요소입니다:
- SequentialExecutor:
- 가장 기본적인 실행자로, 한 번에 하나의 태스크만 실행
- SQLite와 함께 사용되며, 주로 테스트 및 개발 환경에 적합
- 병렬 처리가 불가능하여 프로덕션 환경에는 적합하지 않음
# airflow.cfg에서 SequentialExecutor 설정
[core]
executor = SequentialExecutor
- LocalExecutor:
- 로컬 환경에서 병렬로 태스크 실행
- 프로세스 기반 병렬 처리를 통해 단일 머신에서 여러 태스크 동시 실행
- 중소 규모 워크로드에 적합하며, MySQL 등의 데이터베이스 필요
# airflow.cfg에서 LocalExecutor 설정
[core]
executor = LocalExecutor
# 최대 병렬 태스크 수 (0은 제한 없음)
parallelism = 32
- CeleryExecutor:
- 분산 환경에서 여러 Worker 노드를 통해 태스크 실행
- Celery 메시지 큐를 통해 태스크 분배
- 대규모 워크로드 및 고가용성 요구 환경에 적합
# airflow.cfg에서 CeleryExecutor 설정
[core]
executor = CeleryExecutor
[celery]
# Celery 브로커 URL (RabbitMQ, Redis 등)
broker_url = redis://redis:6379/0
# Celery 결과 백엔드
result_backend = db+postgresql://airflow:airflow@postgres:5432/airflow
# 각 워커당 동시 실행 태스크 수
worker_concurrency = 16
- KubernetesExecutor:
- Kubernetes 클러스터에서 각 태스크를 별도의 Pod으로 실행
- 자원 격리 및 동적 스케일링이 가능
- 클라우드 네이티브 환경에 최적화된 실행자
# airflow.cfg에서 KubernetesExecutor 설정
[core]
executor = KubernetesExecutor
[kubernetes]
# Kubernetes 클러스터 연결 설정
in_cluster = True # 클러스터 내부에서 실행 시
# 또는
kube_config = /path/to/kube_config # 클러스터 외부에서 실행 시
# 기본 네임스페이스
namespace = airflow
# 기본 이미지
worker_container_repository = apache/airflow
worker_container_tag = 2.3.0
- CeleryKubernetesExecutor:
- CeleryExecutor와 KubernetesExecutor를 함께 사용
- 태스크 특성에 따라 적절한 Executor를 선택하여 실행
# airflow.cfg에서 CeleryKubernetesExecutor 설정
[core]
executor = CeleryKubernetesExecutor
[celery_kubernetes_executor]
# 태스크 라우팅 설정
kubernetes_queue = kubernetes
✅ Worker의 역할과 구성
Worker는 실제로 태스크를 실행하는 프로세스 또는 컨테이너입니다:
- Worker 프로세스:
- LocalExecutor: 부모 프로세스에서 생성된 자식 프로세스
- CeleryExecutor: Celery Worker 프로세스
- KubernetesExecutor: Kubernetes Pod
- Worker 구성 요소:
- 태스크 실행 환경 (Python 인터프리터)
- 필요한 라이브러리 및 의존성
- 태스크 실행 로직
- 결과 및 로그 처리 메커니즘
- Worker 설정 최적화:
# LocalExecutor의 Worker 설정
[core]
# 태스크당 최대 실행 시간 (초 단위)
task_execution_timeout = 6000
# CeleryExecutor의 Worker 설정
[celery]
# Worker 프로세스당 동시 실행 태스크 수
worker_concurrency = 16
# Worker 자동 종료 전 최대 실행 태스크 수
worker_autoscale = 16,8
# Worker 큐 설정
worker_queues = default,high_priority,low_priority
# KubernetesExecutor의 Worker 설정
[kubernetes]
# 태스크 Pod의 리소스 요청 및 제한
worker_container_repository = apache/airflow
worker_container_tag = 2.3.0
# 기본 CPU 요청
worker_request_cpu = 200m
# 기본 메모리 요청
worker_request_memory = 1Gi
# 기본 CPU 제한
worker_limit_cpu = 1
# 기본 메모리 제한
worker_limit_memory = 2Gi
✅ 환경에 따른 Executor 선택 가이드
- 개발 및 테스트 환경:
- SequentialExecutor: 가장 간단하게 설정 가능, 단일 태스크 실행에 적합
- LocalExecutor: 병렬 처리가 필요한 개발 환경에 적합
- 소규모 프로덕션 환경:
- LocalExecutor: 단일 서버에서 중소 규모 워크로드 처리에 적합
- 설정이 간단하고 추가 서비스(메시지 큐 등) 불필요
- 대규모 프로덕션 환경:
- CeleryExecutor: 수평적 확장이 가능한 분산 환경에 적합
- 고가용성 및 내결함성 지원
- 다양한 워크로드 패턴에 유연하게 대응 가능
- 클라우드 네이티브 환경:
- KubernetesExecutor: Kubernetes 클러스터에서 동적 확장이 필요한 경우
- 태스크별 리소스 격리가 중요한 경우
- 다양한 컴퓨팅 리소스 요구사항을 가진 워크로드에 적합
- 하이브리드 환경:
- CeleryKubernetesExecutor: 다양한 유형의 태스크를 실행하는 복잡한 환경
- 일부 태스크는 Celery, 일부는 Kubernetes에서 실행하고자 할 때
📌 Metadata Database (메타데이터 데이터베이스)
✅ Metadata Database의 역할
Metadata Database는 Airflow의 '두뇌'로, 다음과 같은 정보를 저장하고 관리합니다:
- DAG 메타데이터: DAG 정의, 속성, 스케줄 정보
- 실행 이력: DAG Run 및 Task Instance의 실행 기록 및 상태
- 변수 및 연결 정보: Airflow 변수 및 외부 시스템 연결 정보
- 사용자 및 권한: 인증 및 권한 관련 정보(RBAC 사용 시)
- XCom 데이터: 태스크 간 데이터 공유를 위한 XCom 값
✅ 지원되는 데이터베이스 유형
Airflow는 다양한 관계형 데이터베이스를 메타데이터 저장소로 사용할 수 있습니다:
- SQLite:
- 가장 간단한 설정으로, 파일 기반 데이터베이스
- SequentialExecutor와만 호환됨
- 개발 및 테스트 환경에만 권장
# airflow.cfg에서 SQLite 설정
[core]
sql_alchemy_conn = sqlite:////path/to/airflow.db
- PostgreSQL:
- 안정성과 성능이 좋아 프로덕션 환경에 권장됨
- 모든 Executor 유형과 호환
- 트랜잭션 및 동시성 지원이 우수
# airflow.cfg에서 PostgreSQL 설정
[core]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
- MySQL:
- 널리 사용되는 데이터베이스로, 프로덕션 환경에 적합
- 설정이 비교적 간단하고 성능이 우수
- 버전 8.0 이상 권장
# airflow.cfg에서 MySQL 설정
[core]
sql_alchemy_conn = mysql://airflow:airflow@mysql:3306/airflow
- 기타 데이터베이스:
- MSSQL, Oracle 등 SQLAlchemy가 지원하는 다른 데이터베이스도 사용 가능
- 특정 환경 요구사항에 따라 선택
✅ 데이터베이스 설정 및 최적화
- 연결 설정:
# airflow.cfg의 데이터베이스 연결 설정
[core]
# SQLAlchemy 연결 문자열
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
# 연결 풀 크기
sql_alchemy_pool_size = 5
# 연결 풀 초과 허용치
sql_alchemy_pool_recycle = 1800
# 연결 오버헤드(초)
sql_alchemy_connect_args = {"connect_timeout": 30}
- 성능 최적화:
# PostgreSQL 성능 최적화 설정
# postgresql.conf
max_connections = 200
shared_buffers = 2GB
work_mem = 16MB
maintenance_work_mem = 256MB
effective_cache_size = 4GB
- 데이터베이스 관리:
# 데이터베이스 초기화
airflow db init
# 데이터베이스 업그레이드 (Airflow 버전 업데이트 후)
airflow db upgrade
# 데이터베이스 재설정 (주의: 모든 데이터 삭제)
airflow db reset
# 데이터베이스 확인
airflow db check
- 백업 및 복구:
# PostgreSQL 백업
pg_dump -U airflow -d airflow -f airflow_backup.sql
# PostgreSQL 복구
psql -U airflow -d airflow -f airflow_backup.sql
# MySQL 백업
mysqldump -u airflow -p airflow > airflow_backup.sql
# MySQL 복구
mysql -u airflow -p airflow < airflow_backup.sql
✅ 데이터베이스 스키마 개요
Airflow 메타데이터 데이터베이스의 주요 테이블:
- dag: DAG 메타데이터 저장
- dag_run: DAG 실행 이력 저장
- task_instance: 태스크 실행 이력 및 상태 저장
- job: Scheduler, Webserver 등 Airflow 프로세스 정보 저장
- variable: Airflow 변수 저장
- connection: 외부 시스템 연결 정보 저장
- xcom: 태스크 간 데이터 공유를 위한 XCom 값 저장
- sla_miss: SLA 미달성 이벤트 저장
- log: 로그 정보 저장 (로그 데이터베이스 사용 시)
-- 주요 데이터베이스 쿼리 예시
-- 가장 최근 실행된 DAG 목록 조회
SELECT dag_id, execution_date, state
FROM dag_run
ORDER BY execution_date DESC
LIMIT 10;
-- 실패한 태스크 인스턴스 조회
SELECT dag_id, task_id, execution_date
FROM task_instance
WHERE state = 'failed'
ORDER BY execution_date DESC;
-- 특정 DAG의 실행 시간 통계
SELECT
dag_id,
AVG(EXTRACT(EPOCH FROM (end_date - start_date))) as avg_duration,
MAX(EXTRACT(EPOCH FROM (end_date - start_date))) as max_duration
FROM task_instance
WHERE dag_id = 'example_dag'
GROUP BY dag_id;
📌 DAG 저장소 (DAG Storage)
✅ DAG 저장소의 역할
DAG 저장소는 모든 DAG 파일이 저장되는 위치로, 다음과 같은 역할을 합니다:
- DAG 코드 저장: Python으로 작성된 DAG 정의 파일 저장
- 관련 파일 저장: DAG에서 참조하는 Python 모듈, 설정 파일, 템플릿 등 저장
- 스케줄러 접근 제공: Scheduler가 주기적으로 DAG를 파싱하기 위한 접근 제공
✅ DAG 저장소 설정
- 기본 설정:
# airflow.cfg에서 DAG 디렉토리 설정
[core]
# DAG 파일이 저장될 디렉토리 경로
dags_folder = /path/to/airflow/dags
# DAG 파일 감지 설정
# Python 파일이 아닌 파일을 무시할지 여부
ignore_dags_by_default = True
# DAG 검색 시 서브디렉토리까지 검색할지 여부
dags_folder_list_interval = 300
- 다양한 저장소 유형:
b. NFS나 공유 파일 시스템:c. Git 동기화:d. Kubernetes ConfigMap/Git-Sync: Kubernetes 환경에서는 ConfigMap이나 Git-Sync 사이드카를 통해 DAG 관리# 기본 로컬 파일 시스템 설정 dags_folder = /path/to/airflow/dags
- # Git 기반 DAG 동기화 설정 (외부 도구 사용) dags_folder = /path/to/airflow/dags # 외부 Git 동기화 스크립트 설정 필요
- # 네트워크 공유 파일 시스템 설정 dags_folder = /mnt/nfs/airflow/dags
- a. 로컬 파일 시스템:
- DAG 파일 감지 설정:
# DAG 파일 인식 패턴 설정
[core]
# DAG 파일 패턴 (기본값은 *.py)
dag_file_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
dag_discovery_safe_mode = True
✅ DAG 코드 관리 모범 사례
- 버전 관리 시스템 사용:
- Git을 사용하여 DAG 코드 변경 사항 추적
- 브랜치 전략 수립 (예: dev, staging, production)
- 코드 리뷰 및 PR(Pull Request) 프로세스 도입
# DAG 저장소 버전 관리 예시
git init /path/to/airflow/dags
cd /path/to/airflow/dags
git remote add origin https://github.com/your-org/airflow-dags.git
# 개발 환경에서 테스트 후 프로덕션에 배포
git checkout -b feature/new-data-pipeline
# DAG 파일 개발 및 수정
git add new_dag.py
git commit -m "Add new data pipeline DAG"
git push origin feature/new-data-pipeline
# PR 생성 및 코드 리뷰 후 메인 브랜치에 병합
- 디렉토리 구조화:
dags/
├── data_pipelines/ # 데이터 파이프라인 관련 DAG
│ ├── sales_pipeline.py
│ └── marketing_pipeline.py
├── maintenance/ # 유지보수 관련 DAG
│ ├── cleanup_logs.py
│ └── system_health_check.py
├── utils/ # 공통 유틸리티 모듈
│ ├── slack_alerts.py
│ └── db_helpers.py
├── include/ # SQL 쿼리, 구성 파일 등 포함 파일
│ ├── sql/
│ └── config/
└── tests/ # DAG 테스트 코드
├── test_sales_pipeline.py
└── test_marketing_pipeline.py
- 변경 관리 및 배포:
- CI/CD 파이프라인 구축하여 자동 테스트 및 배포
- 변경 로그 유지
- 롤백 절차 마련
- 다중 환경 관리:
- 환경별 변수 분리
- 환경별 설정 파일 분리
- 환경별 태그 사용
# 환경별 설정 분리 예시
def create_dag(env):
if env == 'dev':
schedule = None # 개발 환경에서는 수동 트리거만
db_conn_id = 'dev_db'
elif env == 'staging':
schedule = '@daily' # 스테이징에서 일일 실행
db_conn_id = 'staging_db'
else: # 프로덕션
schedule = '0 */3 * * *' # 3시간마다 실행
db_conn_id = 'prod_db'
dag = DAG(
f'data_pipeline_{env}',
schedule_interval=schedule,
# 기타 설정...
)
# DAG 태스크 정의...
return dag
# 환경별 DAG 생성
for env in ['dev', 'staging', 'prod']:
globals()[f'data_pipeline_{env}'] = create_dag(env)
📌 Airflow 구성요소 간 통신
✅ 컴포넌트 통신 흐름
Airflow 구성요소 간의 주요 통신 흐름:
- Scheduler ↔ Metadata DB:
- DAG 및 태스크 메타데이터 읽기/쓰기
- 실행 상태 업데이트
- 스케줄링 정보 관리
- Scheduler ↔ Executor:
- 실행할 태스크 전달
- 태스크 상태 및 결과 수신
- Executor ↔ Worker:
- (CeleryExecutor의 경우) 메시지 큐를 통한 태스크 분배
- (KubernetesExecutor의 경우) Kubernetes API를 통한 Pod 생성 및 관리
- Worker ↔ Metadata DB:
- 태스크 실행 상태 업데이트
- XCom을 통한 태스크 간 데이터 공유
- Web Server ↔ Metadata DB:
- UI 표시를 위한 DAG 및 태스크 정보 조회
- 사용자 작업(DAG 트리거 등) 처리
- Web Server ↔ DAG Storage:
- DAG 코드 조회 및 표시
# 컴포넌트 간 통신 설정 예시
# Metadata DB 연결 설정
[core]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
# CeleryExecutor 사용 시 메시지 큐 설정
[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres:5432/airflow
# KubernetesExecutor 사용 시 Kubernetes API 설정
[kubernetes]
in_cluster = True
# 또는
kube_config = /path/to/kube_config
✅ 통신 보안 설정
- 데이터베이스 연결 보안:
# DB 연결 암호화 설정
[core]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@postgres:5432/airflow?sslmode=require
- Celery 메시지 큐 보안:
# Celery 브로커 보안 설정
[celery]
broker_url = redis://:password@redis:6379/0
# 또는
broker_url = amqp://user:password@rabbitmq:5672/airflow
- API 보안:
# API 인증 설정
[api]
auth_backend = airflow.api.auth.backend.basic_auth
- 네트워크 수준 보안:
- 컴포넌트 간 통신을 위한 전용 내부 네트워크 구성
- 방화벽 규칙을 통한 접근 제한
- VPC(Virtual Private Cloud) 또는 네트워크 세그먼트 분리
📌 Airflow 설치 구성 옵션
✅ 단일 노드 설치
모든 구성요소가 하나의 머신에서 실행되는 가장 간단한 설치 방식:
- 장점:
- 설정이 간단함
- 리소스 요구사항이 적음
- 개발 및 테스트 환경에 적합
- 단점:
- 확장성 제한
- 단일 장애점 존재
- 리소스 경합 가능성
- 설치 예시:
# pip를 통한 설치
pip install apache-airflow
# 초기화
airflow db init
# 사용자 생성
airflow users create \
--username admin \
--password admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com
# 서비스 시작
airflow webserver --port 8080
airflow scheduler
✅ 다중 노드 설치
구성요소가 여러 머신에 분산되어 실행되는 고급 설치 방식:
- 장점:
- 높은 확장성
- 컴포넌트별 리소스 최적화
- 내결함성 및 고가용성 지원
- 단점:
- 설정 복잡성 증가
- 추가 인프라 요구사항
- 관리 오버헤드 증가
- 설치 예시:
# 모든 노드에 Airflow 설치
pip install apache-airflow[celery,redis,postgres]
# Metadata DB 노드 설정
# PostgreSQL 설정 및 데이터베이스 생성
# Web Server 노드 설정
airflow webserver --port 8080
# Scheduler 노드 설정
airflow scheduler
# Worker 노드 설정 (CeleryExecutor 사용 시)
airflow celery worker
✅ 컨테이너 기반 설치
Docker 및 Kubernetes를 활용한 컨테이너화된 설치 방식:
- Docker Compose:
- 개발 및 중소 규모 환경에 적합
- 단일 호스트에서 여러 컨테이너로 구성요소 분리
# docker-compose.yaml 예시
version: '3'
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
redis:
image: redis:latest
webserver:
image: apache/airflow:2.3.0
depends_on:
- postgres
- redis
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
command: webserver
ports:
- "8080:8080"
scheduler:
image: apache/airflow:2.3.0
depends_on:
- postgres
- redis
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
command: scheduler
volumes:
- ./dags:/opt/airflow/dags
worker:
image: apache/airflow:2.3.0
depends_on:
- postgres
- redis
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
command: celery worker
volumes:
- ./dags:/opt/airflow/dags
volumes:
postgres-db-volume:
- Kubernetes:
- 대규모 프로덕션 환경에 적합
- 자동 스케일링 및 고가용성 지원
- Helm 차트를 통한 설치 관리
# Helm을 사용한 Kubernetes 설치
helm repo add apache-airflow https://airflow.apache.org
helm install airflow apache-airflow/airflow \
--set executor=CeleryExecutor \
--set webserver.replicas=2 \
--set worker.replicas=3
✅ 클라우드 관리형 서비스
클라우드 제공업체가 관리하는 Airflow 서비스:
- Amazon MWAA (Managed Workflows for Apache Airflow):
- AWS에서 제공하는 관리형 Airflow 서비스
- 인프라 관리 오버헤드 감소
- AWS 서비스와의 통합 지원
- Google Cloud Composer:
- GCP에서 제공하는 관리형 Airflow 서비스
- GCP 서비스와의 통합 지원
- 자동 스케일링 및 업그레이드 지원
- Microsoft Azure Data Factory:
- Airflow는 아니지만 유사한 워크플로 관리 서비스
- Azure 서비스와의 통합 지원
- Astronomer:
- Airflow에 특화된 독립 관리형 서비스
- 기업용 기능 및 지원 제공
Summary
- Airflow 아키텍처는 Web Server, Scheduler, Worker, Metadata Database, DAG Storage로 구성된 분산 시스템입니다.
- Scheduler는 DAG 파싱, 스케줄링, 의존성 확인, 실행 큐 관리 등 Airflow의 핵심 역할을 담당합니다.
- Web Server는 사용자 인터페이스, 인증 및 권한 관리, REST API, 모니터링 기능을 제공합니다.
- Executor는 SequentialExecutor, LocalExecutor, CeleryExecutor, KubernetesExecutor 등 다양한 유형이 있으며, 워크로드 특성에 맞게 선택해야 합니다.
- Metadata Database는 PostgreSQL, MySQL 등의 관계형 데이터베이스를 사용하여 DAG, 실행 이력, 변수, 연결 정보 등을 저장합니다.
- DAG Storage는 Python으로 작성된 DAG 파일과 관련 리소스를 저장하며, 버전 관리 시스템과 연동하여 효율적으로 관리할 수 있습니다.
- Airflow 설치 옵션으로는 단일 노드 설치, 다중 노드 설치, 컨테이너 기반 설치, 클라우드 관리형 서비스 등이 있으며, 환경 요구사항에 맞게 선택할 수 있습니다.
- 각 구성요소를 최적화하여 Airflow의 성능, 안정성, 확장성을 향상시킬 수 있습니다.