Data Engineering/python

EP20 | 고급 Python 활용 #9 | 데이터 자동화 및 작업 스케줄링 (Airflow)

ygtoken 2025. 3. 19. 23:06
728x90

이 글에서 다루는 개념

Python에서는 Apache Airflow를 사용하여 작업 자동화 및 데이터 파이프라인을 관리할 수 있습니다.
이번 글에서는 다음 내용을 학습합니다.

  • Airflow 개념 및 아키텍처
  • Airflow 설치 및 실행 방법
  • DAG(Directed Acyclic Graph) 개념 및 기본 작성법
  • 태스크(Task) 및 연산자(Operator) 활용
  • 크론(Cron) 스케줄링을 활용한 자동 실행

1️⃣ Apache Airflow란?

📌 Apache Airflow란?

  • 데이터 파이프라인 및 작업 자동화를 위한 워크플로우 관리 도구
  • DAG(Directed Acyclic Graph) 개념을 기반으로 태스크 실행 흐름을 제어
  • 스케줄링 기능을 통해 주기적인 작업 자동화 가능

📌 Airflow 주요 개념

 

개념 설명
DAG 작업 흐름을 정의하는 그래프
Task 개별 실행 단위 (Python 함수, Shell 명령 등)
Operator Task를 실행하는 방법 (PythonOperator, BashOperator 등)
Scheduler DAG을 실행할 시점을 결정
Executor 태스크를 실행하는 실행 환경

2️⃣ Apache Airflow 설치 및 실행

📌 Airflow 설치 방법 (Docker 사용)

pip install apache-airflow

 

📌 Airflow 초기화 및 실행

airflow db init   # 데이터베이스 초기화
airflow users create -u admin -p admin -r Admin -e admin@example.com  # 관리자 계정 생성
airflow webserver --port 8080  # 웹 UI 실행
airflow scheduler  # 스케줄러 실행

 

📌 Airflow 웹 UI 접속

  • 브라우저에서 http://localhost:8080 열기

3️⃣ DAG(Directed Acyclic Graph) 개념 및 기본 작성법

📌 DAG란?

  • 작업(Task) 간의 의존 관계를 정의하는 그래프
  • 각 태스크가 방향성을 가지며 순환되지 않는 구조(Acyclic)

📌 기본 DAG 예제 (example_dag.py)

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# DAG 기본 설정
default_args = {
    "owner": "airflow",
    "start_date": datetime(2024, 3, 1),
}

# DAG 정의
dag = DAG(
    "example_dag",
    default_args=default_args,
    schedule_interval="@daily",
)

# Python 함수 정의
def hello():
    print("Hello, Airflow!")

# Task 생성
task_hello = PythonOperator(
    task_id="hello_task",
    python_callable=hello,
    dag=dag,
)

task_hello  # DAG에 태스크 등록

 

📌 코드 설명

  • DAG() → DAG 객체 생성 (schedule_interval="@daily": 매일 실행)
  • PythonOperator() → Python 함수를 실행하는 Task 생성
  • task_hello → DAG 내에서 실행할 태스크

📌 DAG 등록 경로

  • 생성한 DAG 파일을 Airflow의 dags/ 폴더에 저장
mv example_dag.py ~/airflow/dags/

4️⃣ 다양한 연산자(Operator) 활용

Airflow에서는 다양한 Operator를 제공하여 여러 작업을 자동화할 수 있습니다.

🔹 PythonOperator (Python 코드 실행)

from airflow.operators.python import PythonOperator

def print_message():
    print("PythonOperator 실행!")

task_python = PythonOperator(
    task_id="python_task",
    python_callable=print_message,
    dag=dag,
)

🔹 BashOperator (터미널 명령 실행)

from airflow.operators.bash import BashOperator

task_bash = BashOperator(
    task_id="bash_task",
    bash_command="echo 'Hello from BashOperator'",
    dag=dag,
)

🔹 BranchPythonOperator (조건 분기)

from airflow.operators.python import BranchPythonOperator

def choose_branch():
    return "task_a" if datetime.now().day % 2 == 0 else "task_b"

branch_task = BranchPythonOperator(
    task_id="branch_task",
    python_callable=choose_branch,
    dag=dag,
)

📌 조건에 따라 task_a 또는 task_b 실행


5️⃣ DAG 스케줄링 (크론 스케줄러 활용)

📌 Airflow 스케줄링 주기 설정

 

설정 값 설명
@daily 매일 실행
@hourly 매시간 실행
@weekly 매주 실행
@monthly 매월 실행
cron 표현식 0 6 * * * → 매일 오전 6시 실행

\📌 크론 표현식으로 매일 오전 6시에 실행하는 DAG

dag = DAG(
    "scheduled_dag",
    default_args=default_args,
    schedule_interval="0 6 * * *",  # 매일 오전 6시 실행
)

📌 실전 문제: Airflow DAG 작성 및 실행


문제 1: Airflow DAG을 생성하고 Python 함수 실행하기

📌 새로운 DAG을 생성하여 print("Hello Airflow!")를 실행하세요.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# 🔽 여기에 코드 작성
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

default_args = {"owner": "airflow", "start_date": datetime(2024, 3, 1)}

dag = DAG("hello_airflow", default_args=default_args, schedule_interval="@daily")

def print_message():
    print("Hello Airflow!")

task = PythonOperator(
    task_id="print_task",
    python_callable=print_message,
    dag=dag,
)

문제 2: BashOperator를 사용하여 echo 명령 실행

📌 BashOperator를 사용하여 "Airflow에서 BashOperator 실행!"을 출력하세요.

from airflow.operators.bash import BashOperator
# 🔽 여기에 코드 작성
from airflow.operators.bash import BashOperator

task_bash = BashOperator(
    task_id="bash_task",
    bash_command="echo 'Airflow에서 BashOperator 실행!'",
    dag=dag,
)

문제 3: BranchPythonOperator를 사용하여 짝수/홀수 날짜 분기 처리

📌 현재 날짜가 짝수면 task_even, 홀수면 task_odd를 실행하도록 DAG을 설정하세요.

from airflow.operators.python import BranchPythonOperator
# 🔽 여기에 코드 작성
from airflow.operators.python import BranchPythonOperator

def choose_task():
    return "task_even" if datetime.now().day % 2 == 0 else "task_odd"

branch_task = BranchPythonOperator(
    task_id="branch_task",
    python_callable=choose_task,
    dag=dag,
)

 

 

728x90