Data Engineering/Airflow

[Airflow 가이드 ep.6] 2부 DAG 작성 #1 | DAG 생성 기초: 기본 구조와 파라미터 이해

ygtoken 2025. 3. 25. 15:54
728x90

이 글에서는 Airflow DAG 파일을 작성하는 기본 방법에 대해 알아봅니다. DAG 파일의 기본 구조와 필수 파라미터들을 상세히 살펴보고, 실제 예제를 통해 DAG 작성법을 익혀보겠습니다. 초보자도 쉽게 이해할 수 있도록 각 요소를 하나씩 설명합니다.


📌 DAG 파일의 기본 구조

DAG 파일이란?

DAG(Directed Acyclic Graph) 파일은 Airflow에서 워크플로우를 정의하는 Python 파일입니다. 이 파일은 작업(Task)과 작업 간의 의존성(Dependencies)을 정의하여 워크플로우의 실행 순서와 방식을 결정합니다.

DAG 파일은 기본적으로 다음과 같은 구조를 갖습니다:

  1. 필요한 라이브러리 임포트
  2. 기본 인자(default_args) 정의
  3. DAG 객체 인스턴스 생성
  4. Task 정의
  5. Task 간 의존성 설정

DAG 파일의 기본 구조

 

가장 간단한 DAG 파일 예제

다음은 가장 기본적인 DAG 파일의 예시입니다:

# 필요한 라이브러리 임포트
# datetime: 날짜와 시간 처리를 위한 Python 표준 라이브러리
# DAG: Airflow의 핵심 클래스로 워크플로우를 정의
# BashOperator: 배시 명령어를 실행하는 연산자
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

# DAG의 기본 인자 설정 - 모든 태스크에 공통으로 적용될 파라미터
default_args = {
    'owner': 'airflow',                    # DAG의 소유자 (책임자)
    'depends_on_past': False,              # 이전 실행 결과에 의존하지 않음
    'start_date': datetime(2023, 1, 1),    # DAG 시작 날짜 - 스케줄링 시작점
    'email': ['alert@example.com'],        # 알림을 받을 이메일 주소
    'email_on_failure': True,              # 실패 시 이메일 알림 활성화
    'email_on_retry': False,               # 재시도 시 이메일 알림 비활성화
    'retries': 1,                          # 실패 시 재시도 횟수
    'retry_delay': timedelta(minutes=5),   # 재시도 간격 (5분)
}

# DAG 객체 생성
dag = DAG(
    'simple_example',                      # DAG ID - Airflow UI에서 표시되는 고유 식별자
    default_args=default_args,             # 위에서 정의한 기본 인자 적용
    description='A simple example DAG',    # DAG 설명
    schedule_interval='0 0 * * *',         # 실행 주기 - 매일 자정 (cron 표현식)
    catchup=False,                         # 과거 실행 건너뛰기 (시작 날짜부터 현재까지 실행하지 않음)
)

# 첫 번째 태스크 정의 - 현재 날짜 출력
t1 = BashOperator(
    task_id='print_date',                  # 태스크 ID - 이 DAG 내에서 고유한 식별자
    bash_command='date',                   # 실행할 Bash 명령어
    dag=dag,                               # 이 태스크가 속한 DAG
)

# 두 번째 태스크 정의 - 대기 후 메시지 출력
t2 = BashOperator(
    task_id='sleep_and_echo',              # 태스크 ID
    bash_command='sleep 5 && echo "Finished sleeping!"',  # 5초 대기 후 메시지 출력
    dag=dag,                               # 이 태스크가 속한 DAG
)

# 태스크 간 의존성 설정 - t1 실행 후 t2 실행
t1 >> t2  # 비트시프트 연산자를 사용한 의존성 설정 (t1 -> t2)

위 예제는 매우 간단한 DAG로, 두 개의 Bash 명령어를 순차적으로 실행합니다. 첫 번째 태스크는 현재 날짜를 출력하고, 두 번째 태스크는 5초 대기 후 메시지를 출력합니다.


📌 DAG 파라미터 상세 설명

default_args (기본 인자)

default_args는 DAG 내의 모든 태스크에 공통으로 적용되는 기본 인자들을 정의하는 딕셔너리입니다. 이를 통해 반복적인 코드를 줄이고 일관성을 유지할 수 있습니다.

주요 default_args 파라미터:

