Data Engineering/Airflow

[Airflow 가이드 ep.8] 2부 DAG 작성 #3 | Branching, SubDAG, Dynamic DAG 활용 전략

ygtoken 2025. 3. 25. 17:52
728x90

이 글에서는 Airflow DAG 작성의 고급 기법인 Branching, SubDAG, Dynamic DAG에 대해 알아봅니다. 기본적인 DAG 구조를 넘어 더 복잡하고 유연한 워크플로우를 구현하는 방법을 실전 예제와 함께 살펴보겠습니다. 다양한 비즈니스 로직을 Airflow에 효과적으로 구현하기 위한 전략을 배워보세요.


📌 Branching 이해하기

Airflow에서 Branching은 조건에 따라 다른 경로로 워크플로우를 분기하는 기능입니다. 데이터 처리 파이프라인에서 상황에 맞게 다른 작업을 수행해야 할 때 유용합니다.

✅ BranchPythonOperator의 기본 원리

BranchPythonOperator는 Python 함수의 반환값에 따라 워크플로우의 다음 경로를 결정합니다.

 

BranchPythonOperator 작동방식 다이어그램

 

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime, timedelta

# DAG 기본 설정값 정의
# owner: DAG 소유자/관리자 지정
# depends_on_past: False로 설정하여 이전 DAG 실행 결과에 의존하지 않음
# start_date: DAG 실행 시작 날짜 (스케줄러가 이 날짜부터 DAG 실행 시작)
# email_on_failure: 태스크 실패 시 이메일 알림 비활성화
# email_on_retry: 태스크 재시도 시 이메일 알림 비활성화
# retries: 태스크 실패 시 재시도 횟수 (1회)
# retry_delay: 재시도 간 대기 시간 (5분)
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# DAG 정의
# dag_id: DAG의 고유 식별자 (Airflow UI에 표시됨)
# default_args: 위에서 정의한 기본 설정값 적용
# description: DAG에 대한 설명 (Airflow UI에 표시됨)
# schedule_interval: DAG 실행 주기 (매일 한 번)
dag = DAG(
    'branching_example',
    default_args=default_args,
    description='분기 처리 예제 DAG',
    schedule_interval=timedelta(days=1),
)

# 분기 결정을 위한 함수 정의
# context: Airflow가 제공하는 컨텍스트 변수 (execution_date, task_instance 등)
def branch_func(**context):
    # 현재 시간을 기준으로 분기 결정 (홀수 시간/짝수 시간)
    hour = datetime.now().hour
    
    # 현재 시간이 짝수인지 홀수인지 확인
    if hour % 2 == 0:
        return 'even_hour_task'  # 짝수 시간에는 이 태스크로 분기
    else:
        return 'odd_hour_task'   # 홀수 시간에는 이 태스크로 분기
    # 중요: 이 함수는 다음에 실행할 태스크의 task_id를 문자열로 반환해야 함

# 분기 태스크 정의
# task_id: 태스크의 고유 식별자
# python_callable: 실행할 Python 함수 (위에서 정의한 branch_func)
# provide_context: True로 설정하여 Airflow 컨텍스트 변수를 함수에 전달
# dag: 이 태스크가 속한 DAG 객체
branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=branch_func,  # 위에서 정의한 분기 함수 사용
    provide_context=True,         # 컨텍스트 제공 (XCom 등 활용 가능)
    dag=dag,
)

# 홀수 시간 처리 태스크
# kwargs: Airflow가 전달하는 키워드 인자 (task_instance, execution_date 등)
def odd_hour_process(**kwargs):
    print("홀수 시간에 실행되는 프로세스입니다.")
    # 실제 환경에서는 여기에 데이터 처리 로직 구현
    # 예: 특정 DB 테이블 업데이트, API 호출, 파일 처리 등
    return "홀수 시간 처리 완료"  # 반환값은 XCom에 저장됨

# 홀수 시간 태스크 정의
# python_callable: 실행할 Python 함수 (위에서 정의한 odd_hour_process)
odd_hour_task = PythonOperator(
    task_id='odd_hour_task',
    python_callable=odd_hour_process,
    dag=dag,
)

# 짝수 시간 처리 태스크
def even_hour_process(**kwargs):
    print("짝수 시간에 실행되는 프로세스입니다.")
    # 실제 환경에서는 여기에 다른 데이터 처리 로직 구현
    return "짝수 시간 처리 완료"  # 반환값은 XCom에 저장됨

# 짝수 시간 태스크 정의
even_hour_task = PythonOperator(
    task_id='even_hour_task',
    python_callable=even_hour_process,
    dag=dag,
)

# 공통 종료 태스크
# trigger_rule: 선행 태스크 실행 조건
# - 'one_success': 선행 태스크 중 하나만 성공해도 실행 (분기 처리에 적합)
# - 기본값은 'all_success'로, 모든 선행 태스크가 성공해야 실행
end_task = DummyOperator(
    task_id='end_task',
    trigger_rule='one_success',  # 선행 태스크 중 하나만 성공해도 실행
    dag=dag,
)

# 태스크 의존성 설정
# branch_task -> [odd_hour_task, even_hour_task] -> end_task
# 1. branch_task는 odd_hour_task와 even_hour_task의 선행 태스크
# 2. branch_task의 반환값에 따라 odd_hour_task 또는 even_hour_task 중 하나만 실행
# 3. 실행된 태스크가 완료되면 end_task가 실행됨
branch_task >> [odd_hour_task, even_hour_task] >> end_task

✅ 복잡한 분기 처리 전략

