Data Engineering/python

EP26 | 고급 Python 활용 #15 | Spark를 활용한 실시간 데이터 파이프라인 구축

ygtoken 2025. 3. 19. 23:18
728x90

이 글에서 다루는 개념

Apache Spark는 대용량 데이터를 실시간으로 수집, 처리, 저장하는 데이터 파이프라인을 구축할 수 있습니다.
이번 글에서는 다음 내용을 학습합니다.

  • 데이터 파이프라인 개념 및 Spark 구조
  • Spark Streaming을 활용한 실시간 데이터 처리
  • Kafka와 연동하여 데이터 스트림 구축
  • Spark와 데이터베이스 연동하여 데이터 저장
  • ELT(Extract, Load, Transform) 및 ETL(Extract, Transform, Load) 비교

1️⃣ 데이터 파이프라인 개념 및 Spark 구조

📌 데이터 파이프라인이란?

  • 데이터를 수집(Extract) → 변환(Transform) → 저장(Load)하는 과정
  • 실시간 데이터 처리를 위해 Spark Streaming, Kafka, 데이터베이스와 연동 가능

📌 데이터 파이프라인 흐름

Kafka (데이터 수집) → Spark Streaming (데이터 처리) → 데이터베이스 (저장)

 

📌 ELT vs ETL 차이점

방식 설명
ETL 추출(Extract) → 변환(Transform) → 적재(Load) (전통적인 방식)
ELT 추출(Extract) → 적재(Load) → 변환(Transform) (대용량 데이터에서 유리)

2️⃣ Spark Streaming을 활용한 실시간 데이터 처리

📌 PySpark Streaming 환경 설정

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

spark = SparkSession.builder.appName("StreamingPipeline").getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 5)  # 5초 단위로 배치 처리

 

📌 소켓 데이터 스트림 생성

lines = ssc.socketTextStream("localhost", 9999)

 

📌 데이터 변환 및 출력

processed = lines.map(lambda x: x.upper())  # 대문자로 변환
processed.pprint()

 

📌 스트리밍 실행

ssc.start()
ssc.awaitTermination()

 

📌 테스트 방법 (터미널에서 실행)

nc -lk 9999
  • 메시지 입력하면 Spark에서 변환된 데이터 출력

3️⃣ Kafka와 연동하여 데이터 스트림 구축

📌 Kafka Docker 설정 (docker-compose.yml)

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

 

📌 Kafka 실행

docker-compose up -d

 

📌 Kafka 토픽 생성

docker exec -it kafka kafka-topics.sh --create --topic logs --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

 

📌 Kafka 데이터 송신 (Producer)

docker exec -it kafka kafka-console-producer.sh --topic logs --bootstrap-server localhost:9092

 

📌 Kafka 데이터 수신 (Consumer)

docker exec -it kafka kafka-console-consumer.sh --topic logs --from-beginning --bootstrap-server localhost:9092

4️⃣ Spark Streaming을 사용하여 Kafka 데이터 처리

📌 Spark와 Kafka 연동 (pyspark.streaming.kafka 사용)

from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 5)

# Kafka 스트림 생성
kafkaStream = KafkaUtils.createDirectStream(ssc, ["logs"], {"metadata.broker.list": "localhost:9092"})

# 메시지 출력
messages = kafkaStream.map(lambda x: x[1])
messages.pprint()

ssc.start()
ssc.awaitTermination()

📌 Kafka에 메시지를 보내면 실시간으로 Spark에서 처리 가능


5️⃣ Spark와 데이터베이스 연동하여 데이터 저장

📌 PostgreSQL 설정 (Docker 사용)

docker run --name postgres -e POSTGRES_USER=user -e POSTGRES_PASSWORD=password -e POSTGRES_DB=mydb -p 5432:5432 -d postgres

 

📌 Spark DataFrame을 PostgreSQL에 저장

df.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
    .option("dbtable", "logs") \
    .option("user", "user") \
    .option("password", "password") \
    .save()

 

📌 PostgreSQL에서 데이터 확인

docker exec -it postgres psql -U user -d mydb -c "SELECT * FROM logs;"

📌 실전 문제: Spark 실시간 데이터 파이프라인 연습하기


문제 1: Spark Streaming을 사용하여 TCP 소켓에서 데이터 수신

📌 포트 9999에서 데이터를 수신하고 출력하세요.

from pyspark.streaming import StreamingContext
# 🔽 여기에 코드 작성
ssc = StreamingContext(sc, 5)

lines = ssc.socketTextStream("localhost", 9999)
lines.pprint()

ssc.start()
ssc.awaitTermination()

문제 2: Kafka에서 메시지를 수신하여 Spark에서 처리

📌 Kafka의 logs 토픽에서 데이터를 읽고 출력하세요.

from pyspark.streaming.kafka import KafkaUtils
# 🔽 여기에 코드 작성
kafkaStream = KafkaUtils.createDirectStream(ssc, ["logs"], {"metadata.broker.list": "localhost:9092"})
messages = kafkaStream.map(lambda x: x[1])
messages.pprint()

문제 3: Kafka 데이터를 PostgreSQL에 저장

📌 Kafka에서 읽은 데이터를 PostgreSQL에 저장하세요.

# 🔽 여기에 코드 작성
from pyspark.sql import Row

def save_to_postgres(rdd):
    if not rdd.isEmpty():
        df = spark.createDataFrame(rdd.map(lambda x: Row(message=x)))
        df.write \
            .format("jdbc") \
            .option("url", "jdbc:postgresql://localhost:5432/mydb") \
            .option("dbtable", "logs") \
            .option("user", "user") \
            .option("password", "password") \
            .save()

messages.foreachRDD(save_to_postgres)

 

728x90