# DAG의 기본 인자를 상세하게 설정한 예시
default_args = {
    # 태스크 소유자 - 책임자 또는 팀 이름을 지정 (UI에서 필터링 가능)
    'owner': 'data_team',
    
    # 이전 실행에 의존 여부 - True일 경우 이전 실행이 성공해야만 현재 실행이 시작됨
    'depends_on_past': False,
    
    # 시작 날짜 - DAG 스케줄링을 시작할 시점
    # 과거 날짜로 설정 시 catchup 설정에 따라 누락된 실행을 따라잡을 수 있음
    'start_date': datetime(2023, 1, 1),
    
    # 종료 날짜(선택사항) - 이 날짜 이후에는 DAG가 스케줄링되지 않음
    'end_date': datetime(2023, 12, 31),
    
    # 이메일 알림 설정
    'email': ['team@example.com', 'admin@example.com'],
    'email_on_failure': True,      # 태스크 실패 시 이메일 발송
    'email_on_retry': False,       # 태스크 재시도 시 이메일 발송하지 않음
    
    # 재시도 설정
    'retries': 3,                  # 최대 재시도 횟수
    'retry_delay': timedelta(minutes=5),  # 재시도 간 대기 시간
    'retry_exponential_backoff': True,    # 지수적으로 재시도 간격 증가 (5분, 10분, 20분...)
    'max_retry_delay': timedelta(minutes=60),  # 최대 재시도 간격
    
    # 실행 제한 설정
    'execution_timeout': timedelta(hours=2),   # 최대 실행 시간 (2시간 초과 시 실패)
    
    # 우선순위 가중치 - 높을수록 우선 순위가 높음 (기본값: 1)
    'priority_weight': 10,
    
    # 큐 이름 - CeleryExecutor 사용 시 특정 워커 큐에 태스크 할당
    'queue': 'high_priority',
    
    # 풀 이름 - 동시 실행을 제한하기 위한 그룹
    'pool': 'default_pool',
    
    # 태그 - UI에서 필터링 및 그룹화에 사용
    'tags': ['example', 'tutorial', 'daily'],
}

 

DAG 파라미터

DAG 객체 생성 시 사용할 수 있는 주요 파라미터들입니다:

# DAG 객체 생성 시 다양한 파라미터 설정 예시
dag = DAG(
    # DAG ID - Airflow 내에서 고유한 식별자
    # 알파벳, 숫자, 밑줄, 점, 하이픈만 사용 가능
    'detailed_example_dag',
    
    # 기본 인자 - 위에서 정의한 default_args 적용
    default_args=default_args,
    
    # 설명 - UI에 표시되는 DAG 설명
    description='A more detailed example DAG with various parameters',
    
    # 스케줄 간격 - DAG 실행 주기 설정
    # cron 형식 또는 내장 매크로 사용 가능
    schedule_interval='0 9 * * 1-5',  # 평일 오전 9시에 실행
    # 또는 다음과 같은 미리 정의된 간격 사용 가능:
    # schedule_interval='@daily'      # 매일 자정
    # schedule_interval='@hourly'     # 매시간
    # schedule_interval='@weekly'     # 매주 일요일 자정
    # schedule_interval='@monthly'    # 매월 1일 자정
    # schedule_interval='@once'       # 한 번만 실행
    # schedule_interval=None          # 수동으로만 트리거
    # schedule_interval=timedelta(days=1)  # timedelta 객체 사용
    
    # 캐치업 설정 - 과거 실행 처리 방식
    # True면 start_date부터 현재까지 누락된 모든 실행을 따라잡음
    # False면 과거 실행을 무시하고 다음 스케줄부터 실행
    catchup=False,
    
    # 태그 - UI에서 필터링에 사용
    tags=['example', 'documentation'],
    
    # DAG 타임존 - 기본값은 시스템 타임존 또는 airflow.cfg의 설정값
    dagrun_timeout=timedelta(hours=1),  # DAG 실행 최대 시간
    
    # 생성 시 일시 중지 상태로 설정
    is_paused_upon_creation=True,
    
    # 최대 활성 실행 수 - 동시에 실행될 수 있는 최대 DAG 인스턴스 수
    max_active_runs=1,
    
    # 문서화 - UI에 표시될 마크다운 형식의 문서
    doc_md="""
    # 자세한 예제 DAG
    
    이 DAG는 다양한 파라미터 설정을 보여주기 위한 예제입니다.
    
    ## 기능
    - 다양한 스케줄링 옵션
    - 상세한 재시도 정책
    - 문서화 기능
    
    ## 사용법
    이 DAG는 매일 아침 9시에 자동으로 실행됩니다.
    """,
)

 