여러 조건을 기반으로 한 복잡한 분기 처리도 가능합니다.

 

복잡한 분기 처리 전략을 이용한 워크플로우

 

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime, timedelta

# DAG 기본 설정값 정의
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,     # 이전 DAG 실행 결과에 의존하지 않음
    'start_date': datetime(2023, 1, 1),  # DAG 실행 시작 날짜
    'email_on_failure': False,    # 태스크 실패 시 이메일 알림 비활성화
    'retries': 1,                 # 태스크 실패 시 재시도 횟수
    'retry_delay': timedelta(minutes=5),  # 재시도 간 대기 시간
}

# DAG 정의: 복잡한 분기 로직을 담은 DAG
dag = DAG(
    'complex_branching',          # DAG ID (고유 식별자)
    default_args=default_args,    # 기본 설정값 적용
    description='복잡한 분기 로직을 갖는 DAG',  # DAG 설명
    schedule_interval='@daily',   # 매일 실행 (cronexp: '0 0 * * *')
)

# 데이터 검증 및 분기 결정 함수
# 여러 조건(요일, 시간, 데이터 볼륨)에 따라 다른 태스크로 분기
def check_data_quality(**context):
    # context에서 execution_date 가져오기 (DAG가 실행되는 논리적 날짜/시간)
    execution_date = context['execution_date']
    
    # 요일 확인 (0=월요일, 6=일요일)
    weekday = execution_date.weekday()
    # 시간 확인 (0-23)
    hour = execution_date.hour
    
    # 복잡한 조건부 분기 로직
    if weekday < 5:  # 평일 (월-금)
        if hour < 12:  # 오전
            # 평일 오전 처리 태스크로 분기
            return 'process_morning_workday'
        else:  # 오후
            # 평일 오후 처리 태스크로 분기
            return 'process_afternoon_workday'
    else:  # 주말 (토-일)
        # 데이터 볼륨 체크 (외부 시스템이나 DB 쿼리 결과를 시뮬레이션)
        data_volume = check_data_volume()  # 가상 함수 호출
        if data_volume > 1000:  # 대용량 데이터
            # 주말 대용량 처리 태스크로 분기
            return 'process_high_volume_weekend'
        else:  # 소용량 데이터
            # 주말 소용량 처리 태스크로 분기
            return 'process_low_volume_weekend'

# 가상의 데이터 볼륨 체크 함수
# 실제로는 DB 쿼리, API 호출 등으로 데이터 양을 확인
def check_data_volume():
    # 시뮬레이션: 500~1500 사이의 임의 값 반환
    import random
    return random.randint(500, 1500)  # 임의의 데이터 볼륨 반환

# 분기 태스크 정의
# BranchPythonOperator: Python 함수 결과에 따라 다음 실행 경로를 결정
branch_operator = BranchPythonOperator(
    task_id='check_data_condition',  # 태스크 ID
    python_callable=check_data_quality,  # 분기 결정 함수
    provide_context=True,  # Airflow 컨텍스트 제공
    dag=dag,  # 이 태스크가 속한 DAG
)

# 각 경로별 처리 태스크 정의
# 실제 환경에서는 DummyOperator 대신 실제 작업을 수행하는 Operator 사용
# 평일 오전 처리 태스크
process_morning_workday = DummyOperator(
    task_id='process_morning_workday',
    dag=dag,
)

# 평일 오후 처리 태스크
process_afternoon_workday = DummyOperator(
    task_id='process_afternoon_workday',
    dag=dag,
)

# 주말 대용량 처리 태스크
process_high_volume_weekend = DummyOperator(
    task_id='process_high_volume_weekend',
    dag=dag,
)

# 주말 소용량 처리 태스크
process_low_volume_weekend = DummyOperator(
    task_id='process_low_volume_weekend',
    dag=dag,
)

# 후속 병합 태스크: 평일 처리 결과 취합
# trigger_rule='one_success': 선행 태스크 중 하나만 성공해도 실행
join_workday = DummyOperator(
    task_id='join_workday',
    trigger_rule='one_success',  # 평일 오전/오후 태스크 중 하나만 성공해도 실행
    dag=dag,
)

# 후속 병합 태스크: 주말 처리 결과 취합
join_weekend = DummyOperator(
    task_id='join_weekend',
    trigger_rule='one_success',  # 주말 대용량/소용량 태스크 중 하나만 성공해도 실행
    dag=dag,
)

# 최종 태스크: 모든 처리 완료 후 실행
# trigger_rule='all_success': 모든 선행 태스크가 성공해야 실행
end_task = DummyOperator(
    task_id='end_processing',
    trigger_rule='all_success',  # join_workday와 join_weekend 모두 성공해야 실행
    dag=dag,
)

# 태스크 의존성 설정
# 1. 분기 태스크 -> 각 경로별 처리 태스크
branch_operator >> [process_morning_workday, process_afternoon_workday, 
                   process_high_volume_weekend, process_low_volume_weekend]

# 2. 평일 처리 태스크 -> 평일 병합 태스크
process_morning_workday >> join_workday
process_afternoon_workday >> join_workday

# 3. 주말 처리 태스크 -> 주말 병합 태스크
process_high_volume_weekend >> join_weekend
process_low_volume_weekend >> join_weekend

# 4. 평일/주말 병합 태스크 -> 최종 태스크
[join_workday, join_weekend] >> end_task

