Data Engineering/Airflow

[Airflow 가이드 ep.3] 1부 개념과 설정 #3 | Airflow 주요 구성요소: Scheduler, Webserver, Worker, DB

ygtoken 2025. 3. 25. 15:25
728x90

이 글에서는 Airflow의 핵심 아키텍처와 주요 구성요소들을 상세히 알아봅니다. Scheduler, Webserver, Worker, Metadata Database 등 Airflow를 구성하는 각 컴포넌트의 역할과 상호작용 방식을 이해하고, 각 구성요소가 어떻게 동작하는지 살펴보겠습니다.


📌 Airflow 아키텍처 개요

Airflow의 기본 아키텍처

Airflow는 다음과 같은 주요 구성요소로 이루어진 분산 시스템입니다:

  1. Web Server: 사용자 인터페이스(UI) 제공
  2. Scheduler: DAG와 태스크 스케줄링 담당
  3. Worker: 실제 태스크 실행 담당
  4. Metadata Database: Airflow의 상태 정보 저장
  5. DAG Directory: DAG 파일 저장소

 

Airflow 아키텍처 구성요소

 

 

컴포넌트 간 상호작용

Airflow의 주요 구성요소들은 다음과 같이 상호작용합니다:

  1. SchedulerDAG Directory에서 DAG 파일을 읽어들임
  2. SchedulerMetadata Database에 있는 스케줄 정보를 확인하고 실행할 태스크를 결정
  3. Scheduler는 결정된 태스크를 Worker에게 전달
  4. Worker는 태스크를 실행하고 결과를 Metadata Database에 저장
  5. Web ServerMetadata Database의 정보를 읽어 사용자 인터페이스에 표시

이러한 구조는 각 컴포넌트가 독립적으로 동작하면서도 전체적으로 조화롭게 워크플로를 관리할 수 있게 합니다.


📌 Scheduler (스케줄러)

Scheduler의 역할

Scheduler는 Airflow의 심장부라고 할 수 있으며, 다음과 같은 핵심 역할을 수행합니다:

  1. DAG 파싱: DAG 디렉토리에서 Python 파일을 읽어 DAG 객체를 생성
  2. 스케줄링: 각 DAG의 실행 일정을 확인하고 실행할 시점인 DAG를 트리거
  3. 태스크 인스턴스 생성: DAG Run 내의 태스크를 Task Instance로 변환
  4. 의존성 확인: 태스크 간의 의존성을 확인하여 실행 순서를 결정
  5. 실행 큐 관리: 실행 준비가 된 태스크를 Executor에게 전달
# airflow.cfg 파일에서 Scheduler 관련 주요 설정
[scheduler]
# DAG 디렉토리를 얼마나 자주 스캔할지 설정 (초 단위)
# 값이 작을수록 새 DAG가 빨리 감지되지만 시스템 부하가 증가함
dag_dir_list_interval = 300

# 스케줄러가 한 번의 루프에서 처리할 최대 태스크 수
# 값이 클수록 처리량이 증가하지만 리소스 사용량도 증가함
max_tis_per_query = 512

# 스케줄러가 DAG 파일을 재처리하는 주기 (초 단위)
# 이 값을 통해 DAG 코드 변경이 반영되는 시간을 조절할 수 있음
min_file_process_interval = 30

# 스케줄러 자체의 실행 간격 (초 단위)
# 낮은 값은 더 즉각적인 실행을 제공하지만 DB 부하가 증가함
scheduler_heartbeat_sec = 5

# 동시에 실행할 수 있는 태스크 인스턴스의 최대 수
# 과도한 값은 시스템 리소스를 고갈시킬 수 있음
max_active_tasks_per_dag = 16

Scheduler 동작 원리

