이 글에서는 워크플로 자동화의 개념과 Apache Airflow가 등장하게 된 배경, 그리고 Airflow의 기본 개념에 대해 알아봅니다. 데이터 파이프라인 자동화의 필요성부터 Airflow가 제공하는 주요 기능까지, 초보자도 이해할 수 있도록 상세히 설명합니다.
📌 워크플로 자동화란 무엇인가?
✅ 워크플로 자동화의 정의
워크플로 자동화란 일련의 작업들을 사전에 정의된 규칙과 조건에 따라 자동으로 실행되도록 구성하는 것을 말합니다. 이는 반복적인 작업을 줄이고, 인적 오류를 최소화하며, 작업의 효율성과 신뢰성을 높이는 데 큰 도움이 됩니다.
▶️ 실무 예시: 매일 아침 데이터베이스에서 데이터를 추출하여 가공하고, 분석한 결과를 이메일로 보내는 작업을 생각해 보세요. 이 모든 과정을 매번 수동으로 처리한다면, 시간이 많이 소요될 뿐만 아니라 실수할 가능성도 있습니다.
✅ 워크플로 자동화의 이점
- 시간 절약: 반복적인 작업을 자동화하여 귀중한 인력 자원을 보다 가치 있는 업무에 집중시킬 수 있습니다.
- 일관성 유지: 작업이 항상 동일한 방식으로 처리되므로 결과의 일관성이 보장됩니다.
- 오류 감소: 수동 작업에서 발생할 수 있는 인적 오류를 최소화합니다.
- 확장성 향상: 작업량이 증가하더라도 시스템이 이를 효율적으로 처리할 수 있습니다.
- 추적 및 모니터링: 각 작업의 실행 상태와 결과를 쉽게 추적하고 모니터링할 수 있습니다.
✅ 데이터 엔지니어링에서의 워크플로 자동화
데이터 엔지니어링 분야에서는 ETL(Extract, Transform, Load) 프로세스나 데이터 파이프라인 관리를 위해 워크플로 자동화가 필수적입니다. 여러 시스템에서 데이터를 수집하고, 변환하여, 데이터 웨어하우스나 데이터 레이크에 적재하는 과정을 자동화함으로써 데이터의 신선도와 품질을 유지할 수 있습니다.
▶️ 기존 워크플로 도구의 한계: cron, Jenkins 등의 기존 도구들은 간단한 작업 스케줄링은 가능했지만, 복잡한 의존성 관리나 실패 처리, 모니터링 등의 기능이 부족했습니다.
📌 Airflow의 등장 배경과 필요성
✅ 기존 워크플로 도구의 한계
데이터 처리 파이프라인이 복잡해지면서 기존의 스케줄링 도구만으로는 다음과 같은 요구사항을 충족하기 어려워졌습니다:
- 복잡한 의존성 관리: 태스크 간의 복잡한 의존성을 정의하고 관리하는 기능
- 동적 워크플로 생성: 실행 시점에 조건에 따라 워크플로를 동적으로 생성하는 기능
- 세밀한 실패 처리: 특정 태스크가 실패했을 때 재시도하거나 대체 경로를 선택하는 기능
- 확장 가능한 구조: 다양한 실행 환경(로컬, 클라우드, 컨테이너 등)을 지원하는 확장 가능한 구조
- 시각적 모니터링: 복잡한 워크플로의 실행 상태를 직관적으로 모니터링할 수 있는 UI
✅ Airflow의 탄생
Apache Airflow는 2014년 Airbnb의 데이터 엔지니어인 Maxime Beauchemin에 의해 개발되었습니다. 당시 Airbnb는 복잡한 데이터 파이프라인을 관리하기 위한 도구가 필요했고, 이러한 필요성에 의해 Airflow가 탄생하게 되었습니다. 2016년에는 Apache 소프트웨어 재단의 인큐베이터 프로젝트로 승격되었고, 2019년에는 최상위 프로젝트(Top-Level Project)로 선정되었습니다.
✅ Airflow가 해결하고자 한 문제
Airflow는 다음과 같은 문제들을 해결하기 위해 설계되었습니다:
- 코드로서의 워크플로(Workflows as Code): 워크플로를 설정 파일이 아닌 Python 코드로 정의하여 유연성과 확장성을 제공
- 다양한 시스템 통합: 다양한 데이터 소스 및 시스템과의 연동을 위한 풍부한 연산자(Operator) 제공
- 시각적 모니터링 및 관리: 웹 UI를 통한 직관적인 모니터링 및 관리 기능
- 확장 가능한 아키텍처: 분산 실행을 지원하는 확장 가능한 아키텍처
- 커뮤니티 중심 개발: 활발한 오픈소스 커뮤니티를 통한 지속적인 발전
📌 Airflow의 핵심 개념
✅ DAG(Directed Acyclic Graph)
DAG는 Airflow의 가장 핵심적인 개념으로, 작업 흐름을 방향성 있는 비순환 그래프로 나타낸 것입니다.
- 방향성(Directed): 각 작업 간의 관계가 방향을 가집니다 (A → B).
- 비순환(Acyclic): 사이클이 발생하지 않습니다. 즉, A → B → C → A와 같은 순환 구조가 없습니다.
- 그래프(Graph): 노드(작업)와 엣지(의존성)로 구성된 그래프 구조입니다.
# DAG 정의 예시
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
# DAG의 기본 인자 설정
default_args = {
'owner': 'airflow', # 소유자 정보
'depends_on_past': False, # 이전 실행 결과에 의존하지 않음
'start_date': datetime(2023, 1, 1), # 시작 날짜
'email': ['alert@example.com'], # 알림을 받을 이메일
'email_on_failure': True, # 실패 시 이메일 발송
'email_on_retry': False, # 재시도 시 이메일 발송 안 함
'retries': 1, # 실패 시 재시도 횟수
'retry_delay': timedelta(minutes=5), # 재시도 간 대기 시간
}
# DAG 생성
dag = DAG(
'sample_data_pipeline', # DAG ID
default_args=default_args, # 기본 인자
description='샘플 데이터 파이프라인', # 설명
schedule_interval=timedelta(days=1), # 실행 주기 (일 단위)
catchup=False # 과거 실행 건너뛰기
)
✅ Task (작업)
Task는 DAG 내에서 실행되는 개별 작업 단위입니다. 각 Task는 다음과 같은 특성을 가집니다:
- 하나의 작업(예: 데이터 추출, 변환, 적재 등)을 담당
- 다른 Task와의 의존성을 가질 수 있음
- 다양한 Operator를 통해 구현됨
# Task 정의 예시
def extract_data():
# 데이터 추출 로직
print("데이터 추출 중...")
return {"extracted_data": "sample_data"}
def transform_data(**context):
# 이전 태스크에서 전달받은 데이터 활용
ti = context['ti']
extracted_data = ti.xcom_pull(task_ids='extract_task')
# 데이터 변환 로직
print("데이터 변환 중...")
transformed_data = extracted_data["extracted_data"].upper()
return {"transformed_data": transformed_data}
def load_data(**context):
# 변환된 데이터 활용
ti = context['ti']
transformed_data = ti.xcom_pull(task_ids='transform_task')
# 데이터 적재 로직
print(f"데이터 적재 중: {transformed_data['transformed_data']}")
# Task 생성
extract_task = PythonOperator(
task_id='extract_task', # Task ID
python_callable=extract_data, # 실행할 함수
dag=dag, # 속한 DAG
)
transform_task = PythonOperator(
task_id='transform_task', # Task ID
python_callable=transform_data, # 실행할 함수
provide_context=True, # 컨텍스트 제공
dag=dag, # 속한 DAG
)
load_task = PythonOperator(
task_id='load_task', # Task ID
python_callable=load_data, # 실행할 함수
provide_context=True, # 컨텍스트 제공
dag=dag, # 속한 DAG
)
# Task 간 의존성 설정
extract_task >> transform_task >> load_task # extract → transform → load 순서로 실행
✅ Operator (연산자)
Operator는 Task를 구현하기 위한 템플릿으로, 특정 작업을 수행하는 방법을 정의합니다. Airflow는 다양한 Operator를 제공합니다:
- BashOperator: Bash 명령어를 실행
- PythonOperator: Python 함수를 실행
- SQLOperator: SQL 쿼리를 실행
- EmailOperator: 이메일을 발송
- SimpleHttpOperator: HTTP 요청을 수행
- DockerOperator: Docker 컨테이너를 실행
- KubernetesOperator: Kubernetes Pod를 실행
▶️ 예시: BashOperator
from airflow.operators.bash_operator import BashOperator
# Bash 명령어를 실행하는 Task
create_directory = BashOperator(
task_id='create_directory', # Task ID
bash_command='mkdir -p /tmp/data', # 실행할 Bash 명령어
dag=dag, # 속한 DAG
)
✅ Scheduler (스케줄러)
Scheduler는 정의된 DAG를 주기적으로 실행하는 역할을 담당합니다. 주요 기능은 다음과 같습니다:
- DAG 파일을 주기적으로 검사하여 변경 사항을 반영
- 각 DAG의 스케줄에 따라 DAG 실행을 트리거
- 의존성에 따라 Task의 실행 순서를 결정
▶️ 스케줄링 표현식: Airflow는 cron 표현식과 유사한 형태로 실행 주기를 정의할 수 있습니다.
# 다양한 스케줄링 표현식 예시
dag_daily = DAG(
'daily_dag',
schedule_interval='@daily', # 매일 자정에 실행
# ...
)
dag_workdays = DAG(
'workday_dag',
schedule_interval='0 8 * * 1-5', # 평일 오전 8시에 실행 (cron 표현식)
# ...
)
dag_monthly = DAG(
'monthly_dag',
schedule_interval='@monthly', # 매월 1일에 실행
# ...
)
✅ Executor (실행자)
Executor는 Task를 실제로 실행하는 주체입니다. Airflow는 다양한 Executor를 제공하여 실행 환경에 맞게 선택할 수 있습니다:
- SequentialExecutor: 가장 기본적인 실행자로, 한 번에 하나의 Task만 실행 (기본값, SQLite와 함께 사용)
- LocalExecutor: 로컬 환경에서 병렬로 Task를 실행
- CeleryExecutor: Celery를 통해 분산 환경에서 Task를 실행
- KubernetesExecutor: Kubernetes 위에서 각 Task를 Pod로 실행
- DaskExecutor: Dask 분산 시스템을 통해 Task를 실행
# airflow.cfg 에서 Executor 설정 예시
[core]
executor = LocalExecutor # 로컬 환경에서 병렬 실행
📌 Airflow의 주요 특징
✅ Python 기반 워크플로 정의
Airflow의 가장 큰 특징 중 하나는 워크플로를 Python 코드로 정의할 수 있다는 점입니다. 이를 통해 다음과 같은 이점을 얻을 수 있습니다:
- 버전 관리: Git과 같은 버전 관리 시스템을 통해 워크플로 변경 사항을 추적
- 동적 워크플로: 실행 시점의 조건에 따라 동적으로 워크플로를 생성
- 재사용성: 공통 로직을 모듈화하여 여러 워크플로에서 재사용
- 테스트 용이성: 단위 테스트 및 통합 테스트를 통한 워크플로 검증
▶️ 동적 워크플로 예시:
# 동적으로 DAG 생성하는 예시
for region in ['us', 'eu', 'asia']:
dag_id = f'data_processing_{region}'
globals()[dag_id] = DAG(
dag_id,
default_args=default_args,
schedule_interval='@daily',
# ...
)
# 각 지역별 데이터 처리 Task 정의
# ...
✅ 풍부한 UI와 모니터링 기능
Airflow는 사용자 친화적인 웹 인터페이스를 제공하여 워크플로의 상태를 시각적으로 모니터링할 수 있습니다:
- DAG 시각화: 복잡한 워크플로를 그래프 형태로 시각화
- 실행 이력: 과거 실행 이력 및 로그 조회
- Task 상태 추적: 각 Task의 실행 상태 및 진행 상황 추적
- 변수 및 연결 관리: 워크플로에서 사용하는 변수 및 외부 시스템 연결 관리
✅ 확장성 및 유연성
Airflow는 다양한 환경 및 시스템과 통합할 수 있는 확장 가능한 구조를 가지고 있습니다:
- 다양한 Executor: 단일 머신부터 분산 환경까지 다양한 실행 환경 지원
- 풍부한 Operator: 다양한 시스템 및 서비스와의 통합을 위한 Operator 제공
- 플러그인 시스템: 사용자 정의 Operator, Hook, 인터페이스 확장 가능
- REST API: 프로그래밍 방식으로 Airflow 제어 가능
▶️ 커스텀 Operator 예시:
# 커스텀 Operator 정의 예시
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
class MyCustomOperator(BaseOperator):
"""
커스텀 로직을 수행하는 Operator
"""
@apply_defaults
def __init__(self, my_param, *args, **kwargs):
super().__init__(*args, **kwargs)
self.my_param = my_param
def execute(self, context):
"""
Operator가 실행될 때 호출되는 메서드
"""
print(f"커스텀 Operator 실행 중: {self.my_param}")
# 커스텀 로직 구현
return "커스텀 Operator 실행 결과"
✅ 강력한 오류 처리 및 재시도 메커니즘
Airflow는 워크플로 실행 중 발생할 수 있는 오류를 효과적으로 처리할 수 있는 메커니즘을 제공합니다:
- 자동 재시도: Task 실패 시 사용자 정의 재시도 정책에 따라 자동으로 재시도
- 실패 알림: 이메일, Slack 등을 통한 실패 알림
- 오류 원인 분석: 상세 로그 및 오류 메시지를 통한 원인 분석
- SLA 모니터링: Service Level Agreement 설정 및 모니터링
# 재시도 및 SLA 설정 예시
default_args = {
# ... 기타 인자들 ...
'retries': 3, # 최대 3번 재시도
'retry_delay': timedelta(minutes=5), # 재시도 간 5분 대기
'retry_exponential_backoff': True, # 지수 백오프 적용 (5분, 10분, 20분...)
'email_on_failure': True, # 최종 실패 시 이메일 알림
}
# SLA 설정
task_with_sla = PythonOperator(
task_id='time_sensitive_task',
python_callable=my_function,
sla=timedelta(hours=1), # 1시간 내 완료되어야 함
dag=dag,
)
📌 Airflow 사용 사례
✅ 데이터 파이프라인 자동화
가장 일반적인 Airflow 사용 사례로, 데이터 수집부터 변환, 적재까지의 전 과정을 자동화합니다:
- 데이터 소스(DB, API, 파일 등)에서 데이터 추출
- 데이터 변환 및 가공
- 데이터 웨어하우스나 데이터 레이크에 적재
- 품질 검사 및 검증
▶️ 실제 사례: Netflix는 Airflow를 활용하여 콘텐츠 추천 시스템에 필요한 데이터 파이프라인을 구축했습니다. 사용자 활동 데이터를 수집하고 처리하여 추천 모델을 주기적으로 업데이트합니다.
✅ 기계 학습 워크플로 관리
기계 학습 모델의 훈련, 평가, 배포 과정을 자동화합니다:
- 훈련 데이터 준비 및 전처리
- 모델 훈련 및 하이퍼파라미터 튜닝
- 모델 평가 및 검증
- 모델 배포 및 모니터링
✅ 정기 보고서 생성 자동화
비즈니스 인텔리전스 및 데이터 분석 작업을 자동화합니다:
- 데이터 소스에서 최신 데이터 추출
- 보고서 생성 및 시각화
- 이메일, 대시보드 등을 통한 배포
✅ 인프라 자동화
클라우드 리소스 관리 및 인프라 작업을 자동화합니다:
- 리소스 프로비저닝 및 관리
- 백업 및 복구 작업
- 스케일링 및 최적화 작업
▶️ 실제 사례: Airbnb는 Airflow를 통해 AWS 인프라의 자동 스케일링 및 최적화 작업을 자동화하여 비용을 절감하고 효율성을 높였습니다.
Summary
- 워크플로 자동화는 일련의 작업을 자동으로 실행하여 시간 절약, 일관성 유지, 오류 감소, 확장성 향상, 추적 및 모니터링을 가능하게 합니다.
- Apache Airflow는 기존 워크플로 도구의 한계를 극복하기 위해 Airbnb에서 개발된 오픈소스 워크플로 관리 도구입니다.
- Airflow의 핵심 개념인 **DAG(Directed Acyclic Graph)**는 방향성 있는 비순환 그래프로 워크플로를 표현합니다.
- Task는 DAG 내에서 실행되는 개별 작업 단위이며, 다양한 Operator를 통해 구현됩니다.
- Scheduler는 DAG를 주기적으로 실행하고, Executor는 Task를 실제로 실행하는 역할을 담당합니다.
- Airflow는 Python 기반 워크플로 정의, 풍부한 UI와 모니터링 기능, 확장성 및 유연성, 강력한 오류 처리 및 재시도 메커니즘을 제공합니다.
- Airflow는 데이터 파이프라인 자동화, 기계 학습 워크플로 관리, 정기 보고서 생성, 인프라 자동화 등 다양한 영역에서 활용됩니다.