✅ 분기 처리 시 주의사항

  1. SKIPPED 상태 처리: BranchPythonOperator는 선택되지 않은 경로의 모든 태스크를 SKIPPED 상태로 만듭니다.
  2. trigger_rule 활용: 분기 이후 태스크의 trigger_rule을 적절히 설정하는 것이 중요합니다.
    • all_success: 모든 선행 태스크가 성공해야 실행 (기본값)
    • one_success: 하나의 선행 태스크만 성공해도 실행
    • none_failed: 실패한 선행 태스크가 없으면 실행 (SKIPPED는 실패로 간주하지 않음)

📌 SubDAG로 모듈화하기

SubDAG는 복잡한 워크플로우를 모듈화하여 관리할 수 있게 해주는 기능입니다. 반복적인 태스크 패턴을 재사용하거나 DAG 구조를 더 깔끔하게 유지할 수 있습니다.

 

SubDAG 구조 및 계층 관계

 

✅ SubDAG의 기본 개념

SubDAG는 별도의 DAG 객체로 정의되어 부모 DAG 내에서 하나의 태스크처럼 사용됩니다.

from airflow import DAG
from airflow.operators.subdag import SubDagOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime, timedelta

# 기본 DAG 설정
# owner: DAG 소유자/관리자
# depends_on_past: 이전 DAG 실행 결과에 의존하지 않음
# start_date: DAG 실행 시작 날짜
# email_on_failure: 태스크 실패 시 이메일 알림 비활성화
# retries: 태스크 실패 시 재시도 횟수
# retry_delay: 재시도 간 대기 시간
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# 메인 DAG 정의
# dag_id: 메인 DAG의 고유 식별자
# default_args: 기본 설정값 적용
# schedule_interval: 매일 실행
main_dag = DAG(
    'main_dag_with_subdag',
    default_args=default_args,
    schedule_interval='@daily',
)

# SubDAG를 생성하는 함수
# parent_dag_id: 부모 DAG의 ID
# child_dag_id: SubDAG의 태스크 ID (부모 DAG 내에서)
# default_args: 기본 설정값 (부모 DAG와 동일하게 설정)
def create_processing_subdag(parent_dag_id, child_dag_id, default_args):
    # SubDAG 정의
    # dag_id: SubDAG의 고유 식별자 (부모DAG.자식DAG 형식으로 설정해야 함)
    # default_args: 기본 설정값 적용
    # schedule_interval: None으로 설정 (SubDAG는 부모 DAG에 의해 실행됨)
    dag_subdag = DAG(
        dag_id=f'{parent_dag_id}.{child_dag_id}',  # 중요: SubDAG ID는 parent.child 형식
        default_args=default_args,
        schedule_interval=None,  # SubDAG는 독립적인 스케줄을 가지지 않음
    )
    
    # SubDAG의 시작 태스크
    # 실제로는 데이터 전처리, 환경 설정 등을 수행
    start = DummyOperator(
        task_id='start_subdag',  # SubDAG 내 태스크 ID
        dag=dag_subdag,  # 이 태스크가 속한 SubDAG
    )
    
    # 여러 병렬 처리 태스크 생성
    # 실제로는 데이터 변환, 검증, 집계 등 다양한 작업 수행
    for i in range(5):  # 5개의 병렬 태스크 생성
        # 각 태스크는 고유 ID를 가져야 함
        process = DummyOperator(
            task_id=f'process_task_{i}',  # 태스크 ID: process_task_0, process_task_1, ...
            dag=dag_subdag,  # 이 태스크가 속한 SubDAG
        )
        
        # 태스크 의존성 설정: 시작 태스크 -> 각 처리 태스크
        start >> process
    
    # 생성된 SubDAG 반환
    return dag_subdag

# 메인 DAG의 시작 태스크
# 실제로는 데이터 수집, 초기화 등을 수행
start_task = DummyOperator(
    task_id='start',  # 태스크 ID
    dag=main_dag,  # 이 태스크가 속한 DAG
)

# SubDAG 연결
# task_id: 부모 DAG 내에서의 SubDAG 태스크 ID
# subdag: 위에서 생성한 SubDAG 객체
# dag: 이 태스크가 속한 부모 DAG
processing_subdag = SubDagOperator(
    task_id='processing_tasks',  # 이 ID가 SubDAG의 부모 DAG 내에서의 ID
    subdag=create_processing_subdag(
        parent_dag_id='main_dag_with_subdag',  # 부모 DAG ID
        child_dag_id='processing_tasks',       # 자식 DAG ID (SubDAG의 task_id와 동일)
        default_args=default_args,             # 기본 설정값
    ),
    dag=main_dag,  # 이 태스크가 속한 부모 DAG
)

# 종료 태스크
# 실제로는 결과 저장, 알림 발송, 리소스 정리 등을 수행
end_task = DummyOperator(
    task_id='end',  # 태스크 ID
    dag=main_dag,  # 이 태스크가 속한 DAG
)

# 메인 DAG 태스크 의존성 설정
# 시작 태스크 -> SubDAG -> 종료 태스크
start_task >> processing_subdag >> end_task

✅ SubDAG 활용 사례: 데이터 검증 로직

반복적인 데이터 검증 로직을 SubDAG로 모듈화하여 재사용하는 예시입니다.

 

ETL 파이프라인의 데이터 검증 SubDAG