Scheduler는 다음과 같은 주기적인 프로세스로 작동합니다:

  1. DAG 파싱 주기:
    • dag_dir_list_interval 설정에 따라 DAG 디렉토리를 스캔
    • 변경된 DAG 파일만 다시 파싱하여 메모리 효율성 유지
    • 파싱된 DAG 정보는 메타데이터 데이터베이스에 저장
  2. 스케줄링 주기:
    • scheduler_heartbeat_sec 설정에 따라 주기적으로 실행
    • 실행 예정 시간이 지난 DAG를 찾아 DAG Run 생성
    • catchup 설정에 따라 과거 실행 건너뛰기 여부 결정
  3. 태스크 스케줄링:
    • 생성된 DAG Run에서 실행 가능한 태스크 식별
    • 의존성을 확인하여 실행 순서 결정
    • 실행 준비가 된 태스크를 Executor에게 전달
  4. 상태 모니터링:
    • 실행 중인 태스크의 상태 주기적 확인
    • 실행 시간 초과 태스크 식별 및 처리
    • 완료된 태스크의 결과 처리 및 의존 태스크 트리거
# 스케줄러 시작 명령어
airflow scheduler

# 백그라운드로 실행하는 경우
nohup airflow scheduler > scheduler.log 2>&1 &

 

Scheduler 설정 최적화 방법

  1. DAG 파싱 성능 최적화:
    • min_file_process_interval을 증가시켜 파싱 빈도 감소
    • 많은 수의 DAG를 사용하는 경우, 관련 DAG들을 별도의 폴더로 구성
    • DAG 파일 내에서 불필요한 임포트 및 계산 최소화
  2. 스케줄링 성능 최적화:
    • max_tis_per_query를 시스템 성능에 맞게 조정
    • catchup=False를 사용하여 불필요한 과거 실행 방지
    • 데이터베이스 커넥션 풀 크기 최적화
  3. 리소스 관리:
    • max_active_tasks_per_dag 설정으로 병렬 실행 수 제한
    • 태스크 우선순위 설정으로 중요 태스크 먼저 실행
    • 충분한 CPU 및 메모리 할당

📌 Web Server (웹 서버)

Web Server의 역할

Web Server는 사용자가 Airflow와 상호작용할 수 있는 인터페이스를 제공하며, 다음과 같은 역할을 수행합니다:

  1. 사용자 인터페이스(UI) 제공: DAG 및 태스크의 상태, 실행 이력, 로그 등을 시각적으로 제공
  2. 인증 및 권한 관리: 사용자 인증 및 권한 부여 관리
  3. REST API 제공: 프로그래밍 방식으로 Airflow를 제어할 수 있는 API 제공
  4. 대시보드 및 모니터링: DAG 및 태스크의 성공/실패 통계, 시각화된 의존성 그래프 제공
# airflow.cfg 파일에서 Web Server 관련 주요 설정
[webserver]
# 웹 서버가 사용할 포트 번호
web_server_port = 8080

# 최대 동시 웹 요청 수
workers = 4

# 웹 서버의 타임아웃 설정 (초 단위)
web_server_timeout = 120

# UI에 표시할 DAG 당 최대 실행 이력 수
dag_run_display_number = 25

# 웹 UI에서 사용할 인증 방식
# - password: 기본 데이터베이스 인증
# - ldap: LDAP 기반 인증
# - oauth: OAuth 기반 인증
authenticate = password

# RBAC(Role-Based Access Control) 사용 여부
rbac = True

 

Web Server 주요 기능

  1. DAG 관리 인터페이스:
    • DAG 목록 및 상세 정보 조회
    • DAG 활성화/비활성화 전환
    • DAG 수동 트리거
    • 태스크 인스턴스 상태 확인 및 관리
  2. 모니터링 및 디버깅:
    • 태스크 로그 조회
    • DAG 그래프 시각화
    • 실행 이력 및 타임라인 뷰
    • 태스크 간 데이터 전달(XCom) 확인
  3. 관리 기능:
    • 변수(Variables) 관리
    • 연결(Connections) 설정
    • 풀(Pools) 및 슬롯(Slots) 관리
    • 사용자 및 역할 관리(RBAC 사용 시)

[Airflow 웹 UI의 주요 화면을 보여주는 이미지. DAG 목록, 그래프 뷰, 실행 이력, 로그 조회 등의 인터페이스]

 

