Data Engineering/Data Infra & Process

[13편] 운영 자동화 (Airflow & Kubernetes)

ygtoken 2025. 3. 7. 17:14
728x90

 

이 글에서는 PostgreSQL pgvector 기반 벡터 검색 시스템을 Airflow와 Kubernetes를 활용하여 자동화하는 방법을 다룹니다.

특히, Airflow로 벡터 데이터를 자동 업데이트하고, Kubernetes CronJob & HPA(Auto Scaling)를 적용하여 운영을 최적화하는 실무적인 방법을 정리합니다.

 

Airflow를 활용한 벡터 데이터 자동 업데이트 및 관리

Kubernetes CronJob을 활용한 주기적 벡터 데이터 백업 & 최적화

HPA(Auto Scaling) 적용으로 AI 검색 시스템의 자동 확장

 


🚀 1. Airflow를 활용한 벡터 데이터 자동 업데이트

 

🔹 1️⃣ Airflow를 활용하는 이유

 

✅ AI 검색 시스템에서 벡터 데이터는 지속적으로 추가 & 업데이트됩니다.

Airflow를 활용하면 벡터 임베딩 생성, 데이터 삽입, 정리 등의 작업을 자동화할 수 있습니다.

스케줄링을 통해 특정 주기에 맞춰 벡터 데이터를 동기화할 수 있습니다.

 


🔹 2️⃣ 벡터 데이터 업데이트 DAG 구축

 

📌 Airflow DAG (vector_update_dag.py)

이 DAG는 주기적으로 새로운 데이터를 OpenAI API로 임베딩 변환 후 PostgreSQL에 삽입하는 역할을 합니다.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import psycopg2
import openai

# Airflow DAG 기본 설정
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 3, 15),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'pgvector_update',
    default_args=default_args,
    description='Update pgvector embeddings in PostgreSQL',
    schedule_interval='0 */6 * * *',  # 6시간마다 실행
    catchup=False
)

# PostgreSQL 연결 정보
DB_HOST = "postgresql.database.svc.cluster.local"
DB_PORT = "5432"
DB_NAME = "ragdb"
DB_USER = "postgres"
DB_PASSWORD = "postgresql"

# OpenAI API 키 설정
openai.api_key = "your-openai-api-key"

# 벡터 데이터 업데이트 함수
def update_embeddings():
    conn = psycopg2.connect(
        host=DB_HOST, port=DB_PORT,
        database=DB_NAME, user=DB_USER, password=DB_PASSWORD
    )
    cur = conn.cursor()

    # 벡터 업데이트할 새로운 데이터 조회
    cur.execute("SELECT id, content FROM embeddings WHERE embedding IS NULL LIMIT 10;")
    rows = cur.fetchall()

    for row in rows:
        text_id, content = row
        embedding = openai.Embedding.create(input=content, model="text-embedding-ada-002")['data'][0]['embedding']

        # 벡터 데이터 업데이트
        cur.execute("UPDATE embeddings SET embedding = %s WHERE id = %s;", (embedding, text_id))

    conn.commit()
    cur.close()
    conn.close()

# Airflow Task
update_task = PythonOperator(
    task_id='update_vector_embeddings',
    python_callable=update_embeddings,
    dag=dag,
)

update_task

📌 DAG 설명

schedule_interval='0 */6 * * *'6시간마다 실행

OpenAI API를 호출하여 벡터 데이터를 업데이트

embedding IS NULL인 데이터를 조회하여 벡터 생성 후 DB 업데이트

 

이제 Airflow가 자동으로 벡터 데이터를 업데이트합니다.

 


🚀 2. Kubernetes CronJob을 활용한 주기적 벡터 데이터 백업 & 최적화

 

🔹 1️⃣ Kubernetes CronJob을 활용하는 이유

 

✅ 벡터 데이터는 AI 모델 학습을 위해 정기적으로 백업해야 합니다.

✅ 데이터 크기가 커지므로 주기적으로 인덱스를 재구성하여 검색 성능을 유지해야 합니다.

 


🔹 2️⃣ 벡터 데이터 백업 CronJob

 

📌 Kubernetes backup-cronjob.yaml

apiVersion: batch/v1
kind: CronJob
metadata:
  name: pgvector-backup
  namespace: database
spec:
  schedule: "0 3 * * *"  # 매일 새벽 3시에 실행
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: pgvector-backup
            image: postgres:14
            command:
            - "/bin/sh"
            - "-c"
            - "pg_dump -U postgres -h postgresql.database.svc.cluster.local -d ragdb | gzip > /backup/ragdb_backup.sql.gz"
            volumeMounts:
            - name: backup-volume
              mountPath: /backup
          restartPolicy: OnFailure
          volumes:
          - name: backup-volume
            persistentVolumeClaim:
              claimName: backup-pvc

📌 CronJob 적용

kubectl apply -f backup-cronjob.yaml -n database

이제 PostgreSQL 벡터 데이터가 매일 새벽 자동으로 백업됩니다.

 


🔹 3️⃣ 벡터 데이터 인덱스 최적화 CronJob

 

📌 Kubernetes index-rebuild-cronjob.yaml

apiVersion: batch/v1
kind: CronJob
metadata:
  name: pgvector-index-rebuild
  namespace: database
spec:
  schedule: "0 4 * * *"  # 매일 새벽 4시에 실행
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: pgvector-index-rebuild
            image: postgres:14
            command:
            - "/bin/sh"
            - "-c"
            - "psql -U postgres -h postgresql.database.svc.cluster.local -d ragdb -c 'REINDEX TABLE embeddings;'"
          restartPolicy: OnFailure

📌 CronJob 적용

kubectl apply -f index-rebuild-cronjob.yaml -n database

벡터 인덱스가 매일 새벽 자동으로 재구성됩니다.

 


🚀 3. HPA(Auto Scaling) 적용으로 AI 검색 시스템의 자동 확장

 

AI 검색 시스템은 사용량이 급증할 경우 자동으로 확장(Auto Scaling)되어야 합니다.

 

📌 HPA를 적용하여 CPU 사용량이 70%를 초과하면 자동 확장

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: fastapi-vector-search-hpa
  namespace: fastapi
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: fastapi-vector-search
  minReplicas: 1
  maxReplicas: 5
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70

 

📌 HPA 적용

kubectl apply -f hpa.yaml -n fastapi

트래픽이 증가하면 FastAPI 벡터 검색 서비스가 자동으로 확장됩니다.

 


📌 4. 최종 정리

 

Airflow를 활용한 벡터 데이터 자동 업데이트 및 관리

Kubernetes CronJob을 활용한 주기적 벡터 데이터 백업 & 최적화

HPA(Auto Scaling) 적용으로 AI 검색 시스템의 자동 확장

728x90