schedule_interval 설정 방법

schedule_interval은 DAG가 실행되는 주기를 정의합니다. 다양한 방식으로 설정할 수 있습니다:

  1. Cron 표현식:
    • 표준 Unix cron 형식을 사용합니다.
    • 형식: 분 시 일 월 요일
    # Cron 표현식 예시
    schedule_interval='0 9 * * 1-5'     # 평일(월-금) 오전 9시
    schedule_interval='0 */2 * * *'     # 2시간마다 (0, 2, 4, ..., 22시 정각)
    schedule_interval='0 0 1 * *'       # 매월 1일 자정
    schedule_interval='0 0 * * 0'       # 매주 일요일 자정
    schedule_interval='30 9 * * 1'      # 매주 월요일 9시 30분
    
  2. 미리 정의된 매크로:
    • Airflow에서 제공하는, 자주 사용되는 간격에 대한 단축어입니다.
    schedule_interval='@hourly'         # 매시간 시작 (0분)
    schedule_interval='@daily'          # 매일 자정
    schedule_interval='@weekly'         # 매주 일요일 자정
    schedule_interval='@monthly'        # 매월 1일 자정
    schedule_interval='@yearly'         # 매년 1월 1일 자정
    schedule_interval='@once'           # 한 번만 실행
    schedule_interval=None              # 수동 트리거만 가능
    
  3. timedelta 객체:
    • Python의 timedelta를 사용한 상대적인 시간 간격입니다.
    from datetime import timedelta
    
    schedule_interval=timedelta(days=1)    # 24시간마다
    schedule_interval=timedelta(hours=6)   # 6시간마다
    schedule_interval=timedelta(minutes=30)  # 30분마다
    

중요한 점: Airflow에서 schedule_interval은 실제로 스케줄링된 시간 이후에 실행됩니다. 즉, @daily로 설정된 DAG는 다음 날 자정 이후에 트리거됩니다.

 

start_date와 end_date 설정

start_date와 end_date는 DAG의 실행 기간을 제어합니다:

# 시작 날짜와 종료 날짜 설정 예시
from datetime import datetime, timezone

# UTC 시간으로 시작 날짜 설정
start_date = datetime(2023, 1, 1, tzinfo=timezone.utc)

# 로컬 시간대 사용 (timezone 라이브러리 사용)
from pendulum import timezone
local_tz = timezone('Asia/Seoul')
start_date = datetime(2023, 1, 1, tzinfo=local_tz)

# 종료 날짜 설정 (선택사항)
end_date = datetime(2023, 12, 31, tzinfo=timezone.utc)

# DAG에 적용
dag = DAG(
    'scheduled_example',
    schedule_interval='@daily',
    start_date=start_date,
    end_date=end_date,
    catchup=False
)

 

시간대(timezone) 관련 주의사항:

  • 시간대를 명시적으로 지정하지 않으면 시스템 기본 시간대가 사용됩니다.
  • 프로덕션 환경에서는 혼란을 방지하기 위해 항상 UTC 또는 특정 시간대를 명시적으로 지정하는 것이 좋습니다.
  • 여러 지역에 걸친 팀이 사용하는 경우, UTC를 표준으로 사용하는 것이 혼란을 줄일 수 있습니다.

 

📌 Task 생성과 의존성 설정

 

다양한 Operator를 사용한 Task 생성

Airflow는 다양한 유형의 작업을 수행할 수 있는 여러 Operator를 제공합니다:

# 다양한 Operator를 사용한 태스크 생성 예시
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

# 시작 태스크 (빈 태스크)
start = EmptyOperator(
    task_id='start',        # 태스크 ID
    dag=dag,                # 속한 DAG
)

# Bash 명령어 실행 태스크
bash_task = BashOperator(
    task_id='run_bash_command',
    bash_command='echo "Current date: $(date)" > /tmp/date.txt',  # 실행할 Bash 명령
    dag=dag,
)

# Python 함수 실행 태스크
def process_data(**context):
    """데이터 처리 함수"""
    # context를 통해 Airflow의 다양한 메타데이터와 변수에 접근 가능
    execution_date = context['execution_date']
    print(f"Processing data for: {execution_date}")
    return "Data processed successfully!"

python_task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,  # 실행할 Python 함수
    provide_context=True,          # 컨텍스트 정보 전달
    dag=dag,
)