Web Server 보안 설정

Airflow Web Server의 보안을 강화하기 위한 주요 설정:

  1. 인증 설정:
# airflow.cfg의 인증 관련 설정
[webserver]
authenticate = password  # 인증 방식 선택
auth_backend = airflow.api.auth.backend.basic_auth  # 인증 백엔드 지정

# LDAP 인증 사용 시
[ldap]
uri = ldap://ldap.example.com
user_filter = objectClass=*
user_name_attr = uid
superuser_filter = memberOf=CN=airflow-super-users
data_profiler_filter = memberOf=CN=airflow-data-profilers
bind_user = uid=admin,ou=users,dc=example,dc=com
bind_password = admin_password
  1. RBAC(Role-Based Access Control) 설정:
# airflow.cfg에서 RBAC 활성화
[webserver]
rbac = True
  1. HTTPS 설정:
# airflow.cfg에서 SSL 설정
[webserver]
web_server_ssl_cert = /path/to/cert.pem
web_server_ssl_key = /path/to/key.pem
  1. API 인증 설정:
# RESTful API 보안 설정
[api]
auth_backend = airflow.api.auth.backend.deny_all  # 기본적으로 모든 API 접근 거부
# 또는
auth_backend = airflow.api.auth.backend.basic_auth  # 기본 인증 사용

📌 Executor & Worker (실행자 & 작업자)

Executor의 역할과 종류

Executor는 Scheduler로부터 받은 태스크를 실제로 실행하는 방식을 결정하는 구성 요소입니다:

  1. SequentialExecutor:
    • 가장 기본적인 실행자로, 한 번에 하나의 태스크만 실행
    • SQLite와 함께 사용되며, 주로 테스트 및 개발 환경에 적합
    • 병렬 처리가 불가능하여 프로덕션 환경에는 적합하지 않음
# airflow.cfg에서 SequentialExecutor 설정
[core]
executor = SequentialExecutor
  1. LocalExecutor:
    • 로컬 환경에서 병렬로 태스크 실행
    • 프로세스 기반 병렬 처리를 통해 단일 머신에서 여러 태스크 동시 실행
    • 중소 규모 워크로드에 적합하며, MySQL 등의 데이터베이스 필요
# airflow.cfg에서 LocalExecutor 설정
[core]
executor = LocalExecutor
# 최대 병렬 태스크 수 (0은 제한 없음)
parallelism = 32
  1. CeleryExecutor:
    • 분산 환경에서 여러 Worker 노드를 통해 태스크 실행
    • Celery 메시지 큐를 통해 태스크 분배
    • 대규모 워크로드 및 고가용성 요구 환경에 적합
# airflow.cfg에서 CeleryExecutor 설정
[core]
executor = CeleryExecutor

[celery]
# Celery 브로커 URL (RabbitMQ, Redis 등)
broker_url = redis://redis:6379/0
# Celery 결과 백엔드
result_backend = db+postgresql://airflow:airflow@postgres:5432/airflow
# 각 워커당 동시 실행 태스크 수
worker_concurrency = 16
  1. KubernetesExecutor:
    • Kubernetes 클러스터에서 각 태스크를 별도의 Pod으로 실행
    • 자원 격리 및 동적 스케일링이 가능
    • 클라우드 네이티브 환경에 최적화된 실행자
# airflow.cfg에서 KubernetesExecutor 설정
[core]
executor = KubernetesExecutor

[kubernetes]
# Kubernetes 클러스터 연결 설정
in_cluster = True  # 클러스터 내부에서 실행 시
# 또는
kube_config = /path/to/kube_config  # 클러스터 외부에서 실행 시
# 기본 네임스페이스
namespace = airflow
# 기본 이미지
worker_container_repository = apache/airflow
worker_container_tag = 2.3.0
  1. CeleryKubernetesExecutor:
    • CeleryExecutor와 KubernetesExecutor를 함께 사용
    • 태스크 특성에 따라 적절한 Executor를 선택하여 실행