from airflow import DAG
from airflow.operators.subdag import SubDagOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# 기본 설정값 정의
# owner: DAG 소유자/관리자
# depends_on_past: 이전 DAG 실행 결과에 의존하지 않음
# start_date: DAG 실행 시작 날짜
# email_on_failure: 태스크 실패 시 이메일 알림 활성화
# retries: 태스크 실패 시 재시도 횟수
# retry_delay: 재시도 간 대기 시간
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': True,  # 실패 시 이메일 알림 활성화 (데이터 검증은 중요하므로)
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# 메인 DAG 정의
# dag_id: ETL 파이프라인의 고유 식별자
# default_args: 기본 설정값 적용
# schedule_interval: 매일 실행
# max_active_runs: 동시에 실행될 수 있는 최대 DAG 인스턴스 수 (1로 제한)
main_dag_id = 'etl_pipeline'
main_dag = DAG(
    main_dag_id,
    default_args=default_args,
    schedule_interval='@daily',
    max_active_runs=1,  # 동시에 1개의 인스턴스만 실행 (데이터 정합성 유지)
)

# 데이터 검증용 SubDAG 생성 함수
# parent_dag_id: 부모 DAG의 ID
# child_dag_id: SubDAG의 태스크 ID
# default_args: 기본 설정값
# table_list: 검증할 테이블 목록
def create_data_validation_subdag(parent_dag_id, child_dag_id, default_args, table_list):
    """
    여러 테이블에 대한 데이터 검증을 수행하는 SubDAG
    
    Args:
        parent_dag_id: 부모 DAG의 ID
        child_dag_id: SubDAG의 태스크 ID
        default_args: DAG의 기본 인자
        table_list: 검증할 테이블 목록
    
    Returns:
        생성된 SubDAG 객체
    """
    # SubDAG 정의
    # dag_id: SubDAG의 고유 식별자 (부모DAG.자식DAG 형식)
    # default_args: 기본 설정값 적용
    # schedule_interval: None으로 설정 (SubDAG는 부모 DAG에 의해 실행됨)
    subdag = DAG(
        dag_id=f'{parent_dag_id}.{child_dag_id}',
        default_args=default_args,
        schedule_interval=None,
    )
    
    # 검증 시작 태스크 - 모든 검증 작업 전에 실행
    # 실제로는 검증 환경 설정, 로그 초기화 등을 수행
    start_validation = DummyOperator(
        task_id='start_validation',
        dag=subdag,
    )
    
    # 검증 완료 태스크 - 모든 검증 작업 후에 실행
    # trigger_rule='all_done': 모든 선행 태스크가 완료되면 실행 (성공/실패/스킵 상관없이)
    end_validation = DummyOperator(
        task_id='end_validation',
        trigger_rule='all_done',  # 모든 테이블 검증 작업이 완료되면 진행
        dag=subdag,
    )
    
    # 각 테이블별 데이터 검증 함수
    # table_name: 검증할 테이블 이름
    # kwargs: Airflow가 전달하는 키워드 인자 (task_instance, execution_date 등)
    def validate_table_data(table_name, **kwargs):
        print(f"테이블 {table_name}에 대한 데이터 검증 시작")
        
        # 실제로는 여기서 다양한 검증 로직 수행
        # 1. 데이터 수 체크 - 테이블의 레코드 수가 예상 범위 내인지 확인
        # 2. NULL 값 비율 체크 - 중요 컬럼의 NULL 값 비율이 임계치 이하인지 확인
        # 3. 유효 범위 체크 - 숫자형 데이터가 유효 범위 내인지 확인
        # 4. 중복 데이터 체크 - 고유해야 하는 레코드가 중복되지 않았는지 확인
        # 5. 외래 키 무결성 체크 - 참조 무결성이 유지되는지 확인
        
        # 검증 결과를 XCom에 저장 (다른 태스크에서 활용 가능)
        validation_result = {
            'table_name': table_name,
            'record_count': 1000,  # 예시 값 (실제로는 DB 쿼리 결과)
            'null_ratio': 0.02,    # 예시 값 (실제로는 계산된 값)
            'validation_passed': True,  # 검증 통과 여부
        }
        
        # 결과 반환 (XCom에 자동 저장됨)
        return validation_result
    
    # 각 테이블에 대한 검증 태스크 생성
    validation_tasks = {}
    for table in table_list:
        # 테이블별 검증 태스크 생성
        # task_id: 고유한 태스크 ID (validate_테이블명 형식)
        # python_callable: 실행할 Python 함수
        # op_kwargs: 함수에 전달할 인자 (테이블 이름)
        validation_task = PythonOperator(
            task_id=f'validate_{table}',
            python_callable=validate_table_data,
            op_kwargs={'table_name': table},
            dag=subdag,
        )
        # 생성한 태스크를 딕셔너리에 저장 (필요시 추가 처리를 위해)
        validation_tasks[table] = validation_task
        
        # 태스크 순서 설정
        # 시작 태스크 -> 각 테이블 검증 태스크 -> 종료 태스크
        start_validation >> validation_task >> end_validation
    
    # 생성된 SubDAG 반환
    return subdag

# 검증할 테이블 목록
# 검증할 테이블 목록
tables_to_validate = ['users', 'orders', 'products', 'transactions']

# 메인 DAG 태스크들 정의
# 1. 데이터 추출 태스크 - ETL의 첫 단계
# 실제로는 외부 시스템에서 데이터를 가져오는 작업 수행
extract_task = DummyOperator(
    task_id='extract_data',
    dag=main_dag,
)

# 2. 데이터 변환 태스크 - ETL의 두 번째 단계
# 실제로는 추출한 데이터를 필요한 형식으로 변환하는 작업 수행
transform_task = DummyOperator(
    task_id='transform_data',
    dag=main_dag,
)