# HTTP 요청 태스크
http_task = SimpleHttpOperator(
    task_id='get_api_data',
    http_conn_id='api_connection',  # Connection 이름 (Airflow UI에서 정의)
    endpoint='/api/data',           # API 엔드포인트
    method='GET',                   # HTTP 메서드
    headers={"Content-Type": "application/json"},  # 요청 헤더
    response_check=lambda response: len(response.json()) > 0,  # 응답 검증 함수
    dag=dag,
)

# SQL 쿼리 실행 태스크
sql_task = PostgresOperator(
    task_id='create_table',
    postgres_conn_id='postgres_default',  # Connection 이름
    sql='''
    CREATE TABLE IF NOT EXISTS example_table (
        id SERIAL PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    ''',                                  # 실행할 SQL 쿼리
    dag=dag,
)

# 종료 태스크 (빈 태스크)
end = EmptyOperator(
    task_id='end',
    dag=dag,
)

 

Task 의존성 설정 방법

Task 간의 의존성(실행 순서)을 설정하는 여러 방법이 있습니다:

# 태스크 의존성 설정 예시

# 1. 비트시프트 연산자 사용 (가장 일반적인 방법)
# A >> B 는 "A 다음에 B 실행" 의미
start >> bash_task >> python_task >> end

# 2. set_upstream / set_downstream 메서드 사용
start.set_downstream(bash_task)  # start 다음에 bash_task 실행
python_task.set_upstream(bash_task)  # bash_task 다음에 python_task 실행
end.set_upstream(python_task)  # python_task 다음에 end 실행

# 3. 목록을 사용한 병렬 실행 정의
# 다중 의존성 설정 (병렬 실행)
start >> [bash_task, http_task] >> sql_task >> end
# 위 코드는 다음과 같이 해석됩니다:
# - start가 완료된 후
# - bash_task와 http_task가 병렬로 실행됨
# - 두 태스크가 모두 완료된 후 sql_task 실행
# - sql_task가 완료된 후 end 실행

# 4. 교차 의존성 설정
task_a >> task_b
task_a >> task_c
task_b >> task_d
task_c >> task_d
# 위 코드는 다음 구조를 생성합니다:
#     task_a
#    /      \
# task_b    task_c
#    \      /
#     task_d

 

Task 의존성 설정 방법

 


📌 실용적인 DAG 작성 예제

이제 실무에서 자주 사용되는 패턴을 적용한 완전한 DAG 예제를 살펴보겠습니다:

 

ETL 워크플로우 예제

 

데이터 ETL 워크플로우 예제

다음은 데이터를 추출(Extract), 변환(Transform), 적재(Load)하는 일반적인 ETL 파이프라인의 예시입니다:

# ETL(Extract, Transform, Load) 파이프라인 DAG 예제

# 필요한 라이브러리 및 모듈 임포트
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
import json
import csv

# 기본 인자 설정
default_args = {
    'owner': 'data_engineering',           # 소유자/팀 이름
    'depends_on_past': False,              # 이전 실행에 의존하지 않음
    'start_date': datetime(2023, 1, 1),    # 시작 날짜
    'email': ['alerts@example.com'],       # 알림 이메일
    'email_on_failure': True,              # 실패 시 이메일 발송
    'email_on_retry': False,               # 재시도 시 이메일 발송 안 함
    'retries': 2,                          # 최대 재시도 횟수
    'retry_delay': timedelta(minutes=5),   # 재시도 간격
    'execution_timeout': timedelta(hours=1),  # 실행 제한 시간
}

# DAG 정의
dag = DAG(
    'user_data_etl_pipeline',                    # DAG ID
    default_args=default_args,                   # 기본 인자 적용
    description='A daily ETL pipeline for user data',  # 설명
    schedule_interval='0 2 * * *',               # 매일 오전 2시 실행
    catchup=False,                               # 과거 실행 건너뛰기
    tags=['etl', 'user_data', 'daily'],          # 태그
    doc_md="""
    # User Data ETL Pipeline
    
    이 DAG는 외부 API에서 사용자 데이터를 가져와 변환 후 데이터베이스에 적재하는 ETL 프로세스를 수행합니다.
    
    ## 프로세스 단계:
    1. 데이터베이스 테이블 생성/확인
    2. API에서 사용자 데이터 추출
    3. 데이터 변환 및 정제
    4. 데이터베이스에 적재
    5. 결과 검증 및 알림
    
    ## 스케줄:
    - 매일 오전 2시(UTC) 실행
    """,
)

# 1. 시작 태스크
start = EmptyOperator(
    task_id='start_pipeline',
    dag=dag,
)

