이 글에서는 실시간 스트리밍 데이터를 PostgreSQL pgvector와 연동하여 벡터 검색 시스템을 구축하는 방법을 다룹니다.
특히, Kafka 또는 Apache Pulsar를 활용하여 벡터 데이터를 지속적으로 저장하고, 실시간으로 AI 검색에 활용하는 전략을 설명합니다.
✅ Kafka 또는 Pulsar를 활용한 실시간 벡터 데이터 삽입
✅ pgvector와 스트리밍 데이터를 연동하여 실시간 검색 시스템 구축
✅ 성능 최적화를 위한 벡터 데이터 배치 처리 전략
🚀 1. 실시간 스트리밍 데이터와 pgvector를 연동하는 이유
✅ 실시간 벡터 검색 시스템 구축 시 고려해야 할 사항
1️⃣ AI 검색 시스템에서 데이터는 계속해서 추가 & 업데이트됨
2️⃣ 배치(Batch) 처리 방식보다 실시간(Streaming) 데이터 처리가 필요
3️⃣ Kafka 또는 Apache Pulsar를 활용하면 실시간 데이터를 안정적으로 수집 가능
✅ 주요 활용 사례
사용 사례설명
실시간 문서 검색 (RAG) | 새롭게 추가된 문서를 pgvector에 저장 후 즉시 검색 |
추천 시스템 (E-commerce) | 사용자의 행동 데이터를 실시간 벡터화하여 pgvector에 저장 후 추천 |
AI 챗봇 (실시간 응답 강화) | 실시간 사용자 대화 데이터를 pgvector에 저장 후 검색 |
🚀 2. Kafka를 활용한 실시간 벡터 데이터 삽입
Kafka를 활용하면 대량의 데이터를 빠르게 수집하고, PostgreSQL pgvector에 실시간으로 저장할 수 있습니다.
📌 Kafka & PostgreSQL 연동 구조
[데이터 생성] → [Kafka Producer] → [Kafka Topic] → [Kafka Consumer] → [PostgreSQL `pgvector`]
🔹 1️⃣ Kafka 프로듀서(Producer) 설정
Kafka Producer는 실시간 데이터를 Kafka Topic으로 전송하는 역할을 합니다.
Python에서 Kafka Producer를 사용하여 텍스트 데이터를 Kafka로 전송하는 예제입니다.
📌 Python 라이브러리 설치
pip install kafka-python
📌 Kafka Producer 코드 (kafka_producer.py)
from kafka import KafkaProducer
import json
KAFKA_BROKER = "localhost:9092"
TOPIC_NAME = "vector-data"
producer = KafkaProducer(
bootstrap_servers=KAFKA_BROKER,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def send_text_to_kafka(text):
message = {"text": text}
producer.send(TOPIC_NAME, message)
producer.flush()
print(f"Sent: {message}")
# 테스트 실행
send_text_to_kafka("This is a test message for vector storage.")
✅ Kafka Producer를 실행하면 "vector-data" 토픽으로 문장이 전송됨
🔹 2️⃣ Kafka 컨슈머(Consumer) & 벡터 변환
Kafka Consumer는 Kafka에서 데이터를 읽어 pgvector에 저장하는 역할을 합니다.
실시간 문장을 Kafka에서 가져와 OpenAI 또는 Hugging Face 임베딩 모델을 활용하여 벡터로 변환 후 PostgreSQL에 저장합니다.
📌 Python Kafka Consumer 코드 (kafka_consumer.py)
from kafka import KafkaConsumer
import json
import psycopg2
import openai
KAFKA_BROKER = "localhost:9092"
TOPIC_NAME = "vector-data"
# PostgreSQL 연결 정보
DB_HOST = "localhost"
DB_PORT = "5432"
DB_NAME = "ragdb"
DB_USER = "postgres"
DB_PASSWORD = "postgresql"
# OpenAI API 키 설정
openai.api_key = "your-openai-api-key"
def get_openai_embedding(text):
response = openai.Embedding.create(input=text, model="text-embedding-ada-002")
return response['data'][0]['embedding']
# PostgreSQL 연결
def get_db_connection():
return psycopg2.connect(
host=DB_HOST, port=DB_PORT,
database=DB_NAME, user=DB_USER, password=DB_PASSWORD
)
# Kafka Consumer 생성
consumer = KafkaConsumer(
TOPIC_NAME,
bootstrap_servers=KAFKA_BROKER,
value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)
for message in consumer:
text = message.value["text"]
embedding = get_openai_embedding(text)
conn = get_db_connection()
cur = conn.cursor()
# 벡터 데이터 삽입
cur.execute("INSERT INTO embeddings (content, embedding) VALUES (%s, %s);", (text, embedding))
conn.commit()
cur.close()
conn.close()
print(f"Inserted: {text}")
✅ Kafka Consumer를 실행하면 메시지를 받아 pgvector에 저장됨
🚀 3. 실시간 벡터 검색 API 구축
Kafka를 통해 실시간으로 저장된 벡터 데이터를 검색할 API를 FastAPI로 구축할 수 있습니다.
📌 Python FastAPI 코드 (search_api.py)
from fastapi import FastAPI, Query
import psycopg2
app = FastAPI()
# PostgreSQL 연결 정보
DB_HOST = "localhost"
DB_PORT = "5432"
DB_NAME = "ragdb"
DB_USER = "postgres"
DB_PASSWORD = "postgresql"
def get_db_connection():
return psycopg2.connect(
host=DB_HOST, port=DB_PORT,
database=DB_NAME, user=DB_USER, password=DB_PASSWORD
)
@app.get("/search/")
def search_vector(query_text: str):
conn = get_db_connection()
cur = conn.cursor()
cur.execute(
"SELECT content, embedding <-> %s AS similarity FROM embeddings ORDER BY similarity LIMIT 5;",
(query_text,)
)
results = cur.fetchall()
cur.close()
conn.close()
return [{"content": row[0], "similarity": row[1]} for row in results]
📌 FastAPI 실행
uvicorn search_api:app --host 0.0.0.0 --port 8000 --reload
✅ http://localhost:8000/search/?query_text=hello를 호출하면 가장 유사한 문장을 반환
🚀 4. 성능 최적화를 위한 배치 처리 전략
실시간 데이터를 처리할 때 Kafka Consumer가 메시지를 한 번에 여러 개 가져와 처리하면 성능이 향상됩니다.
📌 배치 크기 조정
consumer = KafkaConsumer(
TOPIC_NAME,
bootstrap_servers=KAFKA_BROKER,
value_deserializer=lambda v: json.loads(v.decode('utf-8')),
fetch_max_bytes=10485760, # 10MB까지 한 번에 가져오기
max_poll_records=50 # 한 번에 최대 50개 메시지 가져오기
)
✅ 배치 크기를 조정하면 Kafka에서 벡터 데이터를 더 빠르게 처리 가능
📌 5. 최종 정리
✅ Kafka 또는 Pulsar를 활용한 실시간 벡터 데이터 삽입
✅ pgvector와 스트리밍 데이터를 연동하여 실시간 검색 시스템 구축
✅ 배치 크기 조정을 통한 성능 최적화
'Data Engineering > Data Infra & Process' 카테고리의 다른 글
[16편] pgvector + LangChain을 활용한 AI 챗봇 구축 (0) | 2025.03.16 |
---|---|
[15편] AI 모델을 활용한 벡터 데이터 분석 (0) | 2025.03.07 |
[13편] 운영 자동화 (Airflow & Kubernetes) (0) | 2025.03.07 |
[12편] 벡터 검색 성능 최적화 (HNSW & IVFFlat 비교 및 튜닝) (0) | 2025.03.07 |
[11편] 벡터 데이터 백업 & 복원 (데이터 유실 방지 및 관리) (0) | 2025.03.07 |