# 3. 데이터 검증 태스크 (SubDAG)
# SubDAG를 사용하여 여러 테이블에 대한 검증 로직을 모듈화
validate_data = SubDagOperator(
    task_id='validate_data',  # 메인 DAG 내에서의 SubDAG 태스크 ID
    subdag=create_data_validation_subdag(
        parent_dag_id=main_dag_id,  # 부모 DAG ID
        child_dag_id='validate_data',  # 자식 DAG ID (SubDAG의 task_id와 동일)
        default_args=default_args,  # 기본 설정값
        table_list=tables_to_validate,  # 검증할 테이블 목록
    ),
    dag=main_dag,
)

# 4. 데이터 적재 태스크 - ETL의 마지막 단계
# 실제로는 변환 및 검증이 완료된 데이터를 타겟 시스템에 적재하는 작업 수행
load_task = DummyOperator(
    task_id='load_data',
    dag=main_dag,
)

# 메인 DAG 태스크 의존성 설정
# 추출 -> 변환 -> 검증(SubDAG) -> 적재
# ETL 파이프라인의 일반적인 흐름을 정의
extract_task >> transform_task >> validate_data >> load_task

✅ SubDAG 사용 시 주의사항

  1. 성능 이슈: SubDAG는 기본적으로 부모 DAG와 동일한 executor slot을 공유합니다. 이로 인해 병목 현상이 발생할 수 있습니다.
  2. 데드락 위험: SubDAG가 부모 DAG의 실행을 차단할 위험이 있습니다.
  3. TaskGroup 대체 고려: Airflow 2.0 이후에는 SubDAG 대신 TaskGroup을 사용하는 것이 권장됩니다.

 

TaskGroup vs SubDAG 비교

 

# Airflow 2.0 이상에서는 TaskGroup 사용 권장
from airflow.utils.task_group import TaskGroup

with DAG('example_dag', ...) as dag:
    
    # TaskGroup으로 태스크 그룹화
    # group_id: 태스크 그룹의 고유 식별자
    # UI에서 TaskGroup은 확장/축소 가능한 그룹으로 표시됨
    with TaskGroup(group_id='validation_tasks') as validation_group:
        # 각 테이블에 대한 검증 태스크 생성
        for table in ['users', 'orders', 'products']:
            # 테이블별 검증 태스크
            PythonOperator(
                task_id=f'validate_{table}',  # TaskGroup 내의 태스크 ID
                python_callable=validate_table_data,  # 실행할 함수
                op_kwargs={'table_name': table},  # 함수에 전달할 인자
            )
    
    # TaskGroup은 태스크처럼 사용 가능
    # 예: extract_task >> validation_group >> load_task

 


📌 Dynamic DAG 활용 전략

Dynamic DAG는 외부 설정, 메타데이터 또는 실행 시점의 조건에 따라 DAG 구조를 동적으로 생성하는 기법입니다.

 

Dynamic DAG 생성 프로세스

 

✅ 외부 설정 기반 Dynamic DAG

YAML, JSON 등의 외부 설정 파일을 기반으로 DAG를 동적으로 생성할 수 있습니다.

import yaml
import os
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# 기본 설정값 정의
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# DAG 설정 파일 경로
# 실제 환경에서는 Airflow 설정 디렉토리 내 파일 사용
dags_config_file = '/opt/airflow/dags/configs/dynamic_dags_config.yaml'

# YAML 설정 파일 불러오기
# 파일이 존재하지 않으면 예외 처리
if os.path.exists(dags_config_file):
    with open(dags_config_file, 'r') as file:
        # YAML 파일 파싱
        dags_config = yaml.safe_load(file)
else:
    # 설정 파일이 없을 경우 기본 설정 사용
    dags_config = {
        'dynamic_dags': [
            {
                'dag_id': 'dynamic_dag_default',
                'description': '기본 Dynamic DAG',
                'schedule_interval': '@daily',
                'tasks': [
                    {'task_id': 'task_1', 'task_type': 'python', 'python_callable': 'process_data'}
                ]
            }
        ]
    }

# 각 태스크 유형별 처리 함수
# 실제로는 여기에 다양한 데이터 처리 로직 구현
def process_data(**kwargs):
    """기본 데이터 처리 함수"""
    task_config = kwargs.get('task_config', {})
    print(f"데이터 처리 중: {task_config.get('params', {})}")
    return "데이터 처리 완료"

def send_notification(**kwargs):
    """알림 발송 함수"""
    task_config = kwargs.get('task_config', {})
    recipients = task_config.get('params', {}).get('recipients', [])
    print(f"알림 발송 중: {recipients}")
    return "알림 발송 완료"

def generate_report(**kwargs):
    """보고서 생성 함수"""
    task_config = kwargs.get('task_config', {})
    report_type = task_config.get('params', {}).get('report_type', 'default')
    print(f"{report_type} 보고서 생성 중")
    return f"{report_type} 보고서 생성 완료"

# 함수 매핑 테이블 - 태스크 유형별 처리 함수 매핑
task_functions = {
    'process_data': process_data,
    'send_notification': send_notification,
    'generate_report': generate_report,
}