# 2. 데이터베이스 테이블 생성/확인
create_table = PostgresOperator(
    task_id='create_users_table_if_not_exists',
    postgres_conn_id='postgres_dwh',    # Airflow UI에서 설정한 연결 ID
    sql='''
    CREATE TABLE IF NOT EXISTS users (
        id SERIAL PRIMARY KEY,
        user_id VARCHAR(50) NOT NULL UNIQUE,
        name VARCHAR(100),
        email VARCHAR(100),
        signup_date DATE,
        last_login TIMESTAMP,
        status VARCHAR(20),
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    ''',
    dag=dag,
)

# 3. API에서 데이터 추출
extract_data = SimpleHttpOperator(
    task_id='extract_user_data_from_api',
    http_conn_id='user_api',            # Airflow UI에서 설정한 연결 ID
    endpoint='/api/users',
    method='GET',
    headers={"Content-Type": "application/json", "Authorization": "Bearer {{ var.value.api_token }}"},  # 변수 사용
    response_filter=lambda response: json.loads(response.text),  # JSON 응답 파싱
    log_response=True,
    dag=dag,
)

# 4. 데이터 변환 함수
def transform_user_data(**context):
    """
    API에서 가져온 사용자 데이터를 변환하는 함수
    - 필요 없는 필드 제거
    - 날짜 형식 변환
    - 상태 표준화
    """
    # xcom에서 이전 태스크의 결과 가져오기
    ti = context['ti']
    extracted_data = ti.xcom_pull(task_ids='extract_user_data_from_api')
    
    if not extracted_data or 'users' not in extracted_data:
        raise ValueError("No user data found in the API response")
    
    transformed_users = []
    for user in extracted_data['users']:
        # 필요한 필드만 선택
        transformed_user = {
            'user_id': user['id'],
            'name': f"{user.get('first_name', '')} {user.get('last_name', '')}".strip(),
            'email': user.get('email', '').lower(),
            'signup_date': user.get('signup_date', '').split('T')[0],  # 시간 부분 제거
            'last_login': user.get('last_login'),
            'status': user.get('status', 'unknown').lower(),  # 상태 소문자로 표준화
        }
        transformed_users.append(transformed_user)
    
    # 결과를 임시 CSV 파일로 저장 (다음 태스크에서 사용)
    import csv
    import os
    
    # 실행 날짜를 파일 이름에 포함
    execution_date = context['execution_date'].strftime('%Y%m%d')
    output_file = f"/tmp/transformed_users_{execution_date}.csv"
    
    with open(output_file, 'w', newline='') as csvfile:
        fieldnames = ['user_id', 'name', 'email', 'signup_date', 'last_login', 'status']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(transformed_users)
    
    return output_file  # 파일 경로 반환

# 변환 태스크 정의
transform_data = PythonOperator(
    task_id='transform_user_data',
    python_callable=transform_user_data,
    provide_context=True,
    dag=dag,
)

# 5. 데이터베이스에 적재
def load_data_to_db(**context):
   """변환된 데이터를 데이터베이스에 적재하는 함수"""
   import psycopg2
   from airflow.hooks.postgres_hook import PostgresHook
   
   # 이전 태스크에서 생성된 CSV 파일 경로 가져오기
   ti = context['ti']
   csv_file_path = ti.xcom_pull(task_ids='transform_user_data')
   
   if not csv_file_path or not os.path.exists(csv_file_path):
       raise ValueError(f"CSV file not found: {csv_file_path}")
   
   # PostgreSQL 연결
   pg_hook = PostgresHook(postgres_conn_id='postgres_dwh')
   conn = pg_hook.get_conn()
   cursor = conn.cursor()
   
   # CSV 파일에서 데이터 읽기
   with open(csv_file_path, 'r') as f:
       reader = csv.DictReader(f)
       inserted_count = 0
       updated_count = 0
       
       for row in reader:
           # UPSERT 쿼리 (이미 존재하면 업데이트, 없으면 삽입)
           query = """
           INSERT INTO users (user_id, name, email, signup_date, last_login, status, updated_at)
           VALUES (%s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
           ON CONFLICT (user_id) 
           DO UPDATE SET 
               name = EXCLUDED.name,
               email = EXCLUDED.email,
               last_login = EXCLUDED.last_login,
               status = EXCLUDED.status,
               updated_at = CURRENT_TIMESTAMP;
           """
           
           # 쿼리 실행
           cursor.execute(
               query, 
               (
                   row['user_id'], 
                   row['name'], 
                   row['email'], 
                   row['signup_date'], 
                   row['last_login'], 
                   row['status']
               )
           )
           
           # 영향받은 행 수로 삽입/업데이트 확인
           if cursor.rowcount == 1:
               inserted_count += 1
           else:
               updated_count += 1
   
   # 트랜잭션 커밋
   conn.commit()
   
   # 처리 결과 기록 및 반환
   result = f"Processed {inserted_count + updated_count} users: {inserted_count} inserted, {updated_count} updated"
   print(result)
   return result