# airflow.cfg에서 CeleryKubernetesExecutor 설정
[core]
executor = CeleryKubernetesExecutor

[celery_kubernetes_executor]
# 태스크 라우팅 설정
kubernetes_queue = kubernetes

 

Worker의 역할과 구성

Worker는 실제로 태스크를 실행하는 프로세스 또는 컨테이너입니다:

  1. Worker 프로세스:
    • LocalExecutor: 부모 프로세스에서 생성된 자식 프로세스
    • CeleryExecutor: Celery Worker 프로세스
    • KubernetesExecutor: Kubernetes Pod
  2. Worker 구성 요소:
    • 태스크 실행 환경 (Python 인터프리터)
    • 필요한 라이브러리 및 의존성
    • 태스크 실행 로직
    • 결과 및 로그 처리 메커니즘
  3. Worker 설정 최적화:
# LocalExecutor의 Worker 설정
[core]
# 태스크당 최대 실행 시간 (초 단위)
task_execution_timeout = 6000

# CeleryExecutor의 Worker 설정
[celery]
# Worker 프로세스당 동시 실행 태스크 수
worker_concurrency = 16
# Worker 자동 종료 전 최대 실행 태스크 수
worker_autoscale = 16,8
# Worker 큐 설정
worker_queues = default,high_priority,low_priority

# KubernetesExecutor의 Worker 설정
[kubernetes]
# 태스크 Pod의 리소스 요청 및 제한
worker_container_repository = apache/airflow
worker_container_tag = 2.3.0
# 기본 CPU 요청
worker_request_cpu = 200m
# 기본 메모리 요청
worker_request_memory = 1Gi
# 기본 CPU 제한
worker_limit_cpu = 1
# 기본 메모리 제한
worker_limit_memory = 2Gi

 

환경에 따른 Executor 선택 가이드

  1. 개발 및 테스트 환경:
    • SequentialExecutor: 가장 간단하게 설정 가능, 단일 태스크 실행에 적합
    • LocalExecutor: 병렬 처리가 필요한 개발 환경에 적합
  2. 소규모 프로덕션 환경:
    • LocalExecutor: 단일 서버에서 중소 규모 워크로드 처리에 적합
    • 설정이 간단하고 추가 서비스(메시지 큐 등) 불필요
  3. 대규모 프로덕션 환경:
    • CeleryExecutor: 수평적 확장이 가능한 분산 환경에 적합
    • 고가용성 및 내결함성 지원
    • 다양한 워크로드 패턴에 유연하게 대응 가능
  4. 클라우드 네이티브 환경:
    • KubernetesExecutor: Kubernetes 클러스터에서 동적 확장이 필요한 경우
    • 태스크별 리소스 격리가 중요한 경우
    • 다양한 컴퓨팅 리소스 요구사항을 가진 워크로드에 적합
  5. 하이브리드 환경:
    • CeleryKubernetesExecutor: 다양한 유형의 태스크를 실행하는 복잡한 환경
    • 일부 태스크는 Celery, 일부는 Kubernetes에서 실행하고자 할 때

Airflow Executor 유형 비교

 


📌 Metadata Database (메타데이터 데이터베이스)

Metadata Database의 역할

Metadata Database는 Airflow의 '두뇌'로, 다음과 같은 정보를 저장하고 관리합니다:

  1. DAG 메타데이터: DAG 정의, 속성, 스케줄 정보
  2. 실행 이력: DAG Run 및 Task Instance의 실행 기록 및 상태
  3. 변수 및 연결 정보: Airflow 변수 및 외부 시스템 연결 정보
  4. 사용자 및 권한: 인증 및 권한 관련 정보(RBAC 사용 시)
  5. XCom 데이터: 태스크 간 데이터 공유를 위한 XCom 값

지원되는 데이터베이스 유형

Airflow는 다양한 관계형 데이터베이스를 메타데이터 저장소로 사용할 수 있습니다:

  1. SQLite:
    • 가장 간단한 설정으로, 파일 기반 데이터베이스
    • SequentialExecutor와만 호환됨
    • 개발 및 테스트 환경에만 권장