# 동적 DAG 생성
for dag_config in dags_config.get('dynamic_dags', []):
    # 각 DAG 설정 가져오기
    dag_id = dag_config.get('dag_id')
    description = dag_config.get('description', f'Dynamic DAG: {dag_id}')
    schedule_interval = dag_config.get('schedule_interval', '@daily')
    
    # DAG 객체 생성
    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        description=description,
        schedule_interval=schedule_interval,
        # catchup: False로 설정하여 과거 실행을 건너뜀
        catchup=False,
    )
    
    # 현재 범위에 DAG 객체 등록 (globals()에 추가)
    # Airflow가 이 파일을 파싱할 때 DAG 객체를 찾을 수 있도록 함
    globals()[dag_id] = dag
    
    # 각 DAG에 태스크 추가
    tasks = {}
    
    # 설정 파일에 정의된 태스크 생성
    for task_config in dag_config.get('tasks', []):
        task_id = task_config.get('task_id')
        task_type = task_config.get('task_type')
        
        # Python 함수 태스크인 경우
        if task_type == 'python':
            # 설정에서 함수 이름 가져오기
            func_name = task_config.get('python_callable')
            # 함수 매핑 테이블에서 실제 함수 객체 가져오기
            python_callable = task_functions.get(func_name)
            
            if python_callable:
                # PythonOperator 태스크 생성
                task = PythonOperator(
                    task_id=task_id,
                    python_callable=python_callable,
                    # 태스크 설정을 함수에 전달
                    op_kwargs={'task_config': task_config},
                    dag=dag,
                )
                # 생성된 태스크를 딕셔너리에 저장
                tasks[task_id] = task
    
    # 태스크 의존성 설정
    # 설정 파일에 정의된 의존성 관계 적용
    for task_config in dag_config.get('tasks', []):
        task_id = task_config.get('task_id')
        # 현재 태스크가 의존하는 선행 태스크 목록
        upstream_tasks = task_config.get('upstream_tasks', [])
        
        # 현재 태스크가 딕셔너리에 있고, 선행 태스크가 있는 경우
        if task_id in tasks and upstream_tasks:
            # 각 선행 태스크에 대해 의존성 설정
            for upstream_task_id in upstream_tasks:
                if upstream_task_id in tasks:
                    # 선행 태스크 >> 현재 태스크
                    tasks[upstream_task_id] >> tasks[task_id]

✅ DB 메타데이터 기반 Dynamic DAG

데이터베이스의 테이블 정보를 기반으로 자동으로 ETL 파이프라인을 생성하는 예시입니다.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime, timedelta
import json

# 기본 설정값 정의
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# DB에서 테이블 목록 가져오기
# Airflow Connection에 등록된 DB 연결 정보 사용
def get_tables_from_db():
    """
    DB에서 테이블 목록을 가져오는 함수
    
    PostgreSQL 예시:
    - information_schema.tables에서 테이블 정보 조회
    - 시스템 테이블 제외 (pg_로 시작하지 않는 테이블)
    - 특정 스키마의 테이블만 대상 (예: 'public')
    
    Returns:
        list: 테이블 메타데이터 목록 (테이블명, 컬럼, 생성일 등)
    """
    # PostgreSQL DB 연결
    pg_hook = PostgresHook(postgres_conn_id='postgres_dwh')
    
    # 테이블 목록 조회 쿼리
    # 실제 환경에 맞게 쿼리 수정 필요
    tables_query = """
    SELECT 
        table_name,
        table_schema,
        (
            SELECT json_agg(column_name) 
            FROM information_schema.columns 
            WHERE table_name = t.table_name AND table_schema = t.table_schema
        ) as columns,
        to_char(now(), 'YYYY-MM-DD') as extraction_date
    FROM 
        information_schema.tables t
    WHERE 
        table_schema = 'public' AND 
        table_type = 'BASE TABLE' AND
        table_name NOT LIKE 'pg_%'
    ORDER BY 
        table_name;
    """
    
    # 쿼리 실행 및 결과 반환
    tables = pg_hook.get_records(tables_query)
    
    # 결과 가공
    table_metadata = []
    for table in tables:
        # 테이블 메타데이터 구성
        metadata = {
            'table_name': table[0],
            'schema': table[1],
            'columns': json.loads(table[2]) if table[2] else [],
            'extraction_date': table[3]
        }
        table_metadata.append(metadata)
    
    return table_metadata

# 각 테이블별 ETL 작업 함수들
def extract_table_data(table_name, **kwargs):
    """테이블 데이터 추출 함수"""
    print(f"테이블 {table_name}의 데이터 추출 중...")
    # 실제로는 여기서 DB 연결 및 데이터 추출 로직 구현
    return f"테이블 {table_name} 데이터 추출 완료"

def transform_table_data(table_name, **kwargs):
    """테이블 데이터 변환 함수"""
    print(f"테이블 {table_name}의 데이터 변환 중...")
    # 실제로는 여기서 데이터 정제, 변환 로직 구현
    return f"테이블 {table_name} 데이터 변환 완료"

def load_table_data(table_name, **kwargs):
    """테이블 데이터 적재 함수"""
    print(f"테이블 {table_name}의 데이터 적재 중...")
    # 실제로는 여기서 타겟 시스템에 데이터 적재 로직 구현
    return f"테이블 {table_name} 데이터 적재 완료"

# DB에서 테이블 목록 가져오기
table_metadata = get_tables_from_db()

# 각 테이블별 ETL DAG 동적 생성
for metadata in table_metadata:
    table_name = metadata['table_name']
    
    # DAG ID 생성 (테이블명 기반)
    dag_id = f'etl_{table_name}_dag'
    
    # DAG 객체 생성
    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        description=f'테이블 {table_name}에 대한 ETL 파이프라인',
        # 테이블 특성에 따라 스케줄 간격 조정 가능
        schedule_interval='@daily',
        catchup=False,
    )
    
    # DAG 객체를 globals()에 등록
    globals()[dag_id] = dag
    
    # 1단계: 데이터 추출 태스크
    extract_task = PythonOperator(
        task_id=f'extract_{table_name}',
        python_callable=extract_table_data,
        op_kwargs={'table_name': table_name,
                   'metadata': metadata},
        dag=dag,
    )
    
    # 2단계: 데이터 변환 태스크
    transform_task = PythonOperator(
        task_id=f'transform_{table_name}',
        python_callable=transform_table_data,
        op_kwargs={'table_name': table_name,
                   'metadata': metadata},
        dag=dag,
    )
    
    # 3단계: 데이터 적재 태스크
    load_task = PythonOperator(
        task_id=f'load_{table_name}',
        python_callable=load_table_data,
        op_kwargs={'table_name': table_name,
                   'metadata': metadata},
        dag=dag,
    )
    
    # 태스크 의존성 설정: 추출 -> 변환 -> 적재
    extract_task >> transform_task >> load_task