# 데이터 적재 태스크
load_data = PythonOperator(
   task_id='load_data_to_postgres',
   python_callable=load_data_to_db,
   provide_context=True,
   dag=dag,
)

# 6. 데이터 검증 - 적재된 사용자 수 확인
validate_data = PostgresOperator(
   task_id='validate_loaded_data',
   postgres_conn_id='postgres_dwh',
   sql="""
   SELECT COUNT(*) AS total_users,
          COUNT(CASE WHEN updated_at > NOW() - INTERVAL '1 day' THEN 1 END) AS updated_today
   FROM users;
   """,
   dag=dag,
)

# 7. 정리 작업 - 임시 파일 삭제
cleanup = BashOperator(
   task_id='cleanup_temp_files',
   bash_command='rm -f /tmp/transformed_users_*.csv',
   dag=dag,
)

# 8. 종료 태스크
end = EmptyOperator(
   task_id='end_pipeline',
   dag=dag,
)

# 태스크 의존성 설정 - 워크플로우 정의
start >> create_table >> extract_data >> transform_data >> load_data >> validate_data >> cleanup >> end

 

위 예제는 실제 데이터 파이프라인에서 흔히 볼 수 있는 ETL 프로세스를 구현한 것입니다. API에서 사용자 데이터를 가져와 변환하고 데이터베이스에 적재하는 일련의 과정을 담고 있습니다.


📌, 실무 DAG 작성 모범 사례

코드 구조화 및 관리 팁

더 나은 DAG 코드를 작성하기 위한 몇 가지 모범 사례를 알아보겠습니다:

  1. 모듈화 및 함수 분리
# helper_functions.py - 별도 모듈로 분리된 헬퍼 함수들
def extract_data_from_api(api_endpoint, headers):
    """API에서 데이터 추출 함수"""
    # 구현 내용...
    pass

def transform_data(raw_data):
    """데이터 변환 함수"""
    # 구현 내용...
    pass

def load_data_to_database(transformed_data, conn_id):
    """데이터베이스 적재 함수"""
    # 구현 내용...
    pass

# dag_file.py - DAG 정의 파일
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from helper_functions import extract_data_from_api, transform_data, load_data_to_database

# DAG 정의 및 태스크 생성...
  1. 설정 분리 및 환경 변수 활용
# config.py - 설정 파일
# 환경별 설정 정보를 분리하여 관리
ENVIRONMENTS = {
    'dev': {
        'api_base_url': 'https://dev-api.example.com',
        'db_conn_id': 'postgres_dev',
        'email_on_failure': False,
    },
    'prod': {
        'api_base_url': 'https://api.example.com',
        'db_conn_id': 'postgres_prod',
        'email_on_failure': True,
    }
}

# 현재 환경 설정 가져오기
import os
ENV = os.getenv('AIRFLOW_ENV', 'dev')  # 환경 변수로 환경 설정, 기본값은 'dev'
CONFIG = ENVIRONMENTS[ENV]

# dag_file.py에서 사용
from config import CONFIG

default_args = {
    # ...
    'email_on_failure': CONFIG['email_on_failure'],
    # ...
}

# API 태스크에서 설정 사용
extract_task = SimpleHttpOperator(
    # ...
    http_conn_id=CONFIG['api_conn_id'],
    # ...
)
  1. 일관된 명명 규칙
# 좋은 명명 규칙 예시
dag_id = 'department_data_source_frequency'  # 부서_데이터소스_주기
# 예: marketing_salesforce_daily, finance_sap_monthly

# 태스크 명명 규칙: 동사_명사 형식
task_extract = PythonOperator(
    task_id='extract_salesforce_data',
    # ...
)

task_transform = PythonOperator(
    task_id='transform_customer_records',
    # ...
)

task_load = PythonOperator(
    task_id='load_into_warehouse',
    # ...
)
  1. 문서화 및 주석