# airflow.cfg에서 SQLite 설정
[core]
sql_alchemy_conn = sqlite:////path/to/airflow.db
  1. PostgreSQL:
    • 안정성과 성능이 좋아 프로덕션 환경에 권장됨
    • 모든 Executor 유형과 호환
    • 트랜잭션 및 동시성 지원이 우수
# airflow.cfg에서 PostgreSQL 설정
[core]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
  1. MySQL:
    • 널리 사용되는 데이터베이스로, 프로덕션 환경에 적합
    • 설정이 비교적 간단하고 성능이 우수
    • 버전 8.0 이상 권장
# airflow.cfg에서 MySQL 설정
[core]
sql_alchemy_conn = mysql://airflow:airflow@mysql:3306/airflow
  1. 기타 데이터베이스:
    • MSSQL, Oracle 등 SQLAlchemy가 지원하는 다른 데이터베이스도 사용 가능
    • 특정 환경 요구사항에 따라 선택

데이터베이스 설정 및 최적화

  1. 연결 설정:
# airflow.cfg의 데이터베이스 연결 설정
[core]
# SQLAlchemy 연결 문자열
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@postgres:5432/airflow

# 연결 풀 크기
sql_alchemy_pool_size = 5
# 연결 풀 초과 허용치
sql_alchemy_pool_recycle = 1800
# 연결 오버헤드(초)
sql_alchemy_connect_args = {"connect_timeout": 30}
  1. 성능 최적화:
# PostgreSQL 성능 최적화 설정
# postgresql.conf
max_connections = 200
shared_buffers = 2GB
work_mem = 16MB
maintenance_work_mem = 256MB
effective_cache_size = 4GB
  1. 데이터베이스 관리:
# 데이터베이스 초기화
airflow db init

# 데이터베이스 업그레이드 (Airflow 버전 업데이트 후)
airflow db upgrade

# 데이터베이스 재설정 (주의: 모든 데이터 삭제)
airflow db reset

# 데이터베이스 확인
airflow db check
  1. 백업 및 복구:
# PostgreSQL 백업
pg_dump -U airflow -d airflow -f airflow_backup.sql

# PostgreSQL 복구
psql -U airflow -d airflow -f airflow_backup.sql

# MySQL 백업
mysqldump -u airflow -p airflow > airflow_backup.sql

# MySQL 복구
mysql -u airflow -p airflow < airflow_backup.sql

 

데이터베이스 스키마 개요

Airflow 메타데이터 데이터베이스의 주요 테이블:

  1. dag: DAG 메타데이터 저장
  2. dag_run: DAG 실행 이력 저장
  3. task_instance: 태스크 실행 이력 및 상태 저장
  4. job: Scheduler, Webserver 등 Airflow 프로세스 정보 저장
  5. variable: Airflow 변수 저장
  6. connection: 외부 시스템 연결 정보 저장
  7. xcom: 태스크 간 데이터 공유를 위한 XCom 값 저장
  8. sla_miss: SLA 미달성 이벤트 저장
  9. log: 로그 정보 저장 (로그 데이터베이스 사용 시)
-- 주요 데이터베이스 쿼리 예시

-- 가장 최근 실행된 DAG 목록 조회
SELECT dag_id, execution_date, state 
FROM dag_run 
ORDER BY execution_date DESC 
LIMIT 10;

-- 실패한 태스크 인스턴스 조회
SELECT dag_id, task_id, execution_date 
FROM task_instance 
WHERE state = 'failed' 
ORDER BY execution_date DESC;

-- 특정 DAG의 실행 시간 통계
SELECT 
    dag_id, 
    AVG(EXTRACT(EPOCH FROM (end_date - start_date))) as avg_duration,
    MAX(EXTRACT(EPOCH FROM (end_date - start_date))) as max_duration
FROM task_instance
WHERE dag_id = 'example_dag'
GROUP BY dag_id;