✅ 고급 Dynamic DAG 패턴: 팩토리 함수

DAG 생성을 위한 팩토리 함수 패턴으로 코드 재사용성을 높입니다.

 

DAG 팩토리 패턴 기반 아키텍처

 

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime, timedelta
import os

# 기본 설정값 정의
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# DAG 생성 팩토리 함수
def create_data_pipeline_dag(
    dag_id,                  # DAG 고유 식별자
    schedule_interval,       # 실행 주기
    source_system,           # 데이터 소스 시스템
    target_system,           # 데이터 타겟 시스템
    tables,                  # 처리할 테이블 목록
    default_args=default_args  # 기본 설정값
):
    """
    데이터 파이프라인 DAG 생성 팩토리 함수
    
    Args:
        dag_id: DAG 고유 식별자
        schedule_interval: 실행 주기 (cron 표현식 또는 macros)
        source_system: 데이터 소스 시스템 정보 (dict)
        target_system: 데이터 타겟 시스템 정보 (dict)
        tables: 처리할 테이블 목록 (list of dict)
        default_args: DAG 기본 설정값
        
    Returns:
        생성된 DAG 객체
    """
    # DAG 객체 생성
    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        description=f'{source_system["name"]}에서 {target_system["name"]}으로의 데이터 파이프라인',
        schedule_interval=schedule_interval,
        catchup=False,
    )
    
    # 시작 태스크 - 파이프라인 초기화
    start_pipeline = DummyOperator(
        task_id='start_pipeline',
        dag=dag,
    )
    
    # 소스 시스템 연결 설정 태스크
    def setup_source_connection(**kwargs):
        """소스 시스템 연결 설정 함수"""
        source_config = kwargs['source_system']
        print(f"{source_config['name']} 시스템 연결 설정 중...")
        # 실제로는 여기서 연결 객체 생성, 인증 등 수행
        return f"{source_config['name']} 연결 설정 완료"
    
    setup_source = PythonOperator(
        task_id='setup_source_connection',
        python_callable=setup_source_connection,
        op_kwargs={'source_system': source_system},
        dag=dag,
    )
    
    # 타겟 시스템 연결 설정 태스크
    def setup_target_connection(**kwargs):
        """타겟 시스템 연결 설정 함수"""
        target_config = kwargs['target_system']
        print(f"{target_config['name']} 시스템 연결 설정 중...")
        # 실제로는 여기서 연결 객체 생성, 인증 등 수행
        return f"{target_config['name']} 연결 설정 완료"
    
    setup_target = PythonOperator(
        task_id='setup_target_connection',
        python_callable=setup_target_connection,
        op_kwargs={'target_system': target_system},
        dag=dag,
    )
    
    # 연결 설정 태스크 의존성: 시작 -> 소스/타겟 연결 설정
    start_pipeline >> [setup_source, setup_target]
    
    # 테이블별 태스크 그룹 생성
    table_tasks = {}
    
    # 각 테이블에 대해 ETL 태스크 생성
    for table_config in tables:
        table_name = table_config['name']
        
        # 데이터 추출 함수
        def extract_table_data(table_config, **kwargs):
            """테이블 데이터 추출 함수"""
            print(f"테이블 {table_config['name']} 데이터 추출 중...")
            # 테이블 특성에 따른 추출 로직
            if table_config.get('incremental', False):
                print(f"증분 추출 모드 사용 (마지막 추출 이후)")
            else:
                print(f"전체 추출 모드 사용")
            # 실제로는 여기서 쿼리 실행, API 호출 등으로 데이터 추출
            return f"테이블 {table_config['name']} 추출 완료"
        
        # 데이터 변환 함수
        def transform_table_data(table_config, **kwargs):
            """테이블 데이터 변환 함수"""
            print(f"테이블 {table_config['name']} 데이터 변환 중...")
            # 테이블 특성에 따른 변환 로직
            transforms = table_config.get('transforms', [])
            for transform in transforms:
                print(f"변환 적용: {transform}")
            # 실제로는 여기서 데이터 정제, 변환, 집계 등 수행
            return f"테이블 {table_config['name']} 변환 완료"
        
        # 데이터 적재 함수
        def load_table_data(table_config, **kwargs):
            """테이블 데이터 적재 함수"""
            print(f"테이블 {table_config['name']} 데이터 적재 중...")
            # 테이블 특성에 따른 적재 로직
            load_mode = table_config.get('load_mode', 'append')
            print(f"적재 모드: {load_mode}")
            # 실제로는 여기서 DB 삽입, 파일 저장 등 수행
            return f"테이블 {table_config['name']} 적재 완료"
        
        # 테이블별 ETL 태스크 생성
        extract = PythonOperator(
            task_id=f'extract_{table_name}',
            python_callable=extract_table_data,
            op_kwargs={'table_config': table_config},
            dag=dag,
        )
        
        transform = PythonOperator(
            task_id=f'transform_{table_name}',
            python_callable=transform_table_data,
            op_kwargs={'table_config': table_config},
            dag=dag,
        )
        
        load = PythonOperator(
            task_id=f'load_{table_name}',
            python_callable=load_table_data,
            op_kwargs={'table_config': table_config},
            dag=dag,
        )
        
        # 테이블 태스크 그룹 저장
        table_tasks[table_name] = {
            'extract': extract,
            'transform': transform,
            'load': load
        }
        
        # 테이블 내 태스크 의존성: 추출 -> 변환 -> 적재
        extract >> transform >> load
        
        # 연결 설정 -> 테이블 처리 의존성
        [setup_source, setup_target] >> extract
    
    # 마무리 태스크 - 리소스 정리, 완료 알림 등
    end_pipeline = DummyOperator(
        task_id='end_pipeline',
        trigger_rule='all_done',  # 모든 테이블 처리가 완료되면 실행
        dag=dag,
    )
    
    # 모든 테이블 적재 완료 -> 마무리 태스크
    for table_name, tasks in table_tasks.items():
        tasks['load'] >> end_pipeline
    
    # 생성된 DAG 반환
    return dag