# docstring과 주석을 활용한 문서화
"""
마케팅 데이터 ETL 파이프라인

이 DAG는 Salesforce에서 마케팅 캠페인 데이터를 추출하여 
데이터 웨어하우스에 적재하는 일일 ETL 작업을 수행합니다.

Owner: Marketing Data Team
SLA: 매일 오전 9시까지 완료되어야 함
"""

# DAG 문서화
dag = DAG(
    'marketing_salesforce_daily',
    default_args=default_args,
    description='Daily Salesforce marketing campaign data ETL',
    doc_md="""
    # 마케팅 Salesforce ETL
    
    ## 데이터 소스
    - Salesforce API
    - 마케팅 캠페인 데이터
    
    ## 목적지
    - 데이터 웨어하우스 'marketing_campaigns' 테이블
    
    ## 처리 단계
    1. Salesforce API에서 캠페인 데이터 추출
    2. 날짜 형식 변환 및 불필요 필드 제거
    3. 데이터 웨어하우스에 적재
    """,
    # ...
)
  1. 오류 처리 및 로깅
def process_data(**context):
    """데이터 처리 함수 - 오류 처리 및 로깅 포함"""
    import logging
    logger = logging.getLogger(__name__)
    
    try:
        # 주요 로직 시작 로그
        logger.info("Starting data processing...")
        
        # 데이터 가져오기
        ti = context['ti']
        data = ti.xcom_pull(task_ids='extract_data')
        
        if not data:
            # 데이터 없음 오류 기록
            logger.error("No data received from extract task")
            raise ValueError("No data available for processing")
        
        # 데이터 처리 로직
        logger.info(f"Processing {len(data)} records")
        result = []
        for item in data:
            try:
                # 각 레코드 처리
                processed = transform_record(item)
                result.append(processed)
            except Exception as e:
                # 개별 레코드 처리 오류는 기록하고 계속 진행
                logger.warning(f"Error processing record {item.get('id')}: {str(e)}")
                continue
        
        # 결과 반환
        logger.info(f"Processed {len(result)} records successfully")
        return result
        
    except Exception as e:
        # 치명적 오류 기록 및 전파
        logger.error(f"Critical error in data processing: {str(e)}")
        # 이메일 알림 등 추가 조치 가능
        raise

 

대규모 DAG 관리 전략

실제 프로덕션 환경에서는 수백 개의 DAG와 수천 개의 태스크를 관리해야 할 수 있습니다. 이런 상황을 위한 전략은 다음과 같습니다:

  1. DAG 팩토리 패턴

여러 비슷한 DAG를 생성해야 할 경우 팩토리 패턴을 사용할 수 있습니다:

# DAG 팩토리 패턴 예시
def create_data_pipeline_dag(
    source_system,
    target_system,
    schedule,
    start_date,
    owner='data_team',
    email=None,
):
    """데이터 파이프라인 DAG 생성 팩토리 함수"""
    
    dag_id = f"{source_system}_to_{target_system}_{schedule.replace('@', '')}"
    
    default_args = {
        'owner': owner,
        'depends_on_past': False,
        'start_date': start_date,
        'email': email or [f"{owner}@example.com"],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    
    dag = DAG(
        dag_id,
        default_args=default_args,
        description=f'Data pipeline from {source_system} to {target_system}',
        schedule_interval=schedule,
        catchup=False,
    )
    
    # 소스 시스템에 따른 추출 태스크 생성
    if source_system == 'mysql':
        extract = MySqlOperator(
            task_id='extract_data',
            mysql_conn_id=f'{source_system}_conn',
            sql=f'SELECT * FROM {source_system}_table',
            dag=dag,
        )
    elif source_system == 'api':
        extract = SimpleHttpOperator(
            task_id='extract_data',
            http_conn_id=f'{source_system}_conn',
            endpoint='/data',
            method='GET',
            dag=dag,
        )
    # 다른 소스 시스템에 대한 조건 추가...
    
    # 변환 태스크
    transform = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
        op_kwargs={'source': source_system, 'target': target_system},
        dag=dag,
    )
    
    # 대상 시스템에 따른 적재 태스크 생성
    if target_system == 'postgres':
        load = PostgresOperator(
            task_id='load_data',
            postgres_conn_id=f'{target_system}_conn',
            sql=f'INSERT INTO {target_system}_table VALUES...',
            dag=dag,
        )
    elif target_system == 'bigquery':
        load = BigQueryOperator(
            task_id='load_data',
            bigquery_conn_id=f'{target_system}_conn',
            sql=f'INSERT INTO {target_system}_table VALUES...',
            dag=dag,
        )
    # 다른 대상 시스템에 대한 조건 추가...
    
    # 태스크 의존성 설정
    extract >> transform >> load
    
    return dag

