이 글에서는 PostgreSQL pgvector 기반 벡터 검색 시스템을 Airflow와 Kubernetes를 활용하여 자동화하는 방법을 다룹니다.
특히, Airflow로 벡터 데이터를 자동 업데이트하고, Kubernetes CronJob & HPA(Auto Scaling)를 적용하여 운영을 최적화하는 실무적인 방법을 정리합니다.
✅ Airflow를 활용한 벡터 데이터 자동 업데이트 및 관리
✅ Kubernetes CronJob을 활용한 주기적 벡터 데이터 백업 & 최적화
✅ HPA(Auto Scaling) 적용으로 AI 검색 시스템의 자동 확장
🚀 1. Airflow를 활용한 벡터 데이터 자동 업데이트
🔹 1️⃣ Airflow를 활용하는 이유
✅ AI 검색 시스템에서 벡터 데이터는 지속적으로 추가 & 업데이트됩니다.
✅ Airflow를 활용하면 벡터 임베딩 생성, 데이터 삽입, 정리 등의 작업을 자동화할 수 있습니다.
✅ 스케줄링을 통해 특정 주기에 맞춰 벡터 데이터를 동기화할 수 있습니다.
🔹 2️⃣ 벡터 데이터 업데이트 DAG 구축
📌 Airflow DAG (vector_update_dag.py)
이 DAG는 주기적으로 새로운 데이터를 OpenAI API로 임베딩 변환 후 PostgreSQL에 삽입하는 역할을 합니다.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import psycopg2
import openai
# Airflow DAG 기본 설정
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 3, 15),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'pgvector_update',
default_args=default_args,
description='Update pgvector embeddings in PostgreSQL',
schedule_interval='0 */6 * * *', # 6시간마다 실행
catchup=False
)
# PostgreSQL 연결 정보
DB_HOST = "postgresql.database.svc.cluster.local"
DB_PORT = "5432"
DB_NAME = "ragdb"
DB_USER = "postgres"
DB_PASSWORD = "postgresql"
# OpenAI API 키 설정
openai.api_key = "your-openai-api-key"
# 벡터 데이터 업데이트 함수
def update_embeddings():
conn = psycopg2.connect(
host=DB_HOST, port=DB_PORT,
database=DB_NAME, user=DB_USER, password=DB_PASSWORD
)
cur = conn.cursor()
# 벡터 업데이트할 새로운 데이터 조회
cur.execute("SELECT id, content FROM embeddings WHERE embedding IS NULL LIMIT 10;")
rows = cur.fetchall()
for row in rows:
text_id, content = row
embedding = openai.Embedding.create(input=content, model="text-embedding-ada-002")['data'][0]['embedding']
# 벡터 데이터 업데이트
cur.execute("UPDATE embeddings SET embedding = %s WHERE id = %s;", (embedding, text_id))
conn.commit()
cur.close()
conn.close()
# Airflow Task
update_task = PythonOperator(
task_id='update_vector_embeddings',
python_callable=update_embeddings,
dag=dag,
)
update_task
📌 DAG 설명
• schedule_interval='0 */6 * * *' → 6시간마다 실행
• OpenAI API를 호출하여 벡터 데이터를 업데이트
• embedding IS NULL인 데이터를 조회하여 벡터 생성 후 DB 업데이트
✅ 이제 Airflow가 자동으로 벡터 데이터를 업데이트합니다.
🚀 2. Kubernetes CronJob을 활용한 주기적 벡터 데이터 백업 & 최적화
🔹 1️⃣ Kubernetes CronJob을 활용하는 이유
✅ 벡터 데이터는 AI 모델 학습을 위해 정기적으로 백업해야 합니다.
✅ 데이터 크기가 커지므로 주기적으로 인덱스를 재구성하여 검색 성능을 유지해야 합니다.
🔹 2️⃣ 벡터 데이터 백업 CronJob
📌 Kubernetes backup-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
name: pgvector-backup
namespace: database
spec:
schedule: "0 3 * * *" # 매일 새벽 3시에 실행
jobTemplate:
spec:
template:
spec:
containers:
- name: pgvector-backup
image: postgres:14
command:
- "/bin/sh"
- "-c"
- "pg_dump -U postgres -h postgresql.database.svc.cluster.local -d ragdb | gzip > /backup/ragdb_backup.sql.gz"
volumeMounts:
- name: backup-volume
mountPath: /backup
restartPolicy: OnFailure
volumes:
- name: backup-volume
persistentVolumeClaim:
claimName: backup-pvc
📌 CronJob 적용
kubectl apply -f backup-cronjob.yaml -n database
✅ 이제 PostgreSQL 벡터 데이터가 매일 새벽 자동으로 백업됩니다.
🔹 3️⃣ 벡터 데이터 인덱스 최적화 CronJob
📌 Kubernetes index-rebuild-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
name: pgvector-index-rebuild
namespace: database
spec:
schedule: "0 4 * * *" # 매일 새벽 4시에 실행
jobTemplate:
spec:
template:
spec:
containers:
- name: pgvector-index-rebuild
image: postgres:14
command:
- "/bin/sh"
- "-c"
- "psql -U postgres -h postgresql.database.svc.cluster.local -d ragdb -c 'REINDEX TABLE embeddings;'"
restartPolicy: OnFailure
📌 CronJob 적용
kubectl apply -f index-rebuild-cronjob.yaml -n database
✅ 벡터 인덱스가 매일 새벽 자동으로 재구성됩니다.
🚀 3. HPA(Auto Scaling) 적용으로 AI 검색 시스템의 자동 확장
AI 검색 시스템은 사용량이 급증할 경우 자동으로 확장(Auto Scaling)되어야 합니다.
📌 HPA를 적용하여 CPU 사용량이 70%를 초과하면 자동 확장
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: fastapi-vector-search-hpa
namespace: fastapi
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: fastapi-vector-search
minReplicas: 1
maxReplicas: 5
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
📌 HPA 적용
kubectl apply -f hpa.yaml -n fastapi
✅ 트래픽이 증가하면 FastAPI 벡터 검색 서비스가 자동으로 확장됩니다.
📌 4. 최종 정리
✅ Airflow를 활용한 벡터 데이터 자동 업데이트 및 관리
✅ Kubernetes CronJob을 활용한 주기적 벡터 데이터 백업 & 최적화
✅ HPA(Auto Scaling) 적용으로 AI 검색 시스템의 자동 확장
'Data Engineering > Data Infra & Process' 카테고리의 다른 글
[15편] AI 모델을 활용한 벡터 데이터 분석 (0) | 2025.03.07 |
---|---|
[14편] 실시간 스트리밍 데이터와 pgvector 연동 (0) | 2025.03.07 |
[12편] 벡터 검색 성능 최적화 (HNSW & IVFFlat 비교 및 튜닝) (0) | 2025.03.07 |
[11편] 벡터 데이터 백업 & 복원 (데이터 유실 방지 및 관리) (0) | 2025.03.07 |
[10편] 대규모 벡터 데이터 관리 (샤딩 & 메모리 최적화) (0) | 2025.03.07 |