이 글에서는 Airflow의 핵심 개념인 DAG(Directed Acyclic Graph)에 대해 자세히 알아봅니다. DAG의 기본 구조부터 트리거 방식까지, 실제 코드 예제와 함께 DAG의 모든 것을 살펴보겠습니다. 초보자도 쉽게 이해할 수 있도록 상세히 설명합니다.
📌 DAG의 개념과 중요성
✅ DAG란 무엇인가?
DAG(Directed Acyclic Graph)는 Airflow에서 워크플로를 정의하는 핵심 개념입니다. 쉽게 말해, DAG는 실행해야 할 작업들과 그 작업들 간의 의존성을 표현하는 방향성 있는 비순환 그래프입니다.
- 방향성(Directed): 각 작업(Task)은 특정 방향으로 연결됩니다. A → B는 "A 작업이 완료된 후 B 작업이 실행된다"는 의미입니다.
- 비순환(Acyclic): 그래프 내에 순환 경로가 없습니다. 즉, A → B → C → A와 같은 구조가 허용되지 않습니다.
- 그래프(Graph): 노드(작업)와 엣지(의존성)로 구성된 구조입니다.
▶️ 실무 비유: DAG는 요리 레시피와 비슷합니다. 재료 준비, 손질, 조리 등의 순서가 정해져 있고, 특정 단계는 이전 단계가 완료되어야만 진행할 수 있습니다. 예를 들어, 양파를 볶기 전에 먼저 양파를 준비하고 썰어야 합니다.
✅ DAG가 중요한 이유
- 명확한 워크플로 정의: 복잡한 데이터 파이프라인도 명확하게 정의할 수 있습니다.
- 의존성 관리: 작업 간의 의존성을 명시적으로 정의하여 올바른 순서로 실행됩니다.
- 재사용성: 잘 설계된 DAG는 다양한 상황에서 재사용할 수 있습니다.
- 모니터링 및 디버깅: DAG 구조를 통해 워크플로의 실행 상태를 시각적으로 확인할 수 있습니다.
- 확장성: 새로운 작업을 추가하거나 기존 작업을 수정하기 쉽습니다.
📌 DAG 구성 요소와 구조
✅ DAG의 핵심 구성 요소
- DAG 객체: 전체 워크플로를 정의하는 객체입니다.
- Task(Operator): DAG 내에서 실행할 개별 작업을 정의합니다.
- Task Dependencies: 작업 간의 의존성을 정의합니다.
# 기본적인 DAG 정의 예시
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
# DAG의 기본 인자 설정
default_args = {
'owner': 'data_engineer', # DAG 소유자/관리자 - 책임자 지정
'depends_on_past': False, # 이전 실행 결과에 의존하지 않음 - True로 설정 시 이전 DAG Run이 성공해야만 실행됨
'start_date': datetime(2023, 3, 1), # DAG 시작 날짜 - 처음 스케줄링되는 시간
'email': ['alert@example.com'], # 알림 이메일 - 실패 시 알림을 받을 이메일 주소
'email_on_failure': True, # 실패 시 이메일 알림 - 작업 실패 시 이메일 발송 여부
'email_on_retry': False, # 재시도 시 이메일 알림 안 함 - 작업 재시도 시 이메일 발송 여부
'retries': 1, # 실패 시 재시도 횟수 - 작업 실패 시 재시도할 최대 횟수
'retry_delay': timedelta(minutes=5),# 재시도 간격 - 재시도 사이의 대기 시간
}
# DAG 정의
dag = DAG(
'example_data_pipeline', # DAG ID (고유 식별자) - Airflow 내에서 유일해야 함
default_args=default_args, # 기본 인자 - 위에서 정의한 default_args 적용
description='데이터 파이프라인 예시', # 설명 - UI에 표시되는 DAG 설명
schedule_interval='0 0 * * *', # 실행 주기 (매일 자정) - cron 표현식으로 스케줄 지정
catchup=False, # 과거 실행 건너뛰기 - False면 start_date부터 현재까지의 누락된 실행을 따라잡지 않음
tags=['example', 'tutorial'], # 태그 (UI에서 필터링 용도) - DAG를 분류하는 라벨
)
✅ DAG 생성 시 주요 파라미터
- dag_id: DAG의 고유 식별자로, 같은 Airflow 인스턴스 내에서 유일해야 합니다.
- default_args: 모든 Task에 적용될 기본 인자입니다.
- description: DAG에 대한 설명으로, UI에 표시됩니다.
- schedule_interval: DAG의 실행 주기를 정의합니다. cron 표현식이나 timedelta 객체, 또는 '@daily'와 같은 사전 정의된 값을 사용할 수 있습니다.
- start_date: DAG이 처음 실행될 날짜입니다.
- catchup: 과거 날짜(start_date부터 현재까지)에 대해 실행을 따라잡을지 여부입니다.
- tags: DAG를 분류하는 태그로, UI에서 필터링에 사용됩니다.
✅ Task(Operator) 종류와 특징
Airflow는 다양한 유형의 Operator를 제공하여 다양한 작업을 수행할 수 있게 합니다:
- BashOperator: Bash 명령어를 실행합니다.
from airflow.operators.bash_operator import BashOperator
bash_task = BashOperator(
task_id='bash_example', # Task ID - 이 DAG 내에서 유일한 태스크 식별자
bash_command='echo "Hello World"', # 실행할 Bash 명령어 - 셸에서 실행될 명령어 문자열
dag=dag, # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
- PythonOperator: Python 함수를 실행합니다.
from airflow.operators.python_operator import PythonOperator
def print_hello():
# 간단한 인사 메시지를 반환하는 함수
# 이 함수는 Airflow 워커에 의해 실행됨
return 'Hello from Python!'
python_task = PythonOperator(
task_id='python_example', # Task ID - 이 DAG 내에서 유일한 태스크 식별자
python_callable=print_hello, # 실행할 Python 함수 - 호출될 Python 함수 참조
dag=dag, # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
- EmailOperator: 이메일을 발송합니다.
from airflow.operators.email_operator import EmailOperator
email_task = EmailOperator(
task_id='send_email', # Task ID - 이 DAG 내에서 유일한 태스크 식별자
to='recipient@example.com', # 수신자 - 이메일을 받을 주소
subject='Airflow Alert', # 제목 - 이메일 제목
html_content='DAG has completed.', # 내용 - HTML 형식의 이메일 본문
dag=dag, # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
- SQLOperator: SQL 쿼리를 실행합니다.
from airflow.operators.mysql_operator import MySqlOperator
sql_task = MySqlOperator(
task_id='create_table', # Task ID - 이 DAG 내에서 유일한 태스크 식별자
mysql_conn_id='mysql_default', # Connection ID - Airflow UI에서 설정된 MySQL 연결 ID
sql='''
CREATE TABLE IF NOT EXISTS example_table (
id INT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''', # 실행할 SQL 쿼리 - 테이블 생성 DDL 구문
dag=dag, # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
- HTTPOperator: HTTP 요청을 보냅니다.
from airflow.operators.http_operator import SimpleHttpOperator
http_task = SimpleHttpOperator(
task_id='get_data', # Task ID - 이 DAG 내에서 유일한 태스크 식별자
http_conn_id='http_default', # Connection ID - Airflow UI에서 설정된 HTTP 연결 ID
endpoint='/api/data', # 엔드포인트 - 요청할 API 경로
method='GET', # HTTP 메소드 - 사용할 HTTP 메소드(GET, POST 등)
response_check=lambda response: True if len(response.json()) > 0 else False, # 응답 검증 - 응답이 유효한지 확인하는 함수
dag=dag, # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
✅ Task Dependencies (작업 간 의존성)
Airflow에서는 Task 간의 의존성을 정의하는 여러 방법을 제공합니다:
- 비트시프트 연산자 (>>, <<): 가장 많이 사용되는 방식으로, 직관적입니다.
# A가 완료된 후 B 실행, B가 완료된 후 C 실행
# '>>' 연산자는 왼쪽 태스크가 오른쪽 태스크보다 먼저 실행되어야 함을 의미
task_a >> task_b >> task_c
# 위와 동일한 의미
# '<<' 연산자는 오른쪽 태스크가 왼쪽 태스크보다 먼저 실행되어야 함을 의미
task_c << task_b << task_a
- set_upstream, set_downstream 메소드:
# A가 완료된 후 B 실행
# B의 upstream으로 A를 설정 - B는 A에 의존함
task_b.set_upstream(task_a)
# 또는
# A의 downstream으로 B를 설정 - A 이후에 B가 실행됨
task_a.set_downstream(task_b)
- 복잡한 의존성 예시:
# 분기 구조: A 완료 후 B와 C 동시 실행, 그 후 D 실행
# A는 B와 C의 부모 태스크, B와 C는 D의 부모 태스크
task_a >> [task_b, task_c] >> task_d
# 합류 구조: A와 B 완료 후 C 실행
# A와 B는 C의 부모 태스크, C는 A와 B 모두 완료된 후 실행됨
[task_a, task_b] >> task_c
# 복합 구조
# A 완료 후 B와 C 병렬 실행
task_a >> [task_b, task_c]
# B 완료 후 D 실행
task_b >> task_d
# C 완료 후 E 실행
task_c >> task_e
# D와 E 모두 완료 후 F 실행
[task_d, task_e] >> task_f
📌 DAG 파일 구성 및 모범 사례
✅ DAG 파일 위치 및 검색 방식
Airflow는 dags_folder 설정에 지정된 디렉토리에서 DAG 파일을 스캔합니다. 기본적으로는 $AIRFLOW_HOME/dags 디렉토리입니다.
# airflow.cfg 파일의 설정 예시
# dags_folder = /path/to/airflow/dags
# 이 설정은 Airflow가 DAG 파일을 검색할 디렉토리 경로를 지정
# Airflow는 이 폴더와 하위 폴더를 모두 스캔하여 .py 파일에서 DAG 객체를 찾음
dags_folder = /path/to/airflow/dags
Airflow 스케줄러는 주기적으로 이 디렉토리를 스캔하여 새로운 DAG 또는 변경된 DAG를 감지하고 로드합니다. 스캔 주기는 dag_dir_list_interval 설정으로 제어할 수 있습니다.
✅ DAG 파일 구성 모범 사례
- 모듈화: 관련 있는 작업들을 하나의 DAG로 그룹화하고, 관련 없는 작업은 별도의 DAG로 분리합니다.
- 명명 규칙: DAG 및 Task ID는 명확하고 일관된 규칙으로 작성합니다.
# 좋은 예시 - 명확하고 설명적인 이름 사용
dag_id = 'sales_data_processing_daily' # 어떤 데이터를, 어떤 주기로 처리하는지 명확히 표현
task_id = 'extract_sales_data_from_db' # 무슨 작업을 하는지 구체적으로 표현
# 나쁜 예시 - 의미를 알 수 없는 추상적인 이름
dag_id = 'dag1' # 어떤 용도인지 알 수 없음
task_id = 'task1' # 어떤 작업을 하는지 알 수 없음
- 문서화: DAG 및 Task에 충분한 설명을 포함합니다.
dag = DAG(
'data_quality_checks',
default_args=default_args,
description='''
이 DAG는 데이터 품질 검사를 수행합니다.
1. 누락된 데이터 검사
2. 중복 데이터 검사
3. 이상치 데이터 검사
''', # 상세한 DAG 설명 - UI에서 이 설명을 볼 수 있음
# 기타 파라미터...
)
task = PythonOperator(
task_id='check_missing_data',
python_callable=check_missing_data,
doc_md='''
## 누락된 데이터 검사
이 태스크는 데이터셋에 누락된 값이 있는지 검사합니다.
- NULL 값 검사
- 빈 문자열 검사
- 기본값 검사
임계값: 5% 이상 누락되면 실패
''', # 마크다운 형식의 태스크 문서 - UI에서 이 문서를 볼 수 있음
dag=dag,
)
- 매개변수화: 재사용 가능한 DAG를 만들기 위해 매개변수를 사용합니다.
def create_data_processing_dag(
dag_id, # DAG ID - 생성할 DAG의 고유 식별자
schedule_interval, # 실행 주기 - DAG의 스케줄링 간격
source_system, # 소스 시스템 - 데이터를 추출할 시스템
target_system, # 대상 시스템 - 데이터를 로드할 시스템
start_date=datetime(2023, 1, 1), # 시작 날짜 - 기본값 제공
):
"""
데이터 처리 DAG을 생성하는 팩토리 함수
여러 데이터 소스에 대해 비슷한 형태의 DAG을 생성할 때 사용
매개변수를 통해 필요한 설정을 커스터마이징 가능
"""
dag = DAG(
dag_id,
schedule_interval=schedule_interval,
start_date=start_date,
# 기타 파라미터...
)
# 태스크 정의...
return dag
# 여러 DAG 인스턴스 생성
# 같은 함수로 다른 매개변수를 사용하여 여러 DAG 생성
sales_dag = create_data_processing_dag(
'sales_data_processing', # 영업 데이터 처리용 DAG ID
'@daily', # 매일 실행
'sales_db', # 영업 데이터베이스에서 추출
'data_warehouse', # 데이터 웨어하우스에 적재
)
marketing_dag = create_data_processing_dag(
'marketing_data_processing', # 마케팅 데이터 처리용 DAG ID
'@weekly', # 매주 실행
'marketing_api', # 마케팅 API에서 추출
'data_warehouse', # 데이터 웨어하우스에 적재
)
- 태그 활용: 관련 DAG를 그룹화하고 UI에서 쉽게 찾을 수 있도록 태그를 활용합니다.
dag = DAG(
'sales_data_processing',
# 기타 파라미터...
tags=['sales', 'data_processing', 'daily'], # 태그 - UI에서 이 태그로 필터링 가능
)
✅ 효율적인 DAG 설계 팁
- 적절한 작업 분할: 너무 작은 작업으로 나누면 오버헤드가 증가하고, 너무 큰 작업은 재사용성과 유연성이 떨어집니다.
- 멱등성 유지: 같은 입력에 대해 여러 번 실행해도 동일한 결과가 나오도록 작업을 설계합니다.
- 타임아웃 설정: 무한대로 실행되는 작업을 방지하기 위해 타임아웃을 설정합니다.
task = PythonOperator(
task_id='long_running_task',
python_callable=process_data,
execution_timeout=timedelta(hours=2), # 2시간 후 타임아웃 - 이 시간을 초과하면 작업이 실패로 표시됨
dag=dag,
)
- 적절한 재시도 정책: 일시적인 오류에 대응하기 위한 재시도 정책을 설정합니다.
- 자원 관리: 메모리나 CPU를 많이 사용하는 작업은 적절히 분배합니다.
📌 DAG 트리거 방식
✅ DAG 트리거 유형
Airflow에서 DAG는 다음과 같은 방식으로 트리거(실행)될 수 있습니다:
- Schedule Trigger: DAG에 정의된 schedule_interval에 따라 자동으로 실행됩니다.
- Manual Trigger: UI 또는 CLI를 통해 수동으로 실행됩니다.
- API Trigger: REST API를 통해 외부 시스템에서 트리거합니다.
- Sensor Trigger: 특정 조건이 충족될 때 트리거됩니다.
- Dependencies Trigger: 다른 DAG의 완료에 의해 트리거됩니다.
✅ Schedule 기반 트리거
schedule_interval 파라미터를 통해 DAG의 실행 주기를 정의할 수 있습니다:
- Cron 표현식: Unix cron 형식의 표현식을 사용합니다.
# 매일 오전 8시에 실행
# cron 표현식: 분(0-59) 시(0-23) 일(1-31) 월(1-12) 요일(0-6, 0=일요일)
dag = DAG(
'daily_morning_report',
schedule_interval='0 8 * * *', # 매일(* * *) 8시(8) 0분(0)에 실행
# 기타 파라미터...
)
# 주중(월~금) 오후 6시에 실행
dag = DAG(
'weekday_evening_report',
schedule_interval='0 18 * * 1-5', # 월-금요일(1-5) 18시(18) 0분(0)에 실행
# 기타 파라미터...
)
- 사전 정의된 간격: 편의를 위한 사전 정의된 값을 사용할 수 있습니다.
# 매일 자정에 실행
# '@daily'는 '0 0 * * *'과 동일한 cron 표현식
dag = DAG(
'daily_report',
schedule_interval='@daily', # 매일 자정에 실행 - 사전 정의된 매크로
# 기타 파라미터...
)
사전 정의된 간격의 예:
- @once: 한 번만 실행 - 스케줄링 없이 수동으로 트리거하거나 의존성에 의해 실행
- @hourly: 매시간 실행 (0분에) - '0 * * * *'과 동일
- @daily: 매일 실행 (자정에) - '0 0 * * *'과 동일
- @weekly: 매주 실행 (일요일 자정에) - '0 0 * * 0'과 동일
- @monthly: 매월 실행 (매월 1일 자정에) - '0 0 1 * *'과 동일
- @yearly: 매년 실행 (1월 1일 자정에) - '0 0 1 1 *'과 동일
- None: 스케줄 없음 (수동으로만 트리거) - 자동 스케줄링 없음
- timedelta 객체: Python의 timedelta를 사용하여 간격을 정의할 수 있습니다.
from datetime import timedelta
# 12시간마다 실행
# datetime.timedelta를 사용하여 시간 간격 정의
dag = DAG(
'twelve_hour_report',
schedule_interval=timedelta(hours=12), # 12시간 간격으로 실행 - 더 유연한 시간 정의
# 기타 파라미터...
)
✅ Manual Trigger (수동 트리거)
수동으로 DAG를 트리거하는 방법입니다:
- UI에서 트리거:
- Airflow UI에서 DAG 목록 또는 DAG 상세 페이지에서 "Trigger DAG" 버튼을 사용합니다.
- 실행 날짜(execution_date)를 지정하거나 기본값을 사용할 수 있습니다.
- 선택적으로 설정 가능한 파라미터를 전달할 수 있습니다.
- CLI를 통한 트리거:
# 기본 옵션으로 DAG 트리거
# 'example_dag'라는 이름의 DAG를 현재 시간을 기준으로 트리거
airflow dags trigger example_dag
# 특정 실행 날짜로 트리거
# 'example_dag'를 2023년 3월 15일 00:00:00를 execution_date로 설정하여 트리거
airflow dags trigger example_dag --exec-date "2023-03-15T00:00:00"
# 설정 가능한 파라미터 전달
# JSON 형식의 설정을 포함하여 DAG 트리거 - 설정은 DAG 내에서 접근 가능
airflow dags trigger example_dag --conf '{"key":"value"}'
✅ API Trigger
Airflow의 REST API를 사용하여 외부 시스템에서 DAG를 트리거할 수 있습니다:
import requests
import json
from datetime import datetime
# Airflow API 엔드포인트
# Airflow 웹서버의 URL과 API 엔드포인트 경로
url = 'http://airflow-webserver:8080/api/v1/dags/example_dag/dagRuns'
# 인증 토큰 (Airflow 설정에 따라 다름)
# API 요청에 필요한 인증 정보를 헤더에 포함
headers = {
'Content-Type': 'application/json', # JSON 형식 명시
'Authorization': 'Bearer YOUR_TOKEN' # Bearer 토큰 인증 방식
}
# DAG Run 설정
# DAG 실행에 필요한 정보를 JSON 형식으로 구성
data = {
'dag_run_id': f'manual_{datetime.now().isoformat()}', # 고유한 실행 ID 생성
'execution_date': datetime.now().isoformat(), # 실행 날짜 설정
'conf': { # DAG에 전달할 설정 매개변수
'key': 'value', # 문자열 값 전달
'another_key': 42 # 숫자 값 전달
}
}
# API 요청 보내기
# POST 요청으로 DAG 실행 트리거
response = requests.post(url, headers=headers, data=json.dumps(data))
print(response.status_code) # 응답 상태 코드 출력 (201이면 성공)
print(response.json()) # 응답 데이터 출력
✅ Sensor Trigger
Sensor는 특정 조건이 충족될 때까지 대기했다가 조건이 충족되면 다음 작업을 트리거하는 특별한 유형의 Operator입니다:
- FileSensor: 파일이 존재할 때 트리거합니다.
from airflow.sensors.filesystem import FileSensor
file_sensor = FileSensor(
task_id='wait_for_file', # Task ID - 이 태스크의 고유 식별자
filepath='/path/to/file', # 감시할 파일 경로 - 이 파일이 존재하면 성공
poke_interval=60, # 60초마다 확인 - 파일 존재 여부를 확인하는 간격
timeout=60 * 60 * 24, # 최대 24시간 대기 - 이 시간 내에 파일이 나타나지 않으면 실패
mode='poke', # poke 모드 (기본값) - 지속적으로 확인하며 워커를 점유
dag=dag, # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
- ExternalTaskSensor: 다른 DAG의 특정 Task가 완료되었을 때 트리거합니다.
from airflow.sensors.external_task import ExternalTaskSensor
external_task_sensor = ExternalTaskSensor(
task_id='wait_for_other_dag', # Task ID - 이 태스크의 고유 식별자
external_dag_id='other_dag', # 감시할 DAG ID - 다른 DAG의 식별자
external_task_id='final_task', # 감시할 Task ID - 다른 DAG의 특정 태스크 식별자
allowed_states=['success'], # 허용할 상태 - 이 상태가 되었을 때 성공으로 처리
execution_delta=timedelta(hours=1), # 1시간 전 실행된 태스크 확인 - 상대 시간으로 찾을 태스크 지정
dag=dag, # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
- HttpSensor: HTTP 엔드포인트의 응답이 특정 조건을 만족할 때 트리거합니다.
from airflow.sensors.http_sensor import HttpSensor
http_sensor = HttpSensor(
task_id='wait_for_api', # Task ID - 이 태스크의 고유 식별자
http_conn_id='http_default', # Connection ID - Airflow UI에서 설정된 HTTP 연결 ID
endpoint='/api/ready', # 엔드포인트 - 확인할 API 경로
response_check=lambda response: response.status_code == 200, # 응답 검증 - 성공 조건 정의
poke_interval=60, # 60초마다 확인 - API 호출 간격
timeout=60 * 60, # 최대 1시간 대기 - 이 시간 내에 조건이 충족되지 않으면 실패
dag=dag, # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
- SqlSensor: SQL 쿼리의 결과가 조건을 만족할 때 트리거합니다.
from airflow.sensors.sql import SqlSensor
sql_sensor = SqlSensor(
task_id='wait_for_data', # Task ID - 이 태스크의 고유 식별자
conn_id='mysql_default', # Connection ID - Airflow UI에서 설정된 DB 연결 ID
sql='''
SELECT COUNT(*)
FROM my_table
WHERE processing_date = '{{ ds }}'
''', # 실행할 SQL 쿼리 - 조건 확인용 쿼리, {{ ds }}는 실행 날짜
success=lambda cnt: cnt > 0, # 결과가 0보다 크면 성공 - 쿼리 결과 검증 함수
poke_interval=300, # 5분마다 확인 - 쿼리 실행 간격
timeout=60 * 60 * 2, # 최대 2시간 대기 - 이 시간 내에 조건이 충족되지 않으면 실패
dag=dag, # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
✅ Cross-DAG 의존성 (DAG 간 의존성)
여러 DAG 간의 의존성을 설정하는 방법입니다:
- ExternalTaskSensor 사용: 앞서 설명한 ExternalTaskSensor를 사용하여 다른 DAG의 특정 태스크가 완료되었을 때 트리거되도록 할 수 있습니다.
- TriggerDagRunOperator 사용: 특정 태스크가 완료된 후 다른 DAG를 트리거합니다.
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger_other_dag = TriggerDagRunOperator(
task_id='trigger_other_dag', # Task ID - 이 태스크의 고유 식별자
trigger_dag_id='other_dag', # 트리거할 DAG ID - 실행할 DAG의 식별자
conf={'parent_dag': 'current_dag'}, # 전달할 설정 - 하위 DAG에 전달할 설정 파라미터
wait_for_completion=True, # 완료 대기 여부 - True면 하위 DAG가 완료될 때까지 대기
dag=dag, # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
- 데이터 공유를 통한 간접적인 의존성: XCom(Cross-Communication)을 사용하여 DAG 간에 데이터를 공유할 수 있습니다.
# DAG A에서 데이터 푸시
def push_data(**context):
# XCom을 사용하여 다른 태스크나 DAG에서 사용할 데이터 저장
data = {'value': 42, 'message': 'Hello from DAG A'}
# context['ti']는 TaskInstance를 참조
# xcom_push로 키-값 쌍 형태로 데이터 저장
context['ti'].xcom_push(key='shared_data', value=data)
task_push = PythonOperator(
task_id='push_data', # Task ID - 이 태스크의 고유 식별자
python_callable=push_data, # 실행할 Python 함수 - XCom에 데이터를 푸시하는 함수
provide_context=True, # 컨텍스트 제공 - 태스크에 컨텍스트(ti 포함) 전달
dag=dag_a, # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
# DAG B에서 데이터 풀
def pull_data(**context):
# 다른 DAG(dag_a)의 태스크(push_data)에서 저장한 데이터를 가져옴
data = context['ti'].xcom_pull(
dag_id='dag_a', # 데이터 출처 DAG ID - 데이터가 저장된 DAG
task_ids='push_data', # 데이터 출처 Task ID - 데이터를 저장한 태스크
key='shared_data' # 데이터 키 - 저장 시 사용한 동일한 키
)
print(f"Received data: {data}") # 받은 데이터 출력
task_pull = PythonOperator(
task_id='pull_data', # Task ID - 이 태스크의 고유 식별자
python_callable=pull_data, # 실행할 Python 함수 - XCom에서 데이터를 가져오는 함수
provide_context=True, # 컨텍스트 제공 - 태스크에 컨텍스트(ti 포함) 전달
dag=dag_b, # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
📌 DAG 실행 흐름 제어
✅ 조건부 실행과 브랜칭
워크플로 내에서 조건에 따라 다른 경로로 분기하는 방법입니다:
- BranchPythonOperator 사용: 특정 조건에 따라 다음에 실행할 태스크를 동적으로 결정합니다.
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
def branch_func(**context):
# 예: 요일에 따라 다른 경로 선택
# 현재 요일을 확인하여 주말/주중 태스크 중 어떤 것을 실행할지 결정
weekday = datetime.now().strftime('%A')
if weekday in ['Saturday', 'Sunday']:
return 'weekend_task' # 주말이면 weekend_task 실행
else:
return 'weekday_task' # 주중이면 weekday_task 실행
branch_task = BranchPythonOperator(
task_id='branch_task', # Task ID - 이 태스크의 고유 식별자
python_callable=branch_func, # 브랜칭 결정 함수 - 다음 실행 태스크 ID를 반환하는 함수
provide_context=True, # 컨텍스트 제공 - 태스크에 컨텍스트 전달
dag=dag, # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
weekday_task = DummyOperator(
task_id='weekday_task', # Task ID - 주중에 실행할 태스크 식별자
dag=dag, # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
weekend_task = DummyOperator(
task_id='weekend_task', # Task ID - 주말에 실행할 태스크 식별자
dag=dag, # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
# 의존성 설정
# branch_task에서 weekday_task 또는 weekend_task 중 하나만 실행
branch_task >> [weekday_task, weekend_task]
- ShortCircuitOperator 사용: 조건이 False면 이후 태스크를 모두 건너뜁니다.
from airflow.operators.python_operator import ShortCircuitOperator
def check_condition(**context):
# 예: 특정 조건 확인
# 데이터 가용성을 확인하는 함수
data_available = check_data_availability()
return data_available # True면 계속 진행, False면 중단
check_data = ShortCircuitOperator(
task_id='check_data', # Task ID - 이 태스크의 고유 식별자
python_callable=check_condition, # 조건 확인 함수 - 진행 여부를 반환하는 함수
provide_context=True, # 컨텍스트 제공 - 태스크에 컨텍스트 전달
dag=dag, # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
process_data = DummyOperator(
task_id='process_data', # Task ID - 데이터 처리 태스크 식별자
dag=dag, # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
send_notification = DummyOperator(
task_id='send_notification', # Task ID - 알림 발송 태스크 식별자
dag=dag, # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
# 의존성 설정
# check_data가 True를 반환하면 process_data 실행, False면 모두 건너뜀
check_data >> process_data >> send_notification
✅ 동적 태스크 생성
실행 시점에 동적으로 태스크를 생성하는 방법입니다:
- 파라미터를 통한 동적 태스크 생성:
# 여러 지역에 대해 동일한 처리를 수행하는 태스크 생성
regions = ['us', 'eu', 'asia'] # 처리할 지역 목록
tasks = [] # 생성된 태스크를 저장할 리스트
# 각 지역마다 개별 태스크 생성
for region in regions:
# 지역별 처리 태스크 생성
task = PythonOperator(
task_id=f'process_{region}', # 동적 Task ID - 지역명을 포함한 고유 식별자
python_callable=process_region, # 처리 함수 - 모든 지역에 동일한 함수 사용
op_kwargs={'region': region}, # 함수에 전달할 매개변수 - 지역 정보 전달
dag=dag, # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
tasks.append(task) # 생성된 태스크를 리스트에 추가
# 시작과 종료 더미 태스크 생성
start_task = DummyOperator(
task_id='start', # 시작 태스크 ID
dag=dag, # 속한 DAG
)
end_task = DummyOperator(
task_id='end', # 종료 태스크 ID
dag=dag, # 속한 DAG
)
# 실행 순서 설정: 시작 -> 모든 지역 처리(병렬) -> 종료
start_task >> tasks >> end_task
- TaskGroups를 사용한 그룹화: 관련 태스크를 그룹으로 묶어 UI에서 더 깔끔하게 볼 수 있습니다.
from airflow.utils.task_group import TaskGroup
# 각 지역에 대한 데이터 처리 워크플로 그룹 생성
regions = ['us', 'eu', 'asia']
with DAG('regional_processing', ...) as dag:
start = DummyOperator(task_id='start')
end = DummyOperator(task_id='end')
# 각 지역별로 TaskGroup 생성
for region in regions:
# 지역명으로 태스크 그룹 생성 - UI에서 확장/축소 가능한 그룹으로 표시됨
with TaskGroup(group_id=f'process_{region}') as region_group:
# 해당 지역 내에서의 작업 단계 정의
extract = PythonOperator(
task_id='extract', # 그룹 내 태스크 ID - 실제 ID는 'process_{region}.extract'
python_callable=extract_data,
op_kwargs={'region': region},
)
transform = PythonOperator(
task_id='transform', # 그룹 내 태스크 ID
python_callable=transform_data,
op_kwargs={'region': region},
)
load = PythonOperator(
task_id='load', # 그룹 내 태스크 ID
python_callable=load_data,
op_kwargs={'region': region},
)
# 그룹 내 태스크 간 의존성 설정
extract >> transform >> load
# 메인 워크플로 의존성: 시작 -> 지역별 처리 -> 종료
start >> region_group >> end
Summary
- **DAG(Directed Acyclic Graph)**는 Airflow의 핵심 개념으로, 작업 흐름을 방향성 있는 비순환 그래프로 표현합니다.
- DAG는 명확한 워크플로 정의, 의존성 관리, 재사용성, 모니터링 및 디버깅, 확장성을 제공합니다.
- DAG의 핵심 구성 요소는 DAG 객체, Task(Operator), Task Dependencies입니다.
- Airflow는 BashOperator, PythonOperator, EmailOperator 등 다양한 Operator를 제공합니다.
- Task 간 의존성은 비트시프트 연산자(>>, <<), set_upstream/set_downstream 메소드로 정의합니다.
- DAG 설계 시 모듈화, 명명 규칙, 문서화, 매개변수화, 태그 활용이 중요합니다.
- DAG는 Schedule, Manual, API, Sensor, Dependencies 등 다양한 방식으로 트리거됩니다.
- BranchPythonOperator와 ShortCircuitOperator를 사용해 조건부 실행과 분기 처리가 가능합니다.
- 동적 태스크 생성과 TaskGroups를 활용해 복잡한 워크플로를 효율적으로 관리할 수 있습니다.
- XCom을 통해 DAG 간 데이터를 공유하고, ExternalTaskSensor와 TriggerDagRunOperator로 DAG 간 의존성을 구현할 수 있습니다.