Data Engineering/Airflow

[Airflow 가이드 ep.2] 1부 개념과 설정 #2 | DAG란 무엇인가? DAG 구조와 트리거 방식 이해

ygtoken 2025. 3. 25. 15:14
728x90

이 글에서는 Airflow의 핵심 개념인 DAG(Directed Acyclic Graph)에 대해 자세히 알아봅니다. DAG의 기본 구조부터 트리거 방식까지, 실제 코드 예제와 함께 DAG의 모든 것을 살펴보겠습니다. 초보자도 쉽게 이해할 수 있도록 상세히 설명합니다.


📌 DAG의 개념과 중요성

DAG란 무엇인가?

DAG(Directed Acyclic Graph)는 Airflow에서 워크플로를 정의하는 핵심 개념입니다. 쉽게 말해, DAG는 실행해야 할 작업들과 그 작업들 간의 의존성을 표현하는 방향성 있는 비순환 그래프입니다.

  • 방향성(Directed): 각 작업(Task)은 특정 방향으로 연결됩니다. A → B는 "A 작업이 완료된 후 B 작업이 실행된다"는 의미입니다.
  • 비순환(Acyclic): 그래프 내에 순환 경로가 없습니다. 즉, A → B → C → A와 같은 구조가 허용되지 않습니다.
  • 그래프(Graph): 노드(작업)와 엣지(의존성)로 구성된 구조입니다.

▶️ 실무 비유: DAG는 요리 레시피와 비슷합니다. 재료 준비, 손질, 조리 등의 순서가 정해져 있고, 특정 단계는 이전 단계가 완료되어야만 진행할 수 있습니다. 예를 들어, 양파를 볶기 전에 먼저 양파를 준비하고 썰어야 합니다.

 

DAG가 중요한 이유

  1. 명확한 워크플로 정의: 복잡한 데이터 파이프라인도 명확하게 정의할 수 있습니다.
  2. 의존성 관리: 작업 간의 의존성을 명시적으로 정의하여 올바른 순서로 실행됩니다.
  3. 재사용성: 잘 설계된 DAG는 다양한 상황에서 재사용할 수 있습니다.
  4. 모니터링 및 디버깅: DAG 구조를 통해 워크플로의 실행 상태를 시각적으로 확인할 수 있습니다.
  5. 확장성: 새로운 작업을 추가하거나 기존 작업을 수정하기 쉽습니다.

 

DAG 기본 구조


📌 DAG 구성 요소와 구조

DAG의 핵심 구성 요소

  1. DAG 객체: 전체 워크플로를 정의하는 객체입니다.
  2. Task(Operator): DAG 내에서 실행할 개별 작업을 정의합니다.
  3. Task Dependencies: 작업 간의 의존성을 정의합니다.
# 기본적인 DAG 정의 예시
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# DAG의 기본 인자 설정
default_args = {
    'owner': 'data_engineer',           # DAG 소유자/관리자 - 책임자 지정
    'depends_on_past': False,           # 이전 실행 결과에 의존하지 않음 - True로 설정 시 이전 DAG Run이 성공해야만 실행됨
    'start_date': datetime(2023, 3, 1), # DAG 시작 날짜 - 처음 스케줄링되는 시간
    'email': ['alert@example.com'],     # 알림 이메일 - 실패 시 알림을 받을 이메일 주소
    'email_on_failure': True,           # 실패 시 이메일 알림 - 작업 실패 시 이메일 발송 여부
    'email_on_retry': False,            # 재시도 시 이메일 알림 안 함 - 작업 재시도 시 이메일 발송 여부
    'retries': 1,                       # 실패 시 재시도 횟수 - 작업 실패 시 재시도할 최대 횟수
    'retry_delay': timedelta(minutes=5),# 재시도 간격 - 재시도 사이의 대기 시간
}

# DAG 정의
dag = DAG(
    'example_data_pipeline',            # DAG ID (고유 식별자) - Airflow 내에서 유일해야 함
    default_args=default_args,          # 기본 인자 - 위에서 정의한 default_args 적용
    description='데이터 파이프라인 예시',   # 설명 - UI에 표시되는 DAG 설명
    schedule_interval='0 0 * * *',      # 실행 주기 (매일 자정) - cron 표현식으로 스케줄 지정
    catchup=False,                      # 과거 실행 건너뛰기 - False면 start_date부터 현재까지의 누락된 실행을 따라잡지 않음
    tags=['example', 'tutorial'],       # 태그 (UI에서 필터링 용도) - DAG를 분류하는 라벨
)

 