# 팩토리 함수를 사용한 여러 DAG 생성
mysql_to_postgres_daily = create_data_pipeline_dag(
    source_system='mysql',
    target_system='postgres',
    schedule='@daily',
    start_date=datetime(2023, 1, 1),
    owner='data_eng',
)

api_to_bigquery_weekly = create_data_pipeline_dag(
    source_system='api',
    target_system='bigquery',
    schedule='@weekly',
    start_date=datetime(2023, 1, 1),
    owner='analytics',
)

# 필요한 만큼 더 추가...
  1. SubDAG 활용 (참고: SubDAG는 병렬성 문제로 권장되지 않을 수 있으나, 특정 상황에서는 유용)
# SubDAG 활용 예시
from airflow.operators.subdag import SubDagOperator

def create_subdag(parent_dag_id, child_dag_id, start_date, schedule_interval):
    """반복적인 태스크 그룹을 SubDAG로 추출"""
    
    subdag = DAG(
        dag_id=f'{parent_dag_id}.{child_dag_id}',
        default_args={'start_date': start_date},
        schedule_interval=schedule_interval,
    )
    
    # SubDAG 내부 태스크 정의
    task1 = EmptyOperator(
        task_id='task1',
        dag=subdag,
    )
    
    task2 = EmptyOperator(
        task_id='task2',
        dag=subdag,
    )
    
    task1 >> task2
    
    return subdag

# 메인 DAG에서 SubDAG 사용
main_dag = DAG(
    'main_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
)

start = EmptyOperator(
    task_id='start',
    dag=main_dag,
)

# SubDAG 연산자
processing = SubDagOperator(
    task_id='processing_tasks',
    subdag=create_subdag('main_dag', 'processing_tasks', 
                        datetime(2023, 1, 1), '@daily'),
    dag=main_dag,
)

end = EmptyOperator(
    task_id='end',
    dag=main_dag,
)

start >> processing >> end
  1. 태스크 그룹 활용 (Airflow 2.0 이상에서 권장)
# 태스크 그룹 활용 예시
from airflow.utils.task_group import TaskGroup

with DAG('task_group_example', ...) as dag:
    
    start = EmptyOperator(task_id='start')
    
    # 태스크 그룹 정의
    with TaskGroup(group_id='process_data') as process_group:
        task1 = EmptyOperator(task_id='extract')
        task2 = EmptyOperator(task_id='transform')
        task3 = EmptyOperator(task_id='validate')
        
        task1 >> task2 >> task3
    
    # 다른 태스크 그룹
    with TaskGroup(group_id='load_data') as load_group:
        task4 = EmptyOperator(task_id='stage')
        task5 = EmptyOperator(task_id='load')
        
        task4 >> task5
    
    end = EmptyOperator(task_id='end')
    
    # 그룹 간 의존성 설정
    start >> process_group >> load_group >> end

Summary

  • DAG 파일은 Airflow 워크플로우를 정의하는 Python 파일로, 태스크와 태스크 간 의존성을 정의합니다.
  • 기본 구조는 라이브러리 임포트, 기본 인자(default_args) 정의, DAG 객체 생성, Task 정의, 의존성 설정으로 이루어집니다.
  • DAG 파라미터에는 dag_id, schedule_interval, start_date, catchup, tags 등 다양한 옵션이 있으며, 이를 통해 DAG의 동작을 제어할 수 있습니다.
  • Task는 BashOperator, PythonOperator, HttpOperator 등 다양한 Operator를 사용하여 생성할 수 있으며, 실제 작업을 수행하는 단위입니다.
  • 의존성 설정은 비트시프트 연산자(>>), set_upstream/set_downstream 메서드, 또는 목록을 사용하여 Task 간 실행 순서를 정의합니다.
  • 모범 사례로는 모듈화, 설정 분리, 일관된 명명 규칙, 문서화, 오류 처리 등이 있으며, 대규모 DAG 관리를 위해 팩토리 패턴, TaskGroup 등을 활용할 수 있습니다.

이 글에서는 Airflow DAG 작성의 기초를 다루었습니다. 다음 글에서는 더 복잡한 Task 의존성 설정과 트리거 규칙에 대해 알아보겠습니다.

 

728x90