📌 DAG 저장소 (DAG Storage)

DAG 저장소의 역할

DAG 저장소는 모든 DAG 파일이 저장되는 위치로, 다음과 같은 역할을 합니다:

  1. DAG 코드 저장: Python으로 작성된 DAG 정의 파일 저장
  2. 관련 파일 저장: DAG에서 참조하는 Python 모듈, 설정 파일, 템플릿 등 저장
  3. 스케줄러 접근 제공: Scheduler가 주기적으로 DAG를 파싱하기 위한 접근 제공

DAG 저장소 설정

  1. 기본 설정:
# airflow.cfg에서 DAG 디렉토리 설정
[core]
# DAG 파일이 저장될 디렉토리 경로
dags_folder = /path/to/airflow/dags

# DAG 파일 감지 설정
# Python 파일이 아닌 파일을 무시할지 여부
ignore_dags_by_default = True

# DAG 검색 시 서브디렉토리까지 검색할지 여부
dags_folder_list_interval = 300
  1. 다양한 저장소 유형:
    # 기본 로컬 파일 시스템 설정
    dags_folder = /path/to/airflow/dags
    
    b. NFS나 공유 파일 시스템:c. Git 동기화:d. Kubernetes ConfigMap/Git-Sync: Kubernetes 환경에서는 ConfigMap이나 Git-Sync 사이드카를 통해 DAG 관리
  2. # Git 기반 DAG 동기화 설정 (외부 도구 사용) dags_folder = /path/to/airflow/dags # 외부 Git 동기화 스크립트 설정 필요
  3. # 네트워크 공유 파일 시스템 설정 dags_folder = /mnt/nfs/airflow/dags
  4. a. 로컬 파일 시스템:
  5. DAG 파일 감지 설정:
# DAG 파일 인식 패턴 설정
[core]
# DAG 파일 패턴 (기본값은 *.py)
dag_file_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
dag_discovery_safe_mode = True

 

DAG 코드 관리 모범 사례

  1. 버전 관리 시스템 사용:
    • Git을 사용하여 DAG 코드 변경 사항 추적
    • 브랜치 전략 수립 (예: dev, staging, production)
    • 코드 리뷰 및 PR(Pull Request) 프로세스 도입
# DAG 저장소 버전 관리 예시
git init /path/to/airflow/dags
cd /path/to/airflow/dags
git remote add origin https://github.com/your-org/airflow-dags.git

# 개발 환경에서 테스트 후 프로덕션에 배포
git checkout -b feature/new-data-pipeline
# DAG 파일 개발 및 수정
git add new_dag.py
git commit -m "Add new data pipeline DAG"
git push origin feature/new-data-pipeline
# PR 생성 및 코드 리뷰 후 메인 브랜치에 병합
  1. 디렉토리 구조화:
dags/
├── data_pipelines/          # 데이터 파이프라인 관련 DAG
│   ├── sales_pipeline.py
│   └── marketing_pipeline.py
├── maintenance/             # 유지보수 관련 DAG
│   ├── cleanup_logs.py
│   └── system_health_check.py
├── utils/                   # 공통 유틸리티 모듈
│   ├── slack_alerts.py
│   └── db_helpers.py
├── include/                 # SQL 쿼리, 구성 파일 등 포함 파일
│   ├── sql/
│   └── config/
└── tests/                   # DAG 테스트 코드
    ├── test_sales_pipeline.py
    └── test_marketing_pipeline.py
  1. 변경 관리 및 배포:
    • CI/CD 파이프라인 구축하여 자동 테스트 및 배포
    • 변경 로그 유지
    • 롤백 절차 마련
  2. 다중 환경 관리:
    • 환경별 변수 분리
    • 환경별 설정 파일 분리
    • 환경별 태그 사용