DAG 생성 시 주요 파라미터

  1. dag_id: DAG의 고유 식별자로, 같은 Airflow 인스턴스 내에서 유일해야 합니다.
  2. default_args: 모든 Task에 적용될 기본 인자입니다.
  3. description: DAG에 대한 설명으로, UI에 표시됩니다.
  4. schedule_interval: DAG의 실행 주기를 정의합니다. cron 표현식이나 timedelta 객체, 또는 '@daily'와 같은 사전 정의된 값을 사용할 수 있습니다.
  5. start_date: DAG이 처음 실행될 날짜입니다.
  6. catchup: 과거 날짜(start_date부터 현재까지)에 대해 실행을 따라잡을지 여부입니다.
  7. tags: DAG를 분류하는 태그로, UI에서 필터링에 사용됩니다.

Task(Operator) 종류와 특징

Airflow는 다양한 유형의 Operator를 제공하여 다양한 작업을 수행할 수 있게 합니다:

  1. BashOperator: Bash 명령어를 실행합니다.
from airflow.operators.bash_operator import BashOperator

bash_task = BashOperator(
    task_id='bash_example',            # Task ID - 이 DAG 내에서 유일한 태스크 식별자
    bash_command='echo "Hello World"', # 실행할 Bash 명령어 - 셸에서 실행될 명령어 문자열
    dag=dag,                           # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
  1. PythonOperator: Python 함수를 실행합니다.
from airflow.operators.python_operator import PythonOperator

def print_hello():
    # 간단한 인사 메시지를 반환하는 함수
    # 이 함수는 Airflow 워커에 의해 실행됨
    return 'Hello from Python!'

python_task = PythonOperator(
    task_id='python_example',         # Task ID - 이 DAG 내에서 유일한 태스크 식별자
    python_callable=print_hello,      # 실행할 Python 함수 - 호출될 Python 함수 참조
    dag=dag,                          # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
  1. EmailOperator: 이메일을 발송합니다.
from airflow.operators.email_operator import EmailOperator

email_task = EmailOperator(
    task_id='send_email',                 # Task ID - 이 DAG 내에서 유일한 태스크 식별자
    to='recipient@example.com',           # 수신자 - 이메일을 받을 주소
    subject='Airflow Alert',              # 제목 - 이메일 제목
    html_content='DAG has completed.',    # 내용 - HTML 형식의 이메일 본문
    dag=dag,                              # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
  1. SQLOperator: SQL 쿼리를 실행합니다.
from airflow.operators.mysql_operator import MySqlOperator

sql_task = MySqlOperator(
    task_id='create_table',                # Task ID - 이 DAG 내에서 유일한 태스크 식별자
    mysql_conn_id='mysql_default',         # Connection ID - Airflow UI에서 설정된 MySQL 연결 ID
    sql='''
    CREATE TABLE IF NOT EXISTS example_table (
        id INT PRIMARY KEY,
        name VARCHAR(255) NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    ''',                                   # 실행할 SQL 쿼리 - 테이블 생성 DDL 구문
    dag=dag,                               # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
  1. HTTPOperator: HTTP 요청을 보냅니다.
from airflow.operators.http_operator import SimpleHttpOperator

http_task = SimpleHttpOperator(
    task_id='get_data',                   # Task ID - 이 DAG 내에서 유일한 태스크 식별자
    http_conn_id='http_default',          # Connection ID - Airflow UI에서 설정된 HTTP 연결 ID
    endpoint='/api/data',                 # 엔드포인트 - 요청할 API 경로
    method='GET',                         # HTTP 메소드 - 사용할 HTTP 메소드(GET, POST 등)
    response_check=lambda response: True if len(response.json()) > 0 else False,  # 응답 검증 - 응답이 유효한지 확인하는 함수
    dag=dag,                              # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)

 

Task Dependencies (작업 간 의존성)

Airflow에서는 Task 간의 의존성을 정의하는 여러 방법을 제공합니다:

  1. 비트시프트 연산자 (>>, <<): 가장 많이 사용되는 방식으로, 직관적입니다.
# A가 완료된 후 B 실행, B가 완료된 후 C 실행
# '>>' 연산자는 왼쪽 태스크가 오른쪽 태스크보다 먼저 실행되어야 함을 의미
task_a >> task_b >> task_c

# 위와 동일한 의미
# '<<' 연산자는 오른쪽 태스크가 왼쪽 태스크보다 먼저 실행되어야 함을 의미
task_c << task_b << task_a
  1. set_upstream, set_downstream 메소드:
# A가 완료된 후 B 실행
# B의 upstream으로 A를 설정 - B는 A에 의존함
task_b.set_upstream(task_a)

# 또는
# A의 downstream으로 B를 설정 - A 이후에 B가 실행됨
task_a.set_downstream(task_b)
  1. 복잡한 의존성 예시:
# 분기 구조: A 완료 후 B와 C 동시 실행, 그 후 D 실행
# A는 B와 C의 부모 태스크, B와 C는 D의 부모 태스크
task_a >> [task_b, task_c] >> task_d

# 합류 구조: A와 B 완료 후 C 실행
# A와 B는 C의 부모 태스크, C는 A와 B 모두 완료된 후 실행됨
[task_a, task_b] >> task_c

# 복합 구조
# A 완료 후 B와 C 병렬 실행
task_a >> [task_b, task_c]
# B 완료 후 D 실행
task_b >> task_d
# C 완료 후 E 실행
task_c >> task_e
# D와 E 모두 완료 후 F 실행
[task_d, task_e] >> task_f

 

복잡한 DAG 의존성 구조


📌 DAG 파일 구성 및 모범 사례

DAG 파일 위치 및 검색 방식

Airflow는 dags_folder 설정에 지정된 디렉토리에서 DAG 파일을 스캔합니다. 기본적으로는 $AIRFLOW_HOME/dags 디렉토리입니다.

# airflow.cfg 파일의 설정 예시
# dags_folder = /path/to/airflow/dags
# 이 설정은 Airflow가 DAG 파일을 검색할 디렉토리 경로를 지정
# Airflow는 이 폴더와 하위 폴더를 모두 스캔하여 .py 파일에서 DAG 객체를 찾음
dags_folder = /path/to/airflow/dags

Airflow 스케줄러는 주기적으로 이 디렉토리를 스캔하여 새로운 DAG 또는 변경된 DAG를 감지하고 로드합니다. 스캔 주기는 dag_dir_list_interval 설정으로 제어할 수 있습니다.

 

DAG 파일 구성 모범 사례

  1. 모듈화: 관련 있는 작업들을 하나의 DAG로 그룹화하고, 관련 없는 작업은 별도의 DAG로 분리합니다.
  2. 명명 규칙: DAG 및 Task ID는 명확하고 일관된 규칙으로 작성합니다.
# 좋은 예시 - 명확하고 설명적인 이름 사용
dag_id = 'sales_data_processing_daily'  # 어떤 데이터를, 어떤 주기로 처리하는지 명확히 표현
task_id = 'extract_sales_data_from_db'  # 무슨 작업을 하는지 구체적으로 표현

# 나쁜 예시 - 의미를 알 수 없는 추상적인 이름
dag_id = 'dag1'  # 어떤 용도인지 알 수 없음
task_id = 'task1'  # 어떤 작업을 하는지 알 수 없음
  1. 문서화: DAG 및 Task에 충분한 설명을 포함합니다.
dag = DAG(
    'data_quality_checks',
    default_args=default_args,
    description='''
    이 DAG는 데이터 품질 검사를 수행합니다.
    1. 누락된 데이터 검사
    2. 중복 데이터 검사
    3. 이상치 데이터 검사
    ''',  # 상세한 DAG 설명 - UI에서 이 설명을 볼 수 있음
    # 기타 파라미터...
)

task = PythonOperator(
    task_id='check_missing_data',
    python_callable=check_missing_data,
    doc_md='''
    ## 누락된 데이터 검사
    
    이 태스크는 데이터셋에 누락된 값이 있는지 검사합니다.
    - NULL 값 검사
    - 빈 문자열 검사
    - 기본값 검사
    
    임계값: 5% 이상 누락되면 실패
    ''',  # 마크다운 형식의 태스크 문서 - UI에서 이 문서를 볼 수 있음
    dag=dag,
)
  1. 매개변수화: 재사용 가능한 DAG를 만들기 위해 매개변수를 사용합니다.
def create_data_processing_dag(
    dag_id,               # DAG ID - 생성할 DAG의 고유 식별자
    schedule_interval,    # 실행 주기 - DAG의 스케줄링 간격
    source_system,        # 소스 시스템 - 데이터를 추출할 시스템
    target_system,        # 대상 시스템 - 데이터를 로드할 시스템
    start_date=datetime(2023, 1, 1),  # 시작 날짜 - 기본값 제공
):
    """
    데이터 처리 DAG을 생성하는 팩토리 함수
    
    여러 데이터 소스에 대해 비슷한 형태의 DAG을 생성할 때 사용
    매개변수를 통해 필요한 설정을 커스터마이징 가능
    """
    dag = DAG(
        dag_id,
        schedule_interval=schedule_interval,
        start_date=start_date,
        # 기타 파라미터...
    )
    
    # 태스크 정의...
    
    return dag

# 여러 DAG 인스턴스 생성
# 같은 함수로 다른 매개변수를 사용하여 여러 DAG 생성
sales_dag = create_data_processing_dag(
    'sales_data_processing',  # 영업 데이터 처리용 DAG ID
    '@daily',                 # 매일 실행
    'sales_db',               # 영업 데이터베이스에서 추출
    'data_warehouse',         # 데이터 웨어하우스에 적재
)

marketing_dag = create_data_processing_dag(
    'marketing_data_processing',  # 마케팅 데이터 처리용 DAG ID
    '@weekly',                   # 매주 실행
    'marketing_api',             # 마케팅 API에서 추출
    'data_warehouse',            # 데이터 웨어하우스에 적재
)
  1. 태그 활용: 관련 DAG를 그룹화하고 UI에서 쉽게 찾을 수 있도록 태그를 활용합니다.
dag = DAG(
    'sales_data_processing',
    # 기타 파라미터...
    tags=['sales', 'data_processing', 'daily'],  # 태그 - UI에서 이 태그로 필터링 가능
)

 

효율적인 DAG 설계 팁

  1. 적절한 작업 분할: 너무 작은 작업으로 나누면 오버헤드가 증가하고, 너무 큰 작업은 재사용성과 유연성이 떨어집니다.
  2. 멱등성 유지: 같은 입력에 대해 여러 번 실행해도 동일한 결과가 나오도록 작업을 설계합니다.
  3. 타임아웃 설정: 무한대로 실행되는 작업을 방지하기 위해 타임아웃을 설정합니다.
task = PythonOperator(
    task_id='long_running_task',
    python_callable=process_data,
    execution_timeout=timedelta(hours=2),  # 2시간 후 타임아웃 - 이 시간을 초과하면 작업이 실패로 표시됨
    dag=dag,
)
  1. 적절한 재시도 정책: 일시적인 오류에 대응하기 위한 재시도 정책을 설정합니다.
  2. 자원 관리: 메모리나 CPU를 많이 사용하는 작업은 적절히 분배합니다.

📌 DAG 트리거 방식

DAG 트리거 유형

Airflow에서 DAG는 다음과 같은 방식으로 트리거(실행)될 수 있습니다:

  1. Schedule Trigger: DAG에 정의된 schedule_interval에 따라 자동으로 실행됩니다.
  2. Manual Trigger: UI 또는 CLI를 통해 수동으로 실행됩니다.
  3. API Trigger: REST API를 통해 외부 시스템에서 트리거합니다.
  4. Sensor Trigger: 특정 조건이 충족될 때 트리거됩니다.
  5. Dependencies Trigger: 다른 DAG의 완료에 의해 트리거됩니다.

Schedule 기반 트리거

schedule_interval 파라미터를 통해 DAG의 실행 주기를 정의할 수 있습니다:

  1. Cron 표현식: Unix cron 형식의 표현식을 사용합니다.
# 매일 오전 8시에 실행
# cron 표현식: 분(0-59) 시(0-23) 일(1-31) 월(1-12) 요일(0-6, 0=일요일)
dag = DAG(
    'daily_morning_report',
    schedule_interval='0 8 * * *',  # 매일(* * *) 8시(8) 0분(0)에 실행
    # 기타 파라미터...
)

# 주중(월~금) 오후 6시에 실행
dag = DAG(
    'weekday_evening_report',
    schedule_interval='0 18 * * 1-5',  # 월-금요일(1-5) 18시(18) 0분(0)에 실행
    # 기타 파라미터...
)
  1. 사전 정의된 간격: 편의를 위한 사전 정의된 값을 사용할 수 있습니다.
# 매일 자정에 실행
# '@daily'는 '0 0 * * *'과 동일한 cron 표현식
dag = DAG(
    'daily_report',
    schedule_interval='@daily',  # 매일 자정에 실행 - 사전 정의된 매크로
    # 기타 파라미터...
)

사전 정의된 간격의 예:

  • @once: 한 번만 실행 - 스케줄링 없이 수동으로 트리거하거나 의존성에 의해 실행
  • @hourly: 매시간 실행 (0분에) - '0 * * * *'과 동일
  • @daily: 매일 실행 (자정에) - '0 0 * * *'과 동일
  • @weekly: 매주 실행 (일요일 자정에) - '0 0 * * 0'과 동일
  • @monthly: 매월 실행 (매월 1일 자정에) - '0 0 1 * *'과 동일
  • @yearly: 매년 실행 (1월 1일 자정에) - '0 0 1 1 *'과 동일
  • None: 스케줄 없음 (수동으로만 트리거) - 자동 스케줄링 없음
  1. timedelta 객체: Python의 timedelta를 사용하여 간격을 정의할 수 있습니다.
from datetime import timedelta

# 12시간마다 실행
# datetime.timedelta를 사용하여 시간 간격 정의
dag = DAG(
    'twelve_hour_report',
    schedule_interval=timedelta(hours=12),  # 12시간 간격으로 실행 - 더 유연한 시간 정의
    # 기타 파라미터...
)

 

Manual Trigger (수동 트리거)

수동으로 DAG를 트리거하는 방법입니다:

  1. UI에서 트리거:
    • Airflow UI에서 DAG 목록 또는 DAG 상세 페이지에서 "Trigger DAG" 버튼을 사용합니다.
    • 실행 날짜(execution_date)를 지정하거나 기본값을 사용할 수 있습니다.
    • 선택적으로 설정 가능한 파라미터를 전달할 수 있습니다.
  2. CLI를 통한 트리거:
# 기본 옵션으로 DAG 트리거
# 'example_dag'라는 이름의 DAG를 현재 시간을 기준으로 트리거
airflow dags trigger example_dag

# 특정 실행 날짜로 트리거
# 'example_dag'를 2023년 3월 15일 00:00:00를 execution_date로 설정하여 트리거
airflow dags trigger example_dag --exec-date "2023-03-15T00:00:00"

# 설정 가능한 파라미터 전달
# JSON 형식의 설정을 포함하여 DAG 트리거 - 설정은 DAG 내에서 접근 가능
airflow dags trigger example_dag --conf '{"key":"value"}'

 

API Trigger

Airflow의 REST API를 사용하여 외부 시스템에서 DAG를 트리거할 수 있습니다:

import requests
import json
from datetime import datetime

# Airflow API 엔드포인트
# Airflow 웹서버의 URL과 API 엔드포인트 경로
url = 'http://airflow-webserver:8080/api/v1/dags/example_dag/dagRuns'

# 인증 토큰 (Airflow 설정에 따라 다름)
# API 요청에 필요한 인증 정보를 헤더에 포함
headers = {
    'Content-Type': 'application/json',  # JSON 형식 명시
    'Authorization': 'Bearer YOUR_TOKEN'  # Bearer 토큰 인증 방식
}

# DAG Run 설정
# DAG 실행에 필요한 정보를 JSON 형식으로 구성
data = {
    'dag_run_id': f'manual_{datetime.now().isoformat()}',  # 고유한 실행 ID 생성
    'execution_date': datetime.now().isoformat(),  # 실행 날짜 설정
    'conf': {  # DAG에 전달할 설정 매개변수
        'key': 'value',  # 문자열 값 전달
        'another_key': 42  # 숫자 값 전달
    }
}

# API 요청 보내기
# POST 요청으로 DAG 실행 트리거
response = requests.post(url, headers=headers, data=json.dumps(data))
print(response.status_code)  # 응답 상태 코드 출력 (201이면 성공)
print(response.json())  # 응답 데이터 출력

 

Sensor Trigger

Sensor는 특정 조건이 충족될 때까지 대기했다가 조건이 충족되면 다음 작업을 트리거하는 특별한 유형의 Operator입니다:

  1. FileSensor: 파일이 존재할 때 트리거합니다.
from airflow.sensors.filesystem import FileSensor

file_sensor = FileSensor(
    task_id='wait_for_file',       # Task ID - 이 태스크의 고유 식별자
    filepath='/path/to/file',      # 감시할 파일 경로 - 이 파일이 존재하면 성공
    poke_interval=60,              # 60초마다 확인 - 파일 존재 여부를 확인하는 간격
    timeout=60 * 60 * 24,          # 최대 24시간 대기 - 이 시간 내에 파일이 나타나지 않으면 실패
    mode='poke',                   # poke 모드 (기본값) - 지속적으로 확인하며 워커를 점유
    dag=dag,                       # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
  1. ExternalTaskSensor: 다른 DAG의 특정 Task가 완료되었을 때 트리거합니다.
from airflow.sensors.external_task import ExternalTaskSensor

external_task_sensor = ExternalTaskSensor(
    task_id='wait_for_other_dag',      # Task ID - 이 태스크의 고유 식별자
    external_dag_id='other_dag',       # 감시할 DAG ID - 다른 DAG의 식별자
    external_task_id='final_task',     # 감시할 Task ID - 다른 DAG의 특정 태스크 식별자
    allowed_states=['success'],        # 허용할 상태 - 이 상태가 되었을 때 성공으로 처리
    execution_delta=timedelta(hours=1), # 1시간 전 실행된 태스크 확인 - 상대 시간으로 찾을 태스크 지정
    dag=dag,                           # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
  1. HttpSensor: HTTP 엔드포인트의 응답이 특정 조건을 만족할 때 트리거합니다.
from airflow.sensors.http_sensor import HttpSensor

http_sensor = HttpSensor(
    task_id='wait_for_api',            # Task ID - 이 태스크의 고유 식별자
    http_conn_id='http_default',       # Connection ID - Airflow UI에서 설정된 HTTP 연결 ID
    endpoint='/api/ready',             # 엔드포인트 - 확인할 API 경로
    response_check=lambda response: response.status_code == 200,  # 응답 검증 - 성공 조건 정의
    poke_interval=60,                  # 60초마다 확인 - API 호출 간격
    timeout=60 * 60,                   # 최대 1시간 대기 - 이 시간 내에 조건이 충족되지 않으면 실패
    dag=dag,                           # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
  1. SqlSensor: SQL 쿼리의 결과가 조건을 만족할 때 트리거합니다.
from airflow.sensors.sql import SqlSensor

sql_sensor = SqlSensor(
    task_id='wait_for_data',           # Task ID - 이 태스크의 고유 식별자
    conn_id='mysql_default',           # Connection ID - Airflow UI에서 설정된 DB 연결 ID
    sql='''
    SELECT COUNT(*) 
    FROM my_table 
    WHERE processing_date = '{{ ds }}'
    ''',                               # 실행할 SQL 쿼리 - 조건 확인용 쿼리, {{ ds }}는 실행 날짜
    success=lambda cnt: cnt > 0,       # 결과가 0보다 크면 성공 - 쿼리 결과 검증 함수
    poke_interval=300,                 # 5분마다 확인 - 쿼리 실행 간격
    timeout=60 * 60 * 2,               # 최대 2시간 대기 - 이 시간 내에 조건이 충족되지 않으면 실패
    dag=dag,                           # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)

 

Cross-DAG 의존성 (DAG 간 의존성)

여러 DAG 간의 의존성을 설정하는 방법입니다:

  1. ExternalTaskSensor 사용: 앞서 설명한 ExternalTaskSensor를 사용하여 다른 DAG의 특정 태스크가 완료되었을 때 트리거되도록 할 수 있습니다.
  2. TriggerDagRunOperator 사용: 특정 태스크가 완료된 후 다른 DAG를 트리거합니다.
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_other_dag = TriggerDagRunOperator(
    task_id='trigger_other_dag',          # Task ID - 이 태스크의 고유 식별자
    trigger_dag_id='other_dag',           # 트리거할 DAG ID - 실행할 DAG의 식별자
    conf={'parent_dag': 'current_dag'},   # 전달할 설정 - 하위 DAG에 전달할 설정 파라미터
    wait_for_completion=True,             # 완료 대기 여부 - True면 하위 DAG가 완료될 때까지 대기
    dag=dag,                              # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)
  1. 데이터 공유를 통한 간접적인 의존성: XCom(Cross-Communication)을 사용하여 DAG 간에 데이터를 공유할 수 있습니다.
# DAG A에서 데이터 푸시
def push_data(**context):
    # XCom을 사용하여 다른 태스크나 DAG에서 사용할 데이터 저장
    data = {'value': 42, 'message': 'Hello from DAG A'}
    # context['ti']는 TaskInstance를 참조
    # xcom_push로 키-값 쌍 형태로 데이터 저장
    context['ti'].xcom_push(key='shared_data', value=data)

task_push = PythonOperator(
    task_id='push_data',            # Task ID - 이 태스크의 고유 식별자
    python_callable=push_data,      # 실행할 Python 함수 - XCom에 데이터를 푸시하는 함수
    provide_context=True,           # 컨텍스트 제공 - 태스크에 컨텍스트(ti 포함) 전달
    dag=dag_a,                      # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)

# DAG B에서 데이터 풀
def pull_data(**context):
    # 다른 DAG(dag_a)의 태스크(push_data)에서 저장한 데이터를 가져옴
    data = context['ti'].xcom_pull(
        dag_id='dag_a',             # 데이터 출처 DAG ID - 데이터가 저장된 DAG
        task_ids='push_data',       # 데이터 출처 Task ID - 데이터를 저장한 태스크
        key='shared_data'           # 데이터 키 - 저장 시 사용한 동일한 키
    )
    print(f"Received data: {data}")  # 받은 데이터 출력

task_pull = PythonOperator(
    task_id='pull_data',            # Task ID - 이 태스크의 고유 식별자
    python_callable=pull_data,      # 실행할 Python 함수 - XCom에서 데이터를 가져오는 함수
    provide_context=True,           # 컨텍스트 제공 - 태스크에 컨텍스트(ti 포함) 전달
    dag=dag_b,                      # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)

 

 

DAG 간 의존성 구현 방법

 

 


📌 DAG 실행 흐름 제어

조건부 실행과 브랜칭

워크플로 내에서 조건에 따라 다른 경로로 분기하는 방법입니다:

  1. BranchPythonOperator 사용: 특정 조건에 따라 다음에 실행할 태스크를 동적으로 결정합니다.
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator

def branch_func(**context):
    # 예: 요일에 따라 다른 경로 선택
    # 현재 요일을 확인하여 주말/주중 태스크 중 어떤 것을 실행할지 결정
    weekday = datetime.now().strftime('%A')
    if weekday in ['Saturday', 'Sunday']:
        return 'weekend_task'  # 주말이면 weekend_task 실행
    else:
        return 'weekday_task'  # 주중이면 weekday_task 실행

branch_task = BranchPythonOperator(
    task_id='branch_task',          # Task ID - 이 태스크의 고유 식별자
    python_callable=branch_func,    # 브랜칭 결정 함수 - 다음 실행 태스크 ID를 반환하는 함수
    provide_context=True,           # 컨텍스트 제공 - 태스크에 컨텍스트 전달
    dag=dag,                        # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)

weekday_task = DummyOperator(
    task_id='weekday_task',         # Task ID - 주중에 실행할 태스크 식별자
    dag=dag,                        # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)

weekend_task = DummyOperator(
    task_id='weekend_task',         # Task ID - 주말에 실행할 태스크 식별자
    dag=dag,                        # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)

# 의존성 설정
# branch_task에서 weekday_task 또는 weekend_task 중 하나만 실행
branch_task >> [weekday_task, weekend_task]
  1. ShortCircuitOperator 사용: 조건이 False면 이후 태스크를 모두 건너뜁니다.
from airflow.operators.python_operator import ShortCircuitOperator

def check_condition(**context):
    # 예: 특정 조건 확인
    # 데이터 가용성을 확인하는 함수
    data_available = check_data_availability()
    return data_available  # True면 계속 진행, False면 중단

check_data = ShortCircuitOperator(
    task_id='check_data',           # Task ID - 이 태스크의 고유 식별자
    python_callable=check_condition, # 조건 확인 함수 - 진행 여부를 반환하는 함수
    provide_context=True,           # 컨텍스트 제공 - 태스크에 컨텍스트 전달
    dag=dag,                        # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)

process_data = DummyOperator(
    task_id='process_data',         # Task ID - 데이터 처리 태스크 식별자
    dag=dag,                        # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)

send_notification = DummyOperator(
    task_id='send_notification',    # Task ID - 알림 발송 태스크 식별자
    dag=dag,                        # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
)

# 의존성 설정
# check_data가 True를 반환하면 process_data 실행, False면 모두 건너뜀
check_data >> process_data >> send_notification

 

동적 태스크 생성

실행 시점에 동적으로 태스크를 생성하는 방법입니다:

  1. 파라미터를 통한 동적 태스크 생성:
# 여러 지역에 대해 동일한 처리를 수행하는 태스크 생성
regions = ['us', 'eu', 'asia']  # 처리할 지역 목록
tasks = []  # 생성된 태스크를 저장할 리스트

# 각 지역마다 개별 태스크 생성
for region in regions:
    # 지역별 처리 태스크 생성
    task = PythonOperator(
        task_id=f'process_{region}',  # 동적 Task ID - 지역명을 포함한 고유 식별자
        python_callable=process_region,  # 처리 함수 - 모든 지역에 동일한 함수 사용
        op_kwargs={'region': region},  # 함수에 전달할 매개변수 - 지역 정보 전달
        dag=dag,  # 속한 DAG - 이 태스크가 속할 DAG 객체 참조
    )
    tasks.append(task)  # 생성된 태스크를 리스트에 추가

# 시작과 종료 더미 태스크 생성
start_task = DummyOperator(
    task_id='start',  # 시작 태스크 ID
    dag=dag,  # 속한 DAG
)

end_task = DummyOperator(
    task_id='end',  # 종료 태스크 ID
    dag=dag,  # 속한 DAG
)

# 실행 순서 설정: 시작 -> 모든 지역 처리(병렬) -> 종료
start_task >> tasks >> end_task
  1. TaskGroups를 사용한 그룹화: 관련 태스크를 그룹으로 묶어 UI에서 더 깔끔하게 볼 수 있습니다.
from airflow.utils.task_group import TaskGroup

# 각 지역에 대한 데이터 처리 워크플로 그룹 생성
regions = ['us', 'eu', 'asia']

with DAG('regional_processing', ...) as dag:
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')
    
    # 각 지역별로 TaskGroup 생성
    for region in regions:
        # 지역명으로 태스크 그룹 생성 - UI에서 확장/축소 가능한 그룹으로 표시됨
        with TaskGroup(group_id=f'process_{region}') as region_group:
            # 해당 지역 내에서의 작업 단계 정의
            extract = PythonOperator(
                task_id='extract',  # 그룹 내 태스크 ID - 실제 ID는 'process_{region}.extract'
                python_callable=extract_data,
                op_kwargs={'region': region},
            )
            
            transform = PythonOperator(
                task_id='transform',  # 그룹 내 태스크 ID
                python_callable=transform_data,
                op_kwargs={'region': region},
            )
            
            load = PythonOperator(
                task_id='load',  # 그룹 내 태스크 ID
                python_callable=load_data,
                op_kwargs={'region': region},
            )
            
            # 그룹 내 태스크 간 의존성 설정
            extract >> transform >> load
        
        # 메인 워크플로 의존성: 시작 -> 지역별 처리 -> 종료
        start >> region_group >> end

Summary

  • **DAG(Directed Acyclic Graph)**는 Airflow의 핵심 개념으로, 작업 흐름을 방향성 있는 비순환 그래프로 표현합니다.
  • DAG는 명확한 워크플로 정의, 의존성 관리, 재사용성, 모니터링 및 디버깅, 확장성을 제공합니다.
  • DAG의 핵심 구성 요소는 DAG 객체, Task(Operator), Task Dependencies입니다.
  • Airflow는 BashOperator, PythonOperator, EmailOperator 등 다양한 Operator를 제공합니다.
  • Task 간 의존성은 비트시프트 연산자(>>, <<), set_upstream/set_downstream 메소드로 정의합니다.
  • DAG 설계 시 모듈화, 명명 규칙, 문서화, 매개변수화, 태그 활용이 중요합니다.
  • DAG는 Schedule, Manual, API, Sensor, Dependencies 등 다양한 방식으로 트리거됩니다.
  • BranchPythonOperatorShortCircuitOperator를 사용해 조건부 실행과 분기 처리가 가능합니다.
  • 동적 태스크 생성과 TaskGroups를 활용해 복잡한 워크플로를 효율적으로 관리할 수 있습니다.
  • XCom을 통해 DAG 간 데이터를 공유하고, ExternalTaskSensorTriggerDagRunOperator로 DAG 간 의존성을 구현할 수 있습니다.
728x90