728x90
이 글에서 다루는 개념
Apache Spark Streaming은 실시간 데이터 스트리밍을 처리하는 프레임워크입니다.
이번 글에서는 다음 내용을 학습합니다.
- Spark Streaming 개념 및 아키텍처
- PySpark Streaming 환경 설정
- 소켓 데이터 스트림 처리 (socketTextStream)
- Kafka와 Spark Streaming 연동
- 실시간 데이터 집계 및 분석
1️⃣ Spark Streaming이란?
📌 Spark Streaming이란?
- 실시간 데이터 처리 프레임워크
- 데이터 스트림을 작은 배치(batch) 단위로 처리하여 분석 가능
- Kafka, Flume, Kinesis, Socket 등 다양한 데이터 소스로부터 스트림 데이터 수집 가능
📌 Spark Streaming 아키텍처
개념 | 설명 |
DStream | Discretized Stream, Spark에서 스트림 데이터를 처리하는 단위 |
Receiver | 외부 소스(Kafka, Socket 등)에서 데이터를 수집 |
Batch Interval | 데이터를 처리하는 주기 (예: 1초 단위) |
2️⃣ PySpark Streaming 환경 설정
📌 Spark Streaming 실행을 위해 PySpark 설치 (pyspark 포함됨)
pip install pyspark
📌 Spark StreamingContext 생성
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# SparkContext 및 StreamingContext 생성 (5초 간격으로 배치)
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 5)
- "local[2]" → CPU 2개 사용
- batchInterval=5 → 5초 단위로 데이터를 처리
3️⃣ 소켓 데이터 스트림 처리 (socketTextStream)
📌 TCP 소켓을 통해 스트림 데이터 수집
lines = ssc.socketTextStream("localhost", 9999)
# 단어 분할 및 개수 계산
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
word_counts.pprint() # 콘솔에 출력
📌 스트리밍 시작 및 종료
ssc.start() # 스트리밍 시작
ssc.awaitTermination() # 프로그램 종료 방지
📌 서버 실행 후 테스트
- 새로운 터미널에서 간단한 서버 실행
nc -lk 9999
- Spark Streaming 실행 후, 터미널에서 문자 입력하면 해당 단어 개수가 실시간으로 출력됨
4️⃣ Kafka와 Spark Streaming 연동
📌 Kafka 설치 및 실행 (docker-compose 활용)
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
docker-compose up -d
📌 Kafka 토픽 생성
docker exec -it kafka kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
📌 Kafka Producer 실행 (kafka-console-producer.sh)
docker exec -it kafka kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
📌 Kafka Consumer 실행 (kafka-console-consumer.sh)
docker exec -it kafka kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server localhost:9092
📌 Spark Streaming에서 Kafka 데이터 수신
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(appName="KafkaStream")
ssc = StreamingContext(sc, 5)
# Kafka 스트림 연결
kafkaStream = KafkaUtils.createDirectStream(ssc, ["test-topic"], {"metadata.broker.list": "localhost:9092"})
# 메시지 출력
messages = kafkaStream.map(lambda x: x[1])
messages.pprint()
ssc.start()
ssc.awaitTermination()
5️⃣ 실시간 데이터 집계 및 분석
📌 누적 단어 개수 계산 (updateStateByKey())
def updateFunc(new_values, last_sum):
return sum(new_values) + (last_sum or 0)
word_counts = words.map(lambda word: (word, 1))
cumulative_counts = word_counts.updateStateByKey(updateFunc)
cumulative_counts.pprint()
📌 이전 배치 데이터를 유지하면서 누적 개수 출력
📌 실전 문제: Spark Streaming 연습하기
✅ 문제 1: socketTextStream()을 사용하여 실시간 데이터 수집
📌 로컬 TCP 소켓(포트 9999)에서 데이터를 수집하고 출력하세요.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 🔽 여기에 코드 작성
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "SocketStream")
ssc = StreamingContext(sc, 5)
lines = ssc.socketTextStream("localhost", 9999)
lines.pprint()
ssc.start()
ssc.awaitTermination()
✅ 문제 2: 단어 개수 계산 (reduceByKey())
📌 실시간 데이터에서 단어 개수를 계산하고 출력하세요.
# 🔽 여기에 코드 작성
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
word_counts.pprint()
✅ 문제 3: Kafka에서 데이터 수집 및 출력
📌 Kafka test-topic에서 메시지를 읽고 출력하세요.
from pyspark.streaming.kafka import KafkaUtils
# 🔽 여기에 코드 작성
kafkaStream = KafkaUtils.createDirectStream(ssc, ["test-topic"], {"metadata.broker.list": "localhost:9092"})
messages = kafkaStream.map(lambda x: x[1])
messages.pprint()
✅ 문제 4: 누적 단어 개수(updateStateByKey()) 계산하기
📌 단어 개수를 누적하여 유지하면서 업데이트하세요.
# 🔽 여기에 코드 작성
def updateFunc(new_values, last_sum):
return sum(new_values) + (last_sum or 0)
cumulative_counts = word_counts.updateStateByKey(updateFunc)
cumulative_counts.pprint()
728x90
'Data Engineering > python' 카테고리의 다른 글
EP24 | 고급 Python 활용 #13 | Spark GraphX를 활용한 그래프 데이터 분석 (0) | 2025.03.19 |
---|---|
EP23 | 고급 Python 활용 #12 | Spark MLlib을 활용한 머신러닝 데이터 분석 (0) | 2025.03.19 |
EP21 | 고급 Python 활용 #10 | Spark와 Python을 활용한 대용량 데이터 처리 (0) | 2025.03.19 |
EP20 | 고급 Python 활용 #9 | 데이터 자동화 및 작업 스케줄링 (Airflow) (0) | 2025.03.19 |
EP19 | 고급 Python 활용 #8 | 웹 스크래핑 (BeautifulSoup, Selenium) (2) | 2025.03.19 |