Data Engineering/Data Infra & Process

[14편] 실시간 스트리밍 데이터와 pgvector 연동

ygtoken 2025. 3. 7. 17:39
728x90

 

이 글에서는 실시간 스트리밍 데이터를 PostgreSQL pgvector와 연동하여 벡터 검색 시스템을 구축하는 방법을 다룹니다.

특히, Kafka 또는 Apache Pulsar를 활용하여 벡터 데이터를 지속적으로 저장하고, 실시간으로 AI 검색에 활용하는 전략을 설명합니다.

 

Kafka 또는 Pulsar를 활용한 실시간 벡터 데이터 삽입

pgvector와 스트리밍 데이터를 연동하여 실시간 검색 시스템 구축

성능 최적화를 위한 벡터 데이터 배치 처리 전략

 


🚀 1. 실시간 스트리밍 데이터와 pgvector를 연동하는 이유

 

실시간 벡터 검색 시스템 구축 시 고려해야 할 사항

1️⃣ AI 검색 시스템에서 데이터는 계속해서 추가 & 업데이트

2️⃣ 배치(Batch) 처리 방식보다 실시간(Streaming) 데이터 처리가 필요

3️⃣ Kafka 또는 Apache Pulsar를 활용하면 실시간 데이터를 안정적으로 수집 가능

 

주요 활용 사례

 

사용 사례설명

실시간 문서 검색 (RAG) 새롭게 추가된 문서를 pgvector에 저장 후 즉시 검색
추천 시스템 (E-commerce) 사용자의 행동 데이터를 실시간 벡터화하여 pgvector에 저장 후 추천
AI 챗봇 (실시간 응답 강화) 실시간 사용자 대화 데이터를 pgvector에 저장 후 검색

 

 


🚀 2. Kafka를 활용한 실시간 벡터 데이터 삽입

 

Kafka를 활용하면 대량의 데이터를 빠르게 수집하고, PostgreSQL pgvector에 실시간으로 저장할 수 있습니다.

 

📌 Kafka & PostgreSQL 연동 구조

[데이터 생성] → [Kafka Producer] → [Kafka Topic] → [Kafka Consumer] → [PostgreSQL `pgvector`]

 

 


🔹 1️⃣ Kafka 프로듀서(Producer) 설정

 

Kafka Producer는 실시간 데이터를 Kafka Topic으로 전송하는 역할을 합니다.

Python에서 Kafka Producer를 사용하여 텍스트 데이터를 Kafka로 전송하는 예제입니다.

 

📌 Python 라이브러리 설치

pip install kafka-python

 

📌 Kafka Producer 코드 (kafka_producer.py)

from kafka import KafkaProducer
import json

KAFKA_BROKER = "localhost:9092"
TOPIC_NAME = "vector-data"

producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def send_text_to_kafka(text):
    message = {"text": text}
    producer.send(TOPIC_NAME, message)
    producer.flush()
    print(f"Sent: {message}")

# 테스트 실행
send_text_to_kafka("This is a test message for vector storage.")

Kafka Producer를 실행하면 "vector-data" 토픽으로 문장이 전송됨

 


🔹 2️⃣ Kafka 컨슈머(Consumer) & 벡터 변환

 

Kafka Consumer는 Kafka에서 데이터를 읽어 pgvector에 저장하는 역할을 합니다.

실시간 문장을 Kafka에서 가져와 OpenAI 또는 Hugging Face 임베딩 모델을 활용하여 벡터로 변환 후 PostgreSQL에 저장합니다.

 

📌 Python Kafka Consumer 코드 (kafka_consumer.py)

from kafka import KafkaConsumer
import json
import psycopg2
import openai

KAFKA_BROKER = "localhost:9092"
TOPIC_NAME = "vector-data"

# PostgreSQL 연결 정보
DB_HOST = "localhost"
DB_PORT = "5432"
DB_NAME = "ragdb"
DB_USER = "postgres"
DB_PASSWORD = "postgresql"

# OpenAI API 키 설정
openai.api_key = "your-openai-api-key"

def get_openai_embedding(text):
    response = openai.Embedding.create(input=text, model="text-embedding-ada-002")
    return response['data'][0]['embedding']

# PostgreSQL 연결
def get_db_connection():
    return psycopg2.connect(
        host=DB_HOST, port=DB_PORT,
        database=DB_NAME, user=DB_USER, password=DB_PASSWORD
    )

# Kafka Consumer 생성
consumer = KafkaConsumer(
    TOPIC_NAME,
    bootstrap_servers=KAFKA_BROKER,
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

for message in consumer:
    text = message.value["text"]
    embedding = get_openai_embedding(text)

    conn = get_db_connection()
    cur = conn.cursor()

    # 벡터 데이터 삽입
    cur.execute("INSERT INTO embeddings (content, embedding) VALUES (%s, %s);", (text, embedding))
    conn.commit()
    
    cur.close()
    conn.close()
    print(f"Inserted: {text}")

Kafka Consumer를 실행하면 메시지를 받아 pgvector에 저장됨

 


🚀 3. 실시간 벡터 검색 API 구축

 

Kafka를 통해 실시간으로 저장된 벡터 데이터를 검색할 API를 FastAPI로 구축할 수 있습니다.

 

📌 Python FastAPI 코드 (search_api.py)

from fastapi import FastAPI, Query
import psycopg2

app = FastAPI()

# PostgreSQL 연결 정보
DB_HOST = "localhost"
DB_PORT = "5432"
DB_NAME = "ragdb"
DB_USER = "postgres"
DB_PASSWORD = "postgresql"

def get_db_connection():
    return psycopg2.connect(
        host=DB_HOST, port=DB_PORT,
        database=DB_NAME, user=DB_USER, password=DB_PASSWORD
    )

@app.get("/search/")
def search_vector(query_text: str):
    conn = get_db_connection()
    cur = conn.cursor()

    cur.execute(
        "SELECT content, embedding <-> %s AS similarity FROM embeddings ORDER BY similarity LIMIT 5;",
        (query_text,)
    )
    results = cur.fetchall()

    cur.close()
    conn.close()

    return [{"content": row[0], "similarity": row[1]} for row in results]

 

📌 FastAPI 실행

uvicorn search_api:app --host 0.0.0.0 --port 8000 --reload

http://localhost:8000/search/?query_text=hello를 호출하면 가장 유사한 문장을 반환

 


🚀 4. 성능 최적화를 위한 배치 처리 전략

 

실시간 데이터를 처리할 때 Kafka Consumer가 메시지를 한 번에 여러 개 가져와 처리하면 성능이 향상됩니다.

 

📌 배치 크기 조정

consumer = KafkaConsumer(
    TOPIC_NAME,
    bootstrap_servers=KAFKA_BROKER,
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    fetch_max_bytes=10485760,  # 10MB까지 한 번에 가져오기
    max_poll_records=50  # 한 번에 최대 50개 메시지 가져오기
)

배치 크기를 조정하면 Kafka에서 벡터 데이터를 더 빠르게 처리 가능

 


📌 5. 최종 정리

 

Kafka 또는 Pulsar를 활용한 실시간 벡터 데이터 삽입

pgvector와 스트리밍 데이터를 연동하여 실시간 검색 시스템 구축

배치 크기 조정을 통한 성능 최적화

 

728x90