# 환경별 설정 분리 예시
def create_dag(env):
    if env == 'dev':
        schedule = None  # 개발 환경에서는 수동 트리거만
        db_conn_id = 'dev_db'
    elif env == 'staging':
        schedule = '@daily'  # 스테이징에서 일일 실행
        db_conn_id = 'staging_db'
    else:  # 프로덕션
        schedule = '0 */3 * * *'  # 3시간마다 실행
        db_conn_id = 'prod_db'
    
    dag = DAG(
        f'data_pipeline_{env}',
        schedule_interval=schedule,
        # 기타 설정...
    )
    # DAG 태스크 정의...
    return dag

# 환경별 DAG 생성
for env in ['dev', 'staging', 'prod']:
    globals()[f'data_pipeline_{env}'] = create_dag(env)

📌 Airflow 구성요소 간 통신

컴포넌트 통신 흐름

Airflow 구성요소 간의 주요 통신 흐름:

  1. Scheduler ↔ Metadata DB:
    • DAG 및 태스크 메타데이터 읽기/쓰기
    • 실행 상태 업데이트
    • 스케줄링 정보 관리
  2. Scheduler ↔ Executor:
    • 실행할 태스크 전달
    • 태스크 상태 및 결과 수신
  3. Executor ↔ Worker:
    • (CeleryExecutor의 경우) 메시지 큐를 통한 태스크 분배
    • (KubernetesExecutor의 경우) Kubernetes API를 통한 Pod 생성 및 관리
  4. Worker ↔ Metadata DB:
    • 태스크 실행 상태 업데이트
    • XCom을 통한 태스크 간 데이터 공유
  5. Web Server ↔ Metadata DB:
    • UI 표시를 위한 DAG 및 태스크 정보 조회
    • 사용자 작업(DAG 트리거 등) 처리
  6. Web Server ↔ DAG Storage:
    • DAG 코드 조회 및 표시
# 컴포넌트 간 통신 설정 예시

# Metadata DB 연결 설정
[core]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@postgres:5432/airflow