# 실제 DAG 생성 예시
# 소스 시스템 정보
postgresql_source = {
    "name": "PostgreSQL-Production",
    "type": "postgresql",
    "conn_id": "postgres_prod",
    "schema": "public"
}

# 타겟 시스템 정보
redshift_target = {
    "name": "Redshift-DWH",
    "type": "redshift",
    "conn_id": "redshift_dwh",
    "schema": "analytics"
}

# 처리할 테이블 목록
tables_to_process = [
    {
        "name": "customers",
        "incremental": True,
        "transforms": ["clean_addresses", "normalize_phone_numbers"],
        "load_mode": "upsert"
    },
    {
        "name": "orders",
        "incremental": True,
        "transforms": ["calculate_totals", "add_tax_info"],
        "load_mode": "append"
    },
    {
        "name": "products",
        "incremental": False,
        "transforms": ["standardize_categories"],
        "load_mode": "overwrite"
    }
]

# 팩토리 함수를 이용한 DAG 생성
sales_etl_dag = create_data_pipeline_dag(
    dag_id='sales_data_pipeline',
    schedule_interval='0 2 * * *',  # 매일 02:00에 실행
    source_system=postgresql_source,
    target_system=redshift_target,
    tables=tables_to_process
)

# DAG를 globals()에 등록 (Airflow가 인식할 수 있도록)
globals()['sales_data_pipeline'] = sales_etl_dag

📌 요약 및 모범 사례

✅ Branching, SubDAG, Dynamic DAG 사용 가이드라인

  1. Branching 활용
    • 조건에 따라 다른 처리가 필요한 경우 BranchPythonOperator 사용
    • 분기 이후 태스크에 적절한 trigger_rule 설정 필수
    • 복잡한 분기 로직은 함수로 모듈화하여 가독성 향상
  2. SubDAG 대신 TaskGroup 우선 고려
    • Airflow 2.0 이상에서는 TaskGroup 사용 권장
    • SubDAG를 사용할 경우 부모 DAG와 동일한 실행 주기 설정
    • 복잡한 SubDAG는 성능 이슈 발생 가능성 주의
  3. Dynamic DAG 모범 사례
    • 설정 파일(YAML, JSON)을 통한 관리로 코드 중복 최소화
    • 팩토리 함수 패턴 활용하여 DAG 생성 로직 재사용
    • globals() 딕셔너리에 DAG 등록 필수

✅ 실전 운영 팁

  1. 테스트 우선 개발
    • 복잡한 DAG는 개발 환경에서 충분히 테스트 후 적용
    • 단위 테스트 작성으로 각 분기 로직, 태스크 동작 검증
  2. 모니터링 및 알림 설정
    • 중요 분기점에 알림 설정으로 실행 경로 추적
    • Dynamic DAG에 태그 추가하여 관리 용이성 향상
  3. 문서화
    • DAG 설명에 분기 로직, 동적 생성 원리 명시
    • 복잡한 구조는 다이어그램으로 시각화

Summary

Airflow의 고급 DAG 작성 기법인 Branching, SubDAG, Dynamic DAG에 대해 살펴보았습니다.

  • Branching: BranchPythonOperator를 사용해 조건에 따라 워크플로우를 분기할 수 있습니다. 조건부 로직을 효과적으로 구현하기 위한 핵심 기법입니다.
  • SubDAG: 복잡한 워크플로우를 모듈화하여 재사용성을 높이는 방법입니다. 하지만 Airflow 2.0 이상에서는 성능 및 가시성 문제로 TaskGroup 사용이 권장됩니다.
  • Dynamic DAG: 외부 설정 파일이나 메타데이터를 기반으로 DAG를 동적으로 생성하는 기법입니다. 코드 중복을 줄이고 유지보수성을 높여줍니다.

이러한 고급 기법들을 활용하면 복잡한 비즈니스 로직을 Airflow에서 효과적으로 구현할 수 있습니다. 특히 실무에서는 반복적인 ETL 작업을 자동화하거나, 조건에 따라 다른 데이터 처리 경로를 구현하는 데 큰 도움이 됩니다.

효과적인 Airflow DAG 작성을 위해서는 기본 개념을 이해하고, 적절한 패턴을 선택하며, 코드 재사용성과 가독성을 항상 고려해야 합니다. 또한 테스트와 문서화를 통해 안정적인 워크플로우 운영이 가능해집니다.

728x90