728x90
이 글에서 다루는 개념
Apache Spark는 대용량 데이터를 실시간으로 수집, 처리, 저장하는 데이터 파이프라인을 구축할 수 있습니다.
이번 글에서는 다음 내용을 학습합니다.
- 데이터 파이프라인 개념 및 Spark 구조
- Spark Streaming을 활용한 실시간 데이터 처리
- Kafka와 연동하여 데이터 스트림 구축
- Spark와 데이터베이스 연동하여 데이터 저장
- ELT(Extract, Load, Transform) 및 ETL(Extract, Transform, Load) 비교
1️⃣ 데이터 파이프라인 개념 및 Spark 구조
📌 데이터 파이프라인이란?
- 데이터를 수집(Extract) → 변환(Transform) → 저장(Load)하는 과정
- 실시간 데이터 처리를 위해 Spark Streaming, Kafka, 데이터베이스와 연동 가능
📌 데이터 파이프라인 흐름
Kafka (데이터 수집) → Spark Streaming (데이터 처리) → 데이터베이스 (저장)
📌 ELT vs ETL 차이점
방식 | 설명 |
ETL | 추출(Extract) → 변환(Transform) → 적재(Load) (전통적인 방식) |
ELT | 추출(Extract) → 적재(Load) → 변환(Transform) (대용량 데이터에서 유리) |
2️⃣ Spark Streaming을 활용한 실시간 데이터 처리
📌 PySpark Streaming 환경 설정
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
spark = SparkSession.builder.appName("StreamingPipeline").getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 5) # 5초 단위로 배치 처리
📌 소켓 데이터 스트림 생성
lines = ssc.socketTextStream("localhost", 9999)
📌 데이터 변환 및 출력
processed = lines.map(lambda x: x.upper()) # 대문자로 변환
processed.pprint()
📌 스트리밍 실행
ssc.start()
ssc.awaitTermination()
📌 테스트 방법 (터미널에서 실행)
nc -lk 9999
- 메시지 입력하면 Spark에서 변환된 데이터 출력
3️⃣ Kafka와 연동하여 데이터 스트림 구축
📌 Kafka Docker 설정 (docker-compose.yml)
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
📌 Kafka 실행
docker-compose up -d
📌 Kafka 토픽 생성
docker exec -it kafka kafka-topics.sh --create --topic logs --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
📌 Kafka 데이터 송신 (Producer)
docker exec -it kafka kafka-console-producer.sh --topic logs --bootstrap-server localhost:9092
📌 Kafka 데이터 수신 (Consumer)
docker exec -it kafka kafka-console-consumer.sh --topic logs --from-beginning --bootstrap-server localhost:9092
4️⃣ Spark Streaming을 사용하여 Kafka 데이터 처리
📌 Spark와 Kafka 연동 (pyspark.streaming.kafka 사용)
from pyspark.streaming.kafka import KafkaUtils
ssc = StreamingContext(sc, 5)
# Kafka 스트림 생성
kafkaStream = KafkaUtils.createDirectStream(ssc, ["logs"], {"metadata.broker.list": "localhost:9092"})
# 메시지 출력
messages = kafkaStream.map(lambda x: x[1])
messages.pprint()
ssc.start()
ssc.awaitTermination()
📌 Kafka에 메시지를 보내면 실시간으로 Spark에서 처리 가능
5️⃣ Spark와 데이터베이스 연동하여 데이터 저장
📌 PostgreSQL 설정 (Docker 사용)
docker run --name postgres -e POSTGRES_USER=user -e POSTGRES_PASSWORD=password -e POSTGRES_DB=mydb -p 5432:5432 -d postgres
📌 Spark DataFrame을 PostgreSQL에 저장
df.write \
.format("jdbc") \
.option("url", "jdbc:postgresql://localhost:5432/mydb") \
.option("dbtable", "logs") \
.option("user", "user") \
.option("password", "password") \
.save()
📌 PostgreSQL에서 데이터 확인
docker exec -it postgres psql -U user -d mydb -c "SELECT * FROM logs;"
📌 실전 문제: Spark 실시간 데이터 파이프라인 연습하기
✅ 문제 1: Spark Streaming을 사용하여 TCP 소켓에서 데이터 수신
📌 포트 9999에서 데이터를 수신하고 출력하세요.
from pyspark.streaming import StreamingContext
# 🔽 여기에 코드 작성
ssc = StreamingContext(sc, 5)
lines = ssc.socketTextStream("localhost", 9999)
lines.pprint()
ssc.start()
ssc.awaitTermination()
✅ 문제 2: Kafka에서 메시지를 수신하여 Spark에서 처리
📌 Kafka의 logs 토픽에서 데이터를 읽고 출력하세요.
from pyspark.streaming.kafka import KafkaUtils
# 🔽 여기에 코드 작성
kafkaStream = KafkaUtils.createDirectStream(ssc, ["logs"], {"metadata.broker.list": "localhost:9092"})
messages = kafkaStream.map(lambda x: x[1])
messages.pprint()
✅ 문제 3: Kafka 데이터를 PostgreSQL에 저장
📌 Kafka에서 읽은 데이터를 PostgreSQL에 저장하세요.
# 🔽 여기에 코드 작성
from pyspark.sql import Row
def save_to_postgres(rdd):
if not rdd.isEmpty():
df = spark.createDataFrame(rdd.map(lambda x: Row(message=x)))
df.write \
.format("jdbc") \
.option("url", "jdbc:postgresql://localhost:5432/mydb") \
.option("dbtable", "logs") \
.option("user", "user") \
.option("password", "password") \
.save()
messages.foreachRDD(save_to_postgres)
728x90
'Data Engineering > python' 카테고리의 다른 글
EP28 | 고급 Python 활용 #17 | Spark와 Hadoop을 결합한 대규모 데이터 분석 아키텍처 (0) | 2025.03.19 |
---|---|
EP27 | 고급 Python 활용 #16 | Spark를 활용한 머신러닝 기반 이상 탐지 시스템 구축 (0) | 2025.03.19 |
EP25 | 고급 Python 활용 #14 | Spark를 활용한 실시간 추천 시스템 구축 (0) | 2025.03.19 |
EP24 | 고급 Python 활용 #13 | Spark GraphX를 활용한 그래프 데이터 분석 (0) | 2025.03.19 |
EP23 | 고급 Python 활용 #12 | Spark MLlib을 활용한 머신러닝 데이터 분석 (0) | 2025.03.19 |