# CeleryExecutor 사용 시 메시지 큐 설정
[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres:5432/airflow

# KubernetesExecutor 사용 시 Kubernetes API 설정
[kubernetes]
in_cluster = True
# 또는
kube_config = /path/to/kube_config

통신 보안 설정

  1. 데이터베이스 연결 보안:
# DB 연결 암호화 설정
[core]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@postgres:5432/airflow?sslmode=require
  1. Celery 메시지 큐 보안:
# Celery 브로커 보안 설정
[celery]
broker_url = redis://:password@redis:6379/0
# 또는
broker_url = amqp://user:password@rabbitmq:5672/airflow
  1. API 보안:
# API 인증 설정
[api]
auth_backend = airflow.api.auth.backend.basic_auth
  1. 네트워크 수준 보안:
    • 컴포넌트 간 통신을 위한 전용 내부 네트워크 구성
    • 방화벽 규칙을 통한 접근 제한
    • VPC(Virtual Private Cloud) 또는 네트워크 세그먼트 분리

📌 Airflow 설치 구성 옵션

단일 노드 설치

모든 구성요소가 하나의 머신에서 실행되는 가장 간단한 설치 방식:

  1. 장점:
    • 설정이 간단함
    • 리소스 요구사항이 적음
    • 개발 및 테스트 환경에 적합
  2. 단점:
    • 확장성 제한
    • 단일 장애점 존재
    • 리소스 경합 가능성
  3. 설치 예시:
# pip를 통한 설치
pip install apache-airflow

# 초기화
airflow db init

# 사용자 생성
airflow users create \
    --username admin \
    --password admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com

# 서비스 시작
airflow webserver --port 8080
airflow scheduler

다중 노드 설치

구성요소가 여러 머신에 분산되어 실행되는 고급 설치 방식:

  1. 장점:
    • 높은 확장성
    • 컴포넌트별 리소스 최적화
    • 내결함성 및 고가용성 지원
  2. 단점:
    • 설정 복잡성 증가
    • 추가 인프라 요구사항
    • 관리 오버헤드 증가
  3. 설치 예시:
# 모든 노드에 Airflow 설치
pip install apache-airflow[celery,redis,postgres]

# Metadata DB 노드 설정
# PostgreSQL 설정 및 데이터베이스 생성

# Web Server 노드 설정
airflow webserver --port 8080

# Scheduler 노드 설정
airflow scheduler

# Worker 노드 설정 (CeleryExecutor 사용 시)
airflow celery worker

컨테이너 기반 설치

Docker 및 Kubernetes를 활용한 컨테이너화된 설치 방식:

  1. Docker Compose:
    • 개발 및 중소 규모 환경에 적합
    • 단일 호스트에서 여러 컨테이너로 구성요소 분리
# docker-compose.yaml 예시
version: '3'
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
  
  redis:
    image: redis:latest
  
  webserver:
    image: apache/airflow:2.3.0
    depends_on:
      - postgres
      - redis
    environment:
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
      AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
    command: webserver
    ports:
      - "8080:8080"
  
  scheduler:
    image: apache/airflow:2.3.0
    depends_on:
      - postgres
      - redis
    environment:
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
      AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
    command: scheduler
    volumes:
      - ./dags:/opt/airflow/dags
  
  worker:
    image: apache/airflow:2.3.0
    depends_on:
      - postgres
      - redis
    environment:
      AIRFLOW__CORE__EXECUTOR: CeleryExecutor
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
      AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
    command: celery worker
    volumes:
      - ./dags:/opt/airflow/dags

volumes:
  postgres-db-volume:
  1. Kubernetes:
    • 대규모 프로덕션 환경에 적합
    • 자동 스케일링 및 고가용성 지원
    • Helm 차트를 통한 설치 관리
# Helm을 사용한 Kubernetes 설치
helm repo add apache-airflow https://airflow.apache.org
helm install airflow apache-airflow/airflow \
  --set executor=CeleryExecutor \
  --set webserver.replicas=2 \
  --set worker.replicas=3

 

클라우드 관리형 서비스

클라우드 제공업체가 관리하는 Airflow 서비스:

  1. Amazon MWAA (Managed Workflows for Apache Airflow):
    • AWS에서 제공하는 관리형 Airflow 서비스
    • 인프라 관리 오버헤드 감소
    • AWS 서비스와의 통합 지원
  2. Google Cloud Composer:
    • GCP에서 제공하는 관리형 Airflow 서비스
    • GCP 서비스와의 통합 지원
    • 자동 스케일링 및 업그레이드 지원
  3. Microsoft Azure Data Factory:
    • Airflow는 아니지만 유사한 워크플로 관리 서비스
    • Azure 서비스와의 통합 지원
  4. Astronomer:
    • Airflow에 특화된 독립 관리형 서비스
    • 기업용 기능 및 지원 제공

Summary

  • Airflow 아키텍처는 Web Server, Scheduler, Worker, Metadata Database, DAG Storage로 구성된 분산 시스템입니다.
  • Scheduler는 DAG 파싱, 스케줄링, 의존성 확인, 실행 큐 관리 등 Airflow의 핵심 역할을 담당합니다.
  • Web Server는 사용자 인터페이스, 인증 및 권한 관리, REST API, 모니터링 기능을 제공합니다.
  • Executor는 SequentialExecutor, LocalExecutor, CeleryExecutor, KubernetesExecutor 등 다양한 유형이 있으며, 워크로드 특성에 맞게 선택해야 합니다.
  • Metadata Database는 PostgreSQL, MySQL 등의 관계형 데이터베이스를 사용하여 DAG, 실행 이력, 변수, 연결 정보 등을 저장합니다.
  • DAG Storage는 Python으로 작성된 DAG 파일과 관련 리소스를 저장하며, 버전 관리 시스템과 연동하여 효율적으로 관리할 수 있습니다.
  • Airflow 설치 옵션으로는 단일 노드 설치, 다중 노드 설치, 컨테이너 기반 설치, 클라우드 관리형 서비스 등이 있으며, 환경 요구사항에 맞게 선택할 수 있습니다.
  • 각 구성요소를 최적화하여 Airflow의 성능, 안정성, 확장성을 향상시킬 수 있습니다.

 

728x90