728x90
이 글에서 다루는 개념
Python에서는 Apache Airflow를 사용하여 작업 자동화 및 데이터 파이프라인을 관리할 수 있습니다.
이번 글에서는 다음 내용을 학습합니다.
- Airflow 개념 및 아키텍처
- Airflow 설치 및 실행 방법
- DAG(Directed Acyclic Graph) 개념 및 기본 작성법
- 태스크(Task) 및 연산자(Operator) 활용
- 크론(Cron) 스케줄링을 활용한 자동 실행
1️⃣ Apache Airflow란?
📌 Apache Airflow란?
- 데이터 파이프라인 및 작업 자동화를 위한 워크플로우 관리 도구
- DAG(Directed Acyclic Graph) 개념을 기반으로 태스크 실행 흐름을 제어
- 스케줄링 기능을 통해 주기적인 작업 자동화 가능
📌 Airflow 주요 개념
개념 | 설명 |
DAG | 작업 흐름을 정의하는 그래프 |
Task | 개별 실행 단위 (Python 함수, Shell 명령 등) |
Operator | Task를 실행하는 방법 (PythonOperator, BashOperator 등) |
Scheduler | DAG을 실행할 시점을 결정 |
Executor | 태스크를 실행하는 실행 환경 |
2️⃣ Apache Airflow 설치 및 실행
📌 Airflow 설치 방법 (Docker 사용)
pip install apache-airflow
📌 Airflow 초기화 및 실행
airflow db init # 데이터베이스 초기화
airflow users create -u admin -p admin -r Admin -e admin@example.com # 관리자 계정 생성
airflow webserver --port 8080 # 웹 UI 실행
airflow scheduler # 스케줄러 실행
📌 Airflow 웹 UI 접속
- 브라우저에서 http://localhost:8080 열기
3️⃣ DAG(Directed Acyclic Graph) 개념 및 기본 작성법
📌 DAG란?
- 작업(Task) 간의 의존 관계를 정의하는 그래프
- 각 태스크가 방향성을 가지며 순환되지 않는 구조(Acyclic)
📌 기본 DAG 예제 (example_dag.py)
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
# DAG 기본 설정
default_args = {
"owner": "airflow",
"start_date": datetime(2024, 3, 1),
}
# DAG 정의
dag = DAG(
"example_dag",
default_args=default_args,
schedule_interval="@daily",
)
# Python 함수 정의
def hello():
print("Hello, Airflow!")
# Task 생성
task_hello = PythonOperator(
task_id="hello_task",
python_callable=hello,
dag=dag,
)
task_hello # DAG에 태스크 등록
📌 코드 설명
- DAG() → DAG 객체 생성 (schedule_interval="@daily": 매일 실행)
- PythonOperator() → Python 함수를 실행하는 Task 생성
- task_hello → DAG 내에서 실행할 태스크
📌 DAG 등록 경로
- 생성한 DAG 파일을 Airflow의 dags/ 폴더에 저장
mv example_dag.py ~/airflow/dags/
4️⃣ 다양한 연산자(Operator) 활용
Airflow에서는 다양한 Operator를 제공하여 여러 작업을 자동화할 수 있습니다.
🔹 PythonOperator (Python 코드 실행)
from airflow.operators.python import PythonOperator
def print_message():
print("PythonOperator 실행!")
task_python = PythonOperator(
task_id="python_task",
python_callable=print_message,
dag=dag,
)
🔹 BashOperator (터미널 명령 실행)
from airflow.operators.bash import BashOperator
task_bash = BashOperator(
task_id="bash_task",
bash_command="echo 'Hello from BashOperator'",
dag=dag,
)
🔹 BranchPythonOperator (조건 분기)
from airflow.operators.python import BranchPythonOperator
def choose_branch():
return "task_a" if datetime.now().day % 2 == 0 else "task_b"
branch_task = BranchPythonOperator(
task_id="branch_task",
python_callable=choose_branch,
dag=dag,
)
📌 조건에 따라 task_a 또는 task_b 실행
5️⃣ DAG 스케줄링 (크론 스케줄러 활용)
📌 Airflow 스케줄링 주기 설정
설정 값 | 설명 |
@daily | 매일 실행 |
@hourly | 매시간 실행 |
@weekly | 매주 실행 |
@monthly | 매월 실행 |
cron 표현식 | 0 6 * * * → 매일 오전 6시 실행 |
\📌 크론 표현식으로 매일 오전 6시에 실행하는 DAG
dag = DAG(
"scheduled_dag",
default_args=default_args,
schedule_interval="0 6 * * *", # 매일 오전 6시 실행
)
📌 실전 문제: Airflow DAG 작성 및 실행
✅ 문제 1: Airflow DAG을 생성하고 Python 함수 실행하기
📌 새로운 DAG을 생성하여 print("Hello Airflow!")를 실행하세요.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
# 🔽 여기에 코드 작성
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
default_args = {"owner": "airflow", "start_date": datetime(2024, 3, 1)}
dag = DAG("hello_airflow", default_args=default_args, schedule_interval="@daily")
def print_message():
print("Hello Airflow!")
task = PythonOperator(
task_id="print_task",
python_callable=print_message,
dag=dag,
)
✅ 문제 2: BashOperator를 사용하여 echo 명령 실행
📌 BashOperator를 사용하여 "Airflow에서 BashOperator 실행!"을 출력하세요.
from airflow.operators.bash import BashOperator
# 🔽 여기에 코드 작성
from airflow.operators.bash import BashOperator
task_bash = BashOperator(
task_id="bash_task",
bash_command="echo 'Airflow에서 BashOperator 실행!'",
dag=dag,
)
✅ 문제 3: BranchPythonOperator를 사용하여 짝수/홀수 날짜 분기 처리
📌 현재 날짜가 짝수면 task_even, 홀수면 task_odd를 실행하도록 DAG을 설정하세요.
from airflow.operators.python import BranchPythonOperator
# 🔽 여기에 코드 작성
from airflow.operators.python import BranchPythonOperator
def choose_task():
return "task_even" if datetime.now().day % 2 == 0 else "task_odd"
branch_task = BranchPythonOperator(
task_id="branch_task",
python_callable=choose_task,
dag=dag,
)
728x90
'Data Engineering > python' 카테고리의 다른 글
EP22 | 고급 Python 활용 #11 | Spark Streaming을 활용한 실시간 데이터 처리 (0) | 2025.03.19 |
---|---|
EP21 | 고급 Python 활용 #10 | Spark와 Python을 활용한 대용량 데이터 처리 (0) | 2025.03.19 |
EP19 | 고급 Python 활용 #8 | 웹 스크래핑 (BeautifulSoup, Selenium) (2) | 2025.03.19 |
EP18 | 고급 Python 활용 #7 | API 데이터 활용 (REST API, JSON 처리) (0) | 2025.03.19 |
EP17 | 고급 Python 활용 #6 | Pandas를 활용한 고급 데이터 분석 (0) | 2